#!/usr/bin/python3
import requests
import json
import sys
import urllib3
import time # Import time for sleep functionality
from datetime import datetime # Import datetime for timestamps
requests.packages.urllib3.disable_warnings()
def x2_login(board_addr, username, password):
url = "https://" + board_addr + "/api2/tokens"
data = {"username": username, "password": password}
headers = {"content-type": "application/json"}
r = requests.post(url, headers=headers, json=data, verify=False)
return json.loads(r.content)
def x2_get_port(board_addr, token, labelname):
url = "https://" + board_addr + "/api2/ports"
headers = {"content-type": "application/json", "Authorization": token}
r = requests.get(url, headers=headers, verify=False)
data = json.loads(r.content)
port_id = None
for item in data:
if item.get("config", {}).get("label") == labelname:
port_id = item["id"]
break
if port_id is not None:
print(f"ID of the port with label {labelname}: {port_id}")
else:
print("Port with the requested label not found.")
link_status = None
for item in data:
if item.get("config", {}).get("label") == "PortX":
link_status = item["state"]["link"]
break
if link_status is not None:
print(f"Link status of the port with label {labelname}: {link_status}")
return link_status
else:
print("Port with label 'PortX' not found.")
return "notKnown"
def x2_get_ruleset(board_addr, token, ruleset_down, ruleset_up):
url = "https://" + board_addr + "/api2/tm/rulesets"
headers = {"content-type": "application/json", "Authorization": token}
r = requests.get(url, headers=headers, verify=False)
data = json.loads(r.content)
down_ruleset_id = None
up_ruleset_id = None
for item in data:
if item.get("name") == ruleset_down:
down_ruleset_id = item["id"]
print(
f"ID of the ruleset to be applied if the port is down is: {down_ruleset_id}"
)
elif item.get("name") == ruleset_up:
up_ruleset_id = item["id"]
print(
f"ID of the ruleset to be applied if the port is up is: {up_ruleset_id}"
)
# Check if ruleset IDs were found
if down_ruleset_id is None:
print(f"Error: Ruleset '{ruleset_down}' not found.")
if up_ruleset_id is None:
print(f"Error: Ruleset '{ruleset_up}' not found.")
return down_ruleset_id, up_ruleset_id
def x2_get_active_ruleset(board_addr, token):
url = "https://" + board_addr + "/api2/tm"
headers = {"content-type": "application/json", "Authorization": token}
r = requests.get(url, headers=headers, verify=False)
data = json.loads(r.content)
active_ruleset_id = data.get("active_ruleset")
if active_ruleset_id is not None:
return active_ruleset_id
else:
print("Active ruleset not found.")
return None
def x2_apply_ruleset(board_addr, token, ruleset_id):
url = "https://" + board_addr + "/api2/tm"
data = {"active_ruleset": ruleset_id}
headers = {"content-type": "application/json", "Authorization": token}
r = requests.put(url, headers=headers, json=data, verify=False)
if __name__ == "__main__":
# Main execution with loop
board_url = "x2-device-ip"
username = "admin"
password = "Adminadmin1"
labelname = "PortX"
ruleset_down = "Ruleset1" # Apply this ruleset if the port is down
ruleset_up = "Ruleset2" # Apply this ruleset if the port is up
X = 10 # Set the interval in seconds
iterations = 5 # Set the number of iterations
# Counter for loop iterations
loop_counter = 0
# Loop to run every X seconds for the specified number of iterations
for _ in range(iterations):
loop_counter += 1
timestamp = datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
) # Get current timestamp
auth_reply = x2_login(board_url, username, password)
port_status = x2_get_port(board_url, auth_reply["token"], labelname)
down_ruleset_id, up_ruleset_id = x2_get_ruleset(
board_url, auth_reply["token"], ruleset_down, ruleset_up
)
active_ruleset_id = x2_get_active_ruleset(board_url, auth_reply["token"])
# Check if ruleset IDs were found before proceeding
if down_ruleset_id is None or up_ruleset_id is None:
print("Aborting action due to missing ruleset IDs.")
continue
if port_status == "down" and active_ruleset_id == down_ruleset_id:
print()
elif port_status == "down" and active_ruleset_id != down_ruleset_id:
print(
"The port is down, the relevant ruleset will be applied. TAKING ACTION!!!"
)
x2_apply_ruleset(board_url, auth_reply["token"], down_ruleset_id)
elif port_status == "up" and active_ruleset_id == up_ruleset_id:
print()
elif port_status == "up" and active_ruleset_id != up_ruleset_id:
print(
"The port is up, the relevant ruleset will be applied. TAKING ACTION!!!"
)
x2_apply_ruleset(board_url, auth_reply["token"], up_ruleset_id)
else:
print("This line should not execute under normal circumstances")
print("\n\n\n") # Add several new lines before the next iteration
time.sleep(X) # Wait for X seconds before the next iteration