Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TIDY: Moving tools into this repo #38

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/pynetbox.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Pylint-Tests-Codecov

on:
push:
branches:
- master
pull_request:
paths:
- "pynetbox_data_uploader/**"
- ".github/workflows/pynetbox.yaml"

jobs:
Pylint-Tests-Codecov:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: ["3.x"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: "pip"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
cd pynetbox_query
pip install -r requirements.txt

- name: Analyse with pylint
run: cd pynetbox_query && pylint $(git ls-files '*.py')

- name: Run tests and collect coverage
run: cd pynetbox_query && python3 -m pytest

- name: Run tests and collect coverage
run: cd pynetbox_query && python3 -m pytest . --cov-report xml:coverage.xml --cov

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
token: ${{secrets.CODECOV_TOKEN}}
files: ./pynetbox_query/coverage.xml
45 changes: 45 additions & 0 deletions .github/workflows/reverse_ssl_cert_chain.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Pylint-Tests-Codecov

on:
push:
branches:
- master
pull_request:
paths:
- "reverse_ssl_cert_chain/**"
- ".github/workflows/reverse_ssl_cert_chain.yaml"

jobs:
Pylint-Tests-Codecov:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: ["3.x"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: "pip"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
cd reverse_ssl_cert_chain
pip install -r requirements.txt

- name: Analyse with pylint
run: cd reverse_ssl_cert_chain && pylint $(git ls-files '*.py')

- name: Run tests and collect coverage
run: cd reverse_ssl_cert_chain && python3 -m pytest

- name: Run tests and collect coverage
run: cd reverse_ssl_cert_chain && python3 -m pytest . --cov-report xml:coverage.xml --cov

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: ./reverse_ssl_cert_chain/coverage.xml
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
# cloud-ops-tools
A collection of tools used by the cloud ops teams

## pynetbox_query

A Python package to bulk upload systems data to Netbox from files creating devices and interfaces.
[More Here](pynetbox_query/)

## reverse_ssl_cert_chain

A Python script to reverse the SSL certificate chain order. For example, a certificate as CA -> Root would output as Root -> CA.
[More Here](reverse_ssl_cert_chain/)
4 changes: 4 additions & 0 deletions pynetbox_query/.coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[report]

exclude_lines =
if __name__ == .__main__.:
10 changes: 10 additions & 0 deletions pynetbox_query/.pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[FORMAT]
# Black will enforce 88 chars on Python code
# this will enforce 120 chars on docs / comments
max-line-length=120

# Disable various warnings:
# C0114: Missing module string - we don't need module strings for the small repo
# W0212: Access to protected members - Don't know how to fix this :)

disable=C0114, W0212
Empty file.
24 changes: 24 additions & 0 deletions pynetbox_query/pynetboxquery/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2023 United Kingdom Research and Innovation
from importlib import import_module
import sys
from pynetboxquery.utils.error_classes import UserMethodNotFoundError


def main():
"""
This function will run the correct user method for the action specified in the CLI.
"""
user_methods_names = ["upload_devices_to_netbox", "validate_data_fields_in_netbox"]
for user_method in user_methods_names:
user_method_module = import_module(f"pynetboxquery.user_methods.{user_method}")
user_method_class = getattr(user_method_module, "Main")()
aliases = user_method_class.aliases()
if sys.argv[1] in aliases:
user_method_module.Main().main()
return
raise UserMethodNotFoundError(f"The user method {sys.argv[1]} was not found.")


if __name__ == "__main__":
main()
Empty file.
13 changes: 13 additions & 0 deletions pynetbox_query/pynetboxquery/netbox_api/netbox_connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2023 United Kingdom Research and Innovation
from pynetbox import api


def api_object(url: str, token: str) -> api:
"""
This function returns the Pynetbox Api object used to interact with Netbox.
:param url: The Netbox URL.
:param token: User Api token.
:return: The Pynetbox api object.
"""
return api(url, token)
33 changes: 33 additions & 0 deletions pynetbox_query/pynetboxquery/netbox_api/netbox_create.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2023 United Kingdom Research and Innovation
from typing import Union, Dict, List


class NetboxCreate:
"""
This class contains methods that will interact create objects in Netbox.
"""

def __init__(self, api):
"""
This initialises the class with the api object to be used by methods.
"""
self.netbox = api

def create_device(self, data: Union[Dict, List]) -> bool:
"""
This method uses the pynetbox Api to create a device in Netbox.
:param data: A list or a single dictionary containing data required to create devices in Netbox.
:return: Returns bool if the devices where made or not.
"""
devices = self.netbox.dcim.devices.create(data)
return bool(devices)

def create_device_type(self, data: Union[Dict, List]) -> bool:
"""
This method creates a new device type in Netbox.
:param data: A list or single dictionary containing data required to create device types in Netbox.
:return: Returns bool if the device types where made or not.
"""
device_type = self.netbox.dcim.device_types.create(data)
return bool(device_type)
43 changes: 43 additions & 0 deletions pynetbox_query/pynetboxquery/netbox_api/netbox_get_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2023 United Kingdom Research and Innovation
from pynetboxquery.utils.device_dataclass import Device


# pylint: disable = R0903,C0115
class NetboxGetId:
def __init__(self, api):
self.netbox = api

def get_id(self, device: Device, attr: str) -> Device:
"""
This method queries Netbox for ID's of values.
:param device: The device dataclass.
:param attr: The attribute to query Netbox for.
:return: Returns an updated copy of the device dataclass.
"""
value = getattr(device, attr)
netbox_id = ""
if attr in ["status", "face", "airflow", "position", "name", "serial"]:
return getattr(device, attr)
match attr:
case "tenant":
netbox_id = self.netbox.tenancy.tenants.get(name=value).id
case "device_role":
netbox_id = self.netbox.dcim.device_roles.get(name=value).id
case "manufacturer":
netbox_id = self.netbox.dcim.manufacturers.get(name=value).id
case "device_type":
netbox_id = self.netbox.dcim.device_types.get(slug=value).id
case "site":
netbox_id = self.netbox.dcim.sites.get(name=value).id
case "location":
if isinstance(device.site, int):
site_slug = self.netbox.dcim.sites.get(id=device.site).slug
else:
site_slug = device.site.replace(" ", "-").lower()
netbox_id = self.netbox.dcim.locations.get(
name=value, site=site_slug
).id
case "rack":
netbox_id = self.netbox.dcim.racks.get(name=value).id
return netbox_id
101 changes: 101 additions & 0 deletions pynetbox_query/pynetboxquery/netbox_api/validate_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2023 United Kingdom Research and Innovation
from typing import List
from pynetbox import api
from pynetboxquery.utils.device_dataclass import Device


# Disabling this Pylint warning as it is unnecessary.
# pylint: disable = R0903
class ValidateData:
"""
This class contains methods that check if things exist in Netbox or not.
"""

def validate_data(
self, device_list: List[Device], netbox_api: api, fields: List[str]
):
"""
This method will take a list of dataclasses.
Validate any data specified by the key word arguments.
:param device_list: A list of Device dataclasses containing the data.
:param netbox_api: The Api Object for Netbox.
:param fields: The Device fields to check.
"""
for field in fields:
results = self._call_validation_methods(device_list, netbox_api, field)
for result in results:
print(f"{result}\n")

def _call_validation_methods(
self, device_list: List[Device], netbox_api: api, field: str
) -> List[str]:
"""
This method will validate the field data by calling the correct validate method.
:param device_list: List of devices to validate.
:param netbox_api: The Api Object for Netbox.
:param field: Field to validate.
:return: Returns the results of the validation call.
"""
match field:
case "name":
device_names = [device.name for device in device_list]
results = self._check_list_device_name_in_netbox(
device_names, netbox_api
)
case "device_type":
device_types = [device.device_type for device in device_list]
results = self._check_list_device_type_in_netbox(
device_types, netbox_api
)
case _:
results = [f"Could not find a field for the argument: {field}."]
return results

@staticmethod
def _check_device_name_in_netbox(device_name: str, netbox_api: api) -> bool:
"""
This method will check if a device exists in Netbox.
:param device_name: The name of the device.
:return: Returns bool.
"""
device = netbox_api.dcim.devices.get(name=device_name)
return bool(device)

def _check_list_device_name_in_netbox(
self, device_names: List[str], netbox_api: api
) -> List[str]:
"""
This method will call the validate method on each device name in the list and return the results.
:param device_names: List of device names to check.
:return: Results of the check.
"""
results = []
for name in device_names:
in_netbox = self._check_device_name_in_netbox(name, netbox_api)
results += [f"Device {name} exists in Netbox: {in_netbox}."]
return results

@staticmethod
def _check_device_type_in_netbox(device_type: str, netbox_api: api) -> bool:
"""
This method will check if a device type exists in Netbox.
:param device_type: The device type.
:return: Returns bool.
"""
device_type = netbox_api.dcim.device_types.get(slug=device_type)
return bool(device_type)

def _check_list_device_type_in_netbox(
self, device_type_list: List[str], netbox_api: api
) -> List[str]:
"""
This method will call the validate method on each device type in the list and return the results.
:param device_type_list: List of device types to check.
:return: Results of the check.
"""
results = []
for device_type in device_type_list:
in_netbox = self._check_device_type_in_netbox(device_type, netbox_api)
results += [f"Device type {device_type} exists in Netbox: {in_netbox}."]
return results
Empty file.
47 changes: 47 additions & 0 deletions pynetbox_query/pynetboxquery/user_methods/abstract_user_method.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2023 United Kingdom Research and Innovation
from abc import ABC, abstractmethod
from typing import List, Dict
from argparse import ArgumentParser


class AbstractUserMethod(ABC):
"""
This Abstract class provides a template for user methods.
"""

def _collect_kwargs(self) -> Dict:
"""
This method collects the arguments from the subparser into a dictionary of kwargs.
:return: Dictionary of kwargs.
"""
main_parser = self._subparser()
args = main_parser.parse_args()
kwargs = vars(args)
return kwargs

@abstractmethod
def _subparser(self) -> ArgumentParser:
"""
This method creates a subparser with specific arguments to the user method.
:return: Returns the main parser that should contain the subparser information.
"""

@staticmethod
@abstractmethod
def aliases() -> List[str]:
"""Returns the aliases viable for this user_method."""

def main(self):
"""
This method gets the arguments and calls the run method with them.
"""
kwargs = self._collect_kwargs()
self._run(**kwargs)

@staticmethod
@abstractmethod
def _run(url: str, token: str, file_path: str, **kwargs):
"""
This the main method in the user script. It contains all calls needed to perform the action.
"""
Loading
Loading