Files
Tenantos-JNPR-L3/main.py
2025-08-14 03:42:31 +08:00

144 lines
5.6 KiB
Python

import argparse
import switchlib, tenantosapi
### Define API
TENANTOS:str = ""
AUTH_KEY:str = ""
ACTION_TAG:str = ""
VRF:str = ""
###
'''
# Header
header = {
"Authorization": AUTH_KEY,
"Content-Type": "application/json",
"Accept": "application/json"
}
# Tenantos API
def get_switch_info(server_id:int):
server_url = f'https://{TENANTOS}/api/servers/{server_id}/connections'
server_response = requests.get(server_url, headers=header)
data = server_response.json()
for value in data["result"]:
if "switchId" in value["related_data"]["meta"]:
switch:int = value["related_data"]["meta"]["switchId"]
port_id:int = value["related_data"]["meta"]["portId"]
switch_url = f"https://{TENANTOS}/api/networkDevices/{switch}"
switch_response = requests.get(switch_url, headers=header)
switch_data = switch_response.json()
switch_address:str = switch_data["result"]["host"]
switch_user:str = switch_data["result"]["managementUser"]
switch_pass:str = switch_data["result"]["managementPassword"]
for port in switch_data["result"]["ports"]:
if port["id"] == port_id:
port_name:str = port["name"]
return {"switch_address": switch_address, "switch_user": switch_user, "switch_pass": switch_pass, "port_name": port_name}
def check_tag(server_id:int):
exist:bool = False
tag_url = f'https://{TENANTOS}/api/servers/{server_id}'
tag_response = requests.get(tag_url, headers=header)
tag_data = tag_response.json()
tag_lists:list = tag_data["result"]["tags"]
for tag in tag_lists:
if tag["name"] == ACTION_TAG:
exist = True
return exist
def get_af(subnet_id:int):
type:int = 4
af_url = f'https://{TENANTOS}/api/subnets/{subnet_id}'
af_response = requests.get(af_url, headers=header)
af_data = af_response.json()
if af_data["result"]["type"] == "v4":
return type
else:
type = 6
return type
def get_cidr(subnet_id:int):
url = f'https://{TENANTOS}/api/subnets/{subnet_id}'
response = requests.get(url, headers=header)
data = response.json()
return data["result"]["cidr"]
'''
if __name__ == "__main__":
# Define arguments
parser = argparse.ArgumentParser(
prog="main.py"
)
parser.add_argument("--action", type=str, required=True)
parser.add_argument("--server", type=int, required=True)
# parser.add_argument("--subnet", type=int, required=True)
# parser.add_argument("--ip", type=str, required=True)
args = parser.parse_args()
server_info = tenantosapi.APIClient(TENANTOS, AUTH_KEY, args.server).get_all_info()
if args.acttion == "add":
'''
# Process
process = check_tag(args.server)
if process:
switch = get_switch_info(args.server)
ip = args.ip
af = get_af(args.subnet)
if args.action == "add":
switch_connect = ConnectHandler(
device_type = "juniper_junos",
host = switch["switch_address"],
username = switch["switch_user"],
password = switch["switch_pass"],
)
if af == 4:
config_command = []
config_command = [ f'set routing-instances {VRF} routing-options static route {ip}/32 next-hop {switch["port_name"]}.0' ]
elif af == 6:
config_command = []
cidr = get_cidr(args.subnet)
if cidr == 127:
subnet = ipaddress.ip_network(f'{ip}/127')
config_command = [ f'set interface {switch["port_name"]}.0 family inet6 address {subnet[0]}/127' ]
else:
show_command = [ f'show interfaces {switch["port_name"]}.0 terse | match inet6' ]
show_output = switch_connect.send_command(show_command)
if not show_output.strip():
ptp = ipaddress.ip_network(f'{re.findall(r'inet6\s+([\da-f:]+)', show_output)}/127')[1]
config_command = [ f'set routing-instances {VRF} routing-options rib {VRF}.inet6.0 static route {ip}/{cidr} next-hop {ptp}' ]
switch_connect.send_config_set(config_command)
switch_connect.disconnect()
elif args.action == "del":
switch_connect = ConnectHandler(
device_type = "juniper_junos",
host = switch["switch_address"],
username = switch["switch_user"],
password = switch["switch_pass"],
)
if af == 4:
config_command = []
config_command = [ f'delete routing-instances {VRF} routing-options static route {ip}/32 next-hop {switch["port_name"]}.0' ]
elif af == 6:
config_command = []
cidr = get_cidr(args.subnet)
if cidr == 127:
subnet = ipaddress.ip_network(f'{ip}/127')
config_command = [ f'delete interface {switch["port_name"]}.0 family inet6 address {subnet[0]}/127' ]
else:
show_command = [ f'show interfaces {switch["port_name"]}.0 terse | match inet6' ]
show_output = switch_connect.send_command(show_command)
if not show_output.strip():
ptp = ipaddress.ip_network(f'{re.findall(r'inet6\s+([\da-f:]+)', show_output)}/127')[1]
config_command = [ f'delete routing-instances {VRF} routing-options rib {VRF}.inet6.0 static route {ip}/{cidr} next-hop {ptp}' ]
switch_connect.send_config_set(config_command)
switch_connect.disconnect()
'''