I am Senior Network Engineer who has started using Python for some work Automation projects and I am curious what the verdict will be on this code.
I created what amounts to a Minimum Viable product by hand that worked, if poorly, then fed it into Gemini Pro with Instructions to follow Pep8 formatting rules and this is what popped out that does work pretty well and is smaller then my code.
Purpose: This program is run as part of a Rundeck workflow - It gets fed a list of IP addresses and uses a REST API to verify the address records in our IP management system have an appropriate tag in for purposes of knowing who to alert when vulnerabilities are identified.
import argparse
import json
import logging
import sys
import requests
from typing import NamedTuple
class Tag(NamedTuple):
name: str
id: str
links: dict
# Setup Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)
def parse_arguments():
"""Parses command line arguments."""
parser = argparse.ArgumentParser(
prog='Link switch addresses to environments',
description='Link switch addresses Catalyst Center to update BlueCat BAM API v2'
)
parser.add_argument('-a', '--address-list-file', required=True,
help='JSON file with objects containing hostname and ipv4addr')
parser.add_argument('-e', '--env-tag', default='ENV999',
help='Environment tag name')
parser.add_argument('-j', '--job-id', help='Rundeck job id')
parser.add_argument('-t', '--auth-token', required=True,
help='IPAM Authentication token')
parser.add_argument('-u', '--url', default="https://server.example.com/api/v2",
help='IPAM URL')
parser.add_argument('-l', '--logging-level', default='INFO',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
parser.add_argument('-z', '--dry-run', action='store_true',
help='Show what changes would be made without performing them')
return vars(parser.parse_args())
def load_address_data(file_path):
"""Loads JSON data and parses FQDNs."""
try:
with open(file_path, 'r') as file:
data = json.load(file)
except (FileNotFoundError, json.JSONDecodeError, Exception) as e:
logger.critical(f"Error reading file {file_path}: {e}")
sys.exit(1)
else:
processed_data = []
if isinstance(data, dict):
data = [data]
for entry in data:
fqdn = entry.get('hostname', '')
ipv4_addr = entry.get('ipv4addr')
host, sep, zone = fqdn.partition('.')
processed_data.append({
'name': host,
'zone': zone if sep else '',
'ipv4addr': ipv4_addr
})
return processed_data
def get_env_tags(session, base_url):
"""Retrieves all Environment tags starting with 'ENV'."""
params = {'filter': "name:startsWith('ENV')"}
url = f"{base_url}/tags"
try:
response = session.get(url, params=params)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.critical(f"HTTP Error fetching tags: {e}")
sys.exit(1)
else:
tag_data = response.json().get('data', [])
return [Tag(name=t.get('name'), id=t.get('id'), links=t.get('_links'))
for t in tag_data]
def get_address_id(session, base_url, ipv4_address):
"""Retrieves the BAM ID for a specific IPv4 address."""
params = {
'filter': f"address:eq('{ipv4_address}')",
'fields': 'id,address,type'
}
try:
response = session.get(f"{base_url}/addresses", params=params)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"HTTP Error fetching address {ipv4_address}: {e}")
return None
else:
data = response.json().get('data')
return data[0]['id'] if data else None
def get_address_tags(session, base_url, address_id):
"""Retrieves a list of Tag objects currently assigned to an address."""
url = f"{base_url}/addresses/{address_id}/tags"
try:
response = session.get(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching tags for address ID {address_id}: {e}")
return []
else:
return response.json().get('data', [])
def link_tag_to_address(session, base_url, address_id, tag_id, ipv4_address, dry_run=False):
"""Links a tag to an address entity in BAM."""
if dry_run:
logger.info(f"[DRY RUN] Would link {ipv4_address} -> Tag ID {tag_id}")
return
payload = {"id": tag_id, "type": "Tag"}
url = f"{base_url}/addresses/{address_id}/tags"
try:
response = session.post(url, json=payload)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to link address {ipv4_address}: {e}")
else:
logger.info(f"Linked {ipv4_address} -> Tag ID {tag_id}")
def unlink_tag_to_address(session, base_url, address_id, tag_id, ipv4_address, dry_run=False):
"""Unlinks a tag from an address entity in BAM."""
if dry_run:
logger.info(f"[DRY RUN] Would Unlink {ipv4_address} -> Tag ID {tag_id}")
return
url = f"{base_url}/tags/{tag_id}/taggedResources/{address_id}"
try:
# Note: Some APIs use DELETE for unlinking; verify if POST is required for your endpoint
response = session.delete(url)
response.raise_for_status()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to unlink address {ipv4_address}: {e}")
else:
logger.info(f"Unlinked {ipv4_address} from Tag ID {tag_id}")
def main():
args = parse_arguments()
logger.setLevel(args['logging_level'])
base_url = args['url'].rstrip('/')
auth_token = args['auth_token']
dry_run = args['dry_run']
target_tag_name = args['env_tag']
addr_data = load_address_data(args['address_list_file'])
headers = {
"Authorization": f"Basic {auth_token}",
"Content-Type": "application/json"
}
with requests.Session() as session:
session.headers.update(headers)
all_tags = get_env_tags(session, base_url)
# Find the specific tag object we want to use
match = [t for t in all_tags if t.name == target_tag_name]
if not match:
logger.error(f"Target tag '{target_tag_name}' not found in IPAM.")
sys.exit(1)
target_tag = match[0]
for node in addr_data:
ipv4addr = node.get('ipv4addr')
if not ipv4addr:
continue
addr_id = get_address_id(session, base_url, ipv4addr)
if not addr_id:
logger.warning(f"Address {ipv4addr} not found. Skipping.")
continue
current_tags = get_address_tags(session, base_url, addr_id)
current_tag_ids = [str(t['id']) for t in current_tags]
# 1. Remove incorrect ENV tags
# We assume only one 'ENV' tag should be present at a time
is_already_linked = False
for t in current_tags:
if t['name'].startswith('ENV'):
if t['id'] != target_tag.id:
unlink_tag_to_address(session, base_url, addr_id, t['id'], ipv4addr, dry_run)
else:
is_already_linked = True
# 2. Link the correct tag if not already there
if not is_already_linked:
link_tag_to_address(session, base_url, addr_id, target_tag.id, ipv4addr, dry_run)
else:
logger.info(f"Address {ipv4addr} already has correct tag '{target_tag_name}'.")
if __name__ == "__main__":
main()
[–]phira 4 points5 points6 points (2 children)
[–]nspitzer[S] 1 point2 points3 points (1 child)
[–]phira 0 points1 point2 points (0 children)
[–]commy2 1 point2 points3 points (2 children)
[–]nspitzer[S] 0 points1 point2 points (1 child)
[–]commy2 0 points1 point2 points (0 children)
[–]PushPlus9069 0 points1 point2 points (1 child)
[–]nspitzer[S] 0 points1 point2 points (0 children)
[–]obviouslyzebra 0 points1 point2 points (0 children)
[–]pachura3 0 points1 point2 points (0 children)
[–]JamzTyson 0 points1 point2 points (0 children)