add 2 libs, main.py not done yet

This commit is contained in:
arT
2025-08-14 03:42:31 +08:00
parent 0bcef4d88b
commit c5ee621215
3 changed files with 185 additions and 6 deletions

19
main.py
View File

@ -1,5 +1,5 @@
import argparse, requests, json, ipaddress, re import argparse
from netmiko import ConnectHandler import switchlib, tenantosapi
### Define API ### Define API
@ -9,7 +9,7 @@ ACTION_TAG:str = ""
VRF:str = "" VRF:str = ""
### ###
'''
# Header # Header
header = { header = {
@ -67,6 +67,8 @@ def get_cidr(subnet_id:int):
data = response.json() data = response.json()
return data["result"]["cidr"] return data["result"]["cidr"]
'''
if __name__ == "__main__": if __name__ == "__main__":
# Define arguments # Define arguments
@ -75,10 +77,16 @@ if __name__ == "__main__":
) )
parser.add_argument("--action", type=str, required=True) parser.add_argument("--action", type=str, required=True)
parser.add_argument("--server", type=int, required=True) parser.add_argument("--server", type=int, required=True)
parser.add_argument("--subnet", type=int, required=True) # parser.add_argument("--subnet", type=int, required=True)
parser.add_argument("--ip", type=str, required=True) # parser.add_argument("--ip", type=str, required=True)
args = parser.parse_args() args = parser.parse_args()
server_info = tenantosapi.APIClient(TENANTOS, AUTH_KEY, args.server).get_all_info()
if args.acttion == "add":
'''
# Process # Process
process = check_tag(args.server) process = check_tag(args.server)
if process: if process:
@ -133,3 +141,4 @@ if __name__ == "__main__":
config_command = [ f'delete routing-instances {VRF} routing-options rib {VRF}.inet6.0 static route {ip}/{cidr} next-hop {ptp}' ] config_command = [ f'delete routing-instances {VRF} routing-options rib {VRF}.inet6.0 static route {ip}/{cidr} next-hop {ptp}' ]
switch_connect.send_config_set(config_command) switch_connect.send_config_set(config_command)
switch_connect.disconnect() switch_connect.disconnect()
'''

108
switchlib.py Normal file
View File

@ -0,0 +1,108 @@
from netmiko import ConnectHandler
import ipaddress, json, re
class Switch_Praser:
def __init__(self, ip:str, user:str, password:str, port:int, vrf:str, vendor:str, action:str, prefix:str):
self.ip = ip
self.user = user
self.password = password
self.port = port
self.vrf = vrf
self.vendor = vendor
self.action = action
self.prefix = prefix
# If prefix exist in switch fib
def get_prefix_exist(self):
output = ""
command = ""
if self.vendor == "ciscoNxosSsh":
device = {
"device_type": "cisco_nxos",
"host": self.ip,
"username": self.user,
"password": self.password,
"port": self.port
}
switch = ConnectHandler(**device)
if ipaddress.ip_network(self.prefix).version == 4:
command = f"show ip route {self.prefix} vrf {self.vrf} | json"
elif ipaddress.ip_network(self.prefix).version == 6:
command = f"show ipv6 route {self.prefix} vrf {self.vrf} | json"
output = switch.send_command(command)
switch.disconnect()
if "TABLE_prefix" in json.loads(output)["TABLE_vrf"]["ROW_vrf"]["TABLE_addrf"]["ROW_addrf"]:
return True
else:
return False
# JunOS under development
def get_if_ipv6(self):
output = ""
command:str = ""
address:list = []
if self.vendor == "ciscoNxosSsh":
device = {
"device_type": "cisco_nxos",
"host": self.ip,
"username": self.user,
"password": self.password,
"port": self.port
}
switch = ConnectHandler(**device)
command = f"show ipv6 interface {self.port} vrf {self.vrf} | json"
for line in json.loads(switch.send_command(command))["TABLE_intf"]["ROW_intf"]["TABLE_addr"]["ROW_addr"]:
address.append(line["addr"])
switch.disconnect()
return address
# JunOS under development
def add_route(self):
command:list = []
if self.vendor == "ciscoNxosSsh":
device = {
"device_type": "cisco_nxos",
"host": self.ip,
"username": self.user,
"password": self.password,
"port": self.port
}
switch = ConnectHandler(**device)
if ipaddress.ip_network(self.prefix).version == 4:
command = [ f"vrf context {self.vrf}",
f"ip route {self.prefix} {self.port}" ]
elif ipaddress.ip_network(self.prefix).version == 6:
if_addrs = self.get_if_ipv6()
for line in if_addrs:
if re.match(r'^\s*(?:(?:[0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4}|(?:[0-9A-Fa-f]{1,4}:){1,7}:|(?:[0-9A-Fa-f]{1,4}:){1,6}:[0-9A-Fa-f]{1,4}|(?:[0-9A-Fa-f]{1,4}:){1,5}(?::[0-9A-Fa-f]{1,4}){1,2}|(?:[0-9A-Fa-f]{1,4}:){1,4}(?::[0-9A-Fa-f]{1,4}){1,3}|(?:[0-9A-Fa-f]{1,4}:){1,3}(?::[0-9A-Fa-f]{1,4}){1,4}|(?:[0-9A-Fa-f]{1,4}:){1,2}(?::[0-9A-Fa-f]{1,4}){1,5}|[0-9A-Fa-f]{1,4}:(?:(?::[0-9A-Fa-f]{1,4}){1,6})|:(?:(?::[0-9A-Fa-f]{1,4}){1,7}|:))(?:%[0-9A-Za-z]{1,})?\/127\s*$', line) != None:
ptp = ipaddress.ip_network(line)[1]
break
command = [ f"vrf context {self.vrf}",
f"ipv6 route {self.prefix} {ptp}" ]
switch.send_config_set(command)
switch.disconnect()
def del_route(self):
output = ""
command:list = []
if self.vendor == "ciscoNxosSsh":
device = {
"device_type": "cisco_nxos",
"host": self.ip,
"username": self.user,
"password": self.password,
"port": self.port
}
switch = ConnectHandler(**device)
if ipaddress.ip_network(self.prefix).version == 4:
command = [ f"vrf context {self.vrf}",
f"no ip route {self.prefix} {self.port}" ]
elif ipaddress.ip_network(self.prefix).version == 6:
output = switch.send_command(f"show ipv6 route {self.prefix} vrf {self.vrf} | json")
if "TABLE_prefix" in json.loads(output)["TABLE_vrf"]["ROW_vrf"]["TABLE_addrf"]["ROW_addrf"]:
next_hop = json.loads(output)["TABLE_vrf"]["ROW_vrf"]["TABLE_addrf"]["ROW_addrf"]["TABLE_prefix"]["ROW_prefix"]["TABLE_path"]["ROW_path"]["ipnexthop"]
command = [ f"vrf context {self.vrf}",
f"no ipv6 route {self.prefix} {next_hop}" ]
switch.send_config_set(command)
switch.disconnect()

62
tenantosapi.py Normal file
View File

@ -0,0 +1,62 @@
import requests, ipaddress
class APIClient:
def __init__(self, url:str, key:str, serverid:int):
self.url = url
self.serverid = serverid
self.headers = {
"Authorization": f"Bearer {key}",
"Content-Type": "application/json",
"Accept": "application/json"
}
def get_all_info(self):
info:dict = {}
switch:dict = {}
connection_response = requests.get(f"https://{self.url}/api/servers/{self.serverid}/connections").json()
for value in connection_response["result"]:
if "related_data" in value and value["related_data"]["type"] == "snmp_switch":
network_device = requests.get(f"https://{self.url}/api/networkDevices/{value["related_data"]["meta"]["switchId"]}").json()
for port in network_device["result"]["ports"]:
if port["id"] == value["related_data"]["meta"]["portId"]:
switch = {
"host": network_device["result"]["host"],
"username": network_device["result"]["managementUser"],
"password": network_device["result"]["managementPassword"],
"vendor": network_device["result"]["managementVendor"],
"port": port["name"]
}
ipassignment_response = requests.get(f"https://{self.url}/api/servers/{self.serverid}/ipassignments").json()
for value in ipassignment_response["result"]:
if value["ipAttributes"]["isIpv4"]:
ipv4:list = []
ipv4.append(value["ip"])
elif value["ipAttributes"]["isIpv6"]:
ipv6:list = []
if value["ip"].endswith("/127"):
ipv6_ptp:str = ipaddress.ip_network(value["ip"])[1]
else:
ipv6:list = []
ipv6.append(value["ip"])
ipassignment = {
"ipv4": ipv4,
"ipv6": ipv6,
"ipv6_ptp": ipv6_ptp[0],
"ipv6_nh": ipv6_ptp[1]
}
server_response = requests.get(f"https://{self.url}/api/servers/{self.serverid}").json()
server = {
"tags": server_response["result"]["tags"]
}
info = {
"id": self.serverid,
"server": server,
"switch": switch,
"ipassignment": ipassignment
}
return info