"""Add or remove Junos static routes / interface addresses for a Tenantos server.

The script looks up the switch and port a server is connected to through the
Tenantos API, then pushes ``set`` / ``delete`` configuration statements to that
switch over SSH using Netmiko.
"""

import argparse
import ipaddress
import json
import re
from typing import Any, Dict, List, Optional

import requests
from netmiko import ConnectHandler

### Define API
TENANTOS: str = ""
AUTH_KEY: str = ""
ACTION_TAG: str = ""
VRF: str = ""
###

# Header
header = {
    "Authorization": AUTH_KEY,
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# Seconds to wait for a Tenantos API response; without a timeout
# ``requests`` can block forever on a stalled connection.
REQUEST_TIMEOUT: int = 30

# CLI ``--action`` value -> Junos configuration verb.
_ACTION_VERBS: Dict[str, str] = {"add": "set", "del": "delete"}

# Captures the first address that follows "inet6" in the terse interface output.
_INET6_ADDRESS_RE = re.compile(r"inet6\s+([\da-f:]+)")


def _api_get(path: str) -> Any:
    """Return the decoded JSON body of ``GET https://{TENANTOS}/api/{path}``.

    Raises:
        requests.HTTPError: if the API answers with an error status.
    """
    response = requests.get(
        f"https://{TENANTOS}/api/{path}",
        headers=header,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


# Tenantos API
def get_switch_info(server_id: int) -> Optional[Dict[str, str]]:
    """Look up the switch and port the given server is connected to.

    Walks the server's connections and, for the first one that references a
    switch whose port list contains the referenced port, returns the switch
    management details together with the port name.

    Returns:
        A dict with the keys ``switch_address``, ``switch_user``,
        ``switch_pass`` and ``port_name``, or ``None`` when no connection
        resolves to a known switch port.
    """
    connections = _api_get(f"servers/{server_id}/connections")
    for connection in connections["result"]:
        meta = connection["related_data"]["meta"]
        if "switchId" not in meta:
            continue
        switch_id = meta["switchId"]
        port_id = meta["portId"]
        device = _api_get(f"networkDevices/{switch_id}")["result"]
        for port in device["ports"]:
            if port["id"] == port_id:
                return {
                    "switch_address": device["host"],
                    "switch_user": device["managementUser"],
                    "switch_pass": device["managementPassword"],
                    "port_name": port["name"],
                }
        # The referenced port is not on this device: try the next connection
        # instead of returning an undefined port name.
    return None


def check_tag(server_id: int) -> bool:
    """Return True when the server carries a tag named ``ACTION_TAG``."""
    tags = _api_get(f"servers/{server_id}")["result"]["tags"]
    return any(tag["name"] == ACTION_TAG for tag in tags)


def get_af(subnet_id: int) -> int:
    """Return 4 when the subnet's ``type`` is ``"v4"``, otherwise 6."""
    subnet_type = _api_get(f"subnets/{subnet_id}")["result"]["type"]
    return 4 if subnet_type == "v4" else 6


def get_cidr(subnet_id: int):
    """Return the ``cidr`` field of the subnet exactly as the API reports it."""
    return _api_get(f"subnets/{subnet_id}")["result"]["cidr"]


def _find_ptp_peer(show_output: str):
    """Return the other address of the /127 configured on the interface.

    ``show_output`` is the text returned by the ``show interfaces ... terse``
    command. Returns ``None`` when it contains no ``inet6`` address.
    """
    match = _INET6_ADDRESS_RE.search(show_output)
    if match is None:
        return None
    local = ipaddress.ip_address(match.group(1))
    # strict=False: the interface may hold either address of the pair.
    network = ipaddress.ip_network(f"{local}/127", strict=False)
    return network[1] if local == network[0] else network[0]


def _build_config_commands(
    connection: Any,
    verb: str,
    af: int,
    ip: str,
    port_name: str,
    subnet_id: int,
) -> List[str]:
    """Build the Junos configuration statements for one address.

    Args:
        connection: open Netmiko connection; only used to query the interface
            when an IPv6 subnet is not a /127.
        verb: ``"set"`` or ``"delete"``.
        af: address family as returned by ``get_af`` (4 or 6).
        ip: address given on the command line.
        port_name: switch port name; unit ``.0`` is appended.
        subnet_id: Tenantos subnet id, used to fetch the prefix length.

    Returns:
        The list of statements to send; empty when nothing can be configured.
    """
    if af == 4:
        return [
            f"{verb} routing-instances {VRF} routing-options static route "
            f"{ip}/32 next-hop {port_name}.0"
        ]
    if af != 6:
        return []

    cidr = get_cidr(subnet_id)
    # Compare as text so both 127 and "127" from the API are recognised.
    if str(cidr).strip() == "127":
        # strict=False: accept either address of the pair for --ip.
        subnet = ipaddress.ip_network(f"{ip}/127", strict=False)
        return [
            f"{verb} interface {port_name}.0 family inet6 address {subnet[0]}/127"
        ]

    # Routed subnet: the next hop is the far end of the /127 already
    # configured on the port. send_command takes a single command string.
    show_output = connection.send_command(
        f"show interfaces {port_name}.0 terse | match inet6"
    )
    ptp = _find_ptp_peer(show_output)
    if ptp is None:
        # No inet6 address on the interface, so there is no next hop to use.
        return []
    return [
        f"{verb} routing-instances {VRF} routing-options static route "
        f"{ip}/{cidr} next-hop {ptp}"
    ]


def main() -> None:
    """Parse the command line and apply the requested change to the switch."""
    # Define arguments
    parser = argparse.ArgumentParser(prog="main.py")
    parser.add_argument("--action", type=str, required=True)
    parser.add_argument("--server", type=int, required=True)
    parser.add_argument("--subnet", type=int, required=True)
    parser.add_argument("--ip", type=str, required=True)
    args = parser.parse_args()

    # Process: only servers carrying ACTION_TAG are handled.
    if not check_tag(args.server):
        return

    # Any action other than "add" / "del" changes nothing.
    verb = _ACTION_VERBS.get(args.action)
    if verb is None:
        return

    switch = get_switch_info(args.server)
    if switch is None:
        raise SystemExit(
            f"No switch port found for server {args.server}; nothing configured."
        )
    af = get_af(args.subnet)

    # The context manager closes the SSH session even when a step fails.
    with ConnectHandler(
        device_type="juniper_junos",
        host=switch["switch_address"],
        username=switch["switch_user"],
        password=switch["switch_pass"],
    ) as switch_connect:
        config_command = _build_config_commands(
            switch_connect, verb, af, args.ip, switch["port_name"], args.subnet
        )
        if config_command:
            # NOTE(review): no commit is issued after send_config_set --
            # confirm whether the candidate configuration is committed elsewhere.
            switch_connect.send_config_set(config_command)


if __name__ == "__main__":
    main()