"""Point the DNS entries of a set of domains at this machine's current IP.

The current IP is looked up with ``dig``; domain data is read from and written
back to an HTTP API using bearer-token authentication.
"""

import base64
import json
import logging
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import SHA512

logger = logging.getLogger(__name__)

# Seconds to wait on any single API call. Without a timeout ``requests`` waits
# forever, which would pin a worker thread (or the whole run) on a stalled
# connection.
_REQUEST_TIMEOUT = 30


def _get_ip(resolvers):
    """Return the current IP as printed by ``dig +short <resolvers...>``.

    Args:
        resolvers: Extra command-line arguments appended to ``dig +short``.

    Returns:
        The stripped, UTF-8 decoded output of ``dig``.

    Raises:
        OSError: If ``dig`` exits with a non-zero status or prints nothing.
    """
    try:
        output = subprocess.check_output(
            ["dig", "+short", *resolvers],
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        raise OSError("Unable to retrieve current IP") from e

    ip = output.decode("utf-8").strip()
    if not ip:
        # ``dig +short`` may exit successfully without printing an answer;
        # an empty string must never be written into DNS entries.
        raise OSError("Unable to retrieve current IP")
    return ip


def _get_token(private_key_path, login, api_url):
    """Request an API token by POSTing a signed request to ``{api_url}/auth``.

    The prepared request body is signed with the private key (PKCS#1 v1.5
    padding, SHA-512) and the base64-encoded signature is sent in the
    ``Signature`` header.

    Args:
        private_key_path: Path to an unencrypted PEM private key file.
        login: Login name sent in the request body.
        api_url: Base URL of the API.

    Returns:
        The value of the ``token`` field of the JSON response.

    Raises:
        requests.RequestException: If the request fails, times out, or the API
            answers with an error status.
    """
    request = requests.Request(
        "POST",
        f"{api_url}/auth",
        json={
            "login": login,
            # Millisecond timestamp used as the nonce.
            "nonce": str(int(time.time() * 1000)),
            "read_only": False,
            "expiration_time": "30 minutes",
            "label": "Custom token",
            "global_key": True,
        },
    )
    # Prepare first: the signature must cover the exact bytes that are sent.
    prepped_request = request.prepare()

    with open(private_key_path, "rb") as file:
        private_key = serialization.load_pem_private_key(
            file.read(), password=None, backend=default_backend()
        )

    signature = private_key.sign(prepped_request.body, PKCS1v15(), SHA512())
    signature = base64.b64encode(signature)
    prepped_request.headers["Signature"] = signature.decode("ascii")

    with requests.Session() as session:
        response = session.send(prepped_request, timeout=_REQUEST_TIMEOUT)

    response.raise_for_status()
    return response.json()["token"]


def _get_domain(domain, token, api_url):
    """GET the DNS data of ``domain`` and return the raw response."""
    headers = {"Authorization": f"Bearer {token}"}
    return requests.get(
        f"{api_url}/domains/{domain}/dns",
        headers=headers,
        timeout=_REQUEST_TIMEOUT,
    )


def _get_domain_data(domains, token, api_url):
    """Fetch DNS data for all ``domains`` concurrently.

    Domains whose request fails (transport error, timeout or HTTP error
    status) are logged and skipped so one bad domain does not abort the rest.

    Yields:
        The decoded JSON response of each successful request, extended with a
        ``"domain"`` key holding the domain name. Results arrive in completion
        order, not input order.
    """
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(_get_domain, domain, token, api_url): domain
            for domain in domains
        }
        for future in as_completed(futures):
            domain = futures[future]
            try:
                # ``result()`` re-raises whatever the worker raised, so it has
                # to be inside the ``try`` to catch connection errors too.
                response = future.result()
                response.raise_for_status()
            except requests.RequestException:
                logger.exception(f"Failed retrieving information for {domain}")
                continue
            yield {"domain": domain, **response.json()}


def _update_domain(domain, payload, api_url, token):
    """PUT ``payload`` as the DNS data of ``domain`` and return the response."""
    headers = {"Authorization": f"Bearer {token}"}
    return requests.put(
        f"{api_url}/domains/{domain}/dns",
        data=json.dumps(payload),
        headers=headers,
        timeout=_REQUEST_TIMEOUT,
    )


def _update_domains(updated_domains, api_url, token, read_only):
    """Push the changed DNS entries of every domain concurrently.

    Args:
        updated_domains: Mapping of domain name to the payload to upload.
        api_url: Base URL of the API.
        token: Bearer token used for authentication.
        read_only: When true, nothing is sent and the function returns at once.

    Failures (transport error, timeout or HTTP error status) are logged per
    domain and do not stop the remaining updates.
    """
    if read_only:
        return

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(_update_domain, domain, entries, api_url, token): domain
            for domain, entries in updated_domains.items()
        }
        for future in as_completed(futures):
            domain = futures[future]
            try:
                # See _get_domain_data: result() may re-raise transport errors.
                response = future.result()
                response.raise_for_status()
            except requests.RequestException:
                logger.exception(f"Unable to update domain {domain}")
                continue
            logger.info(f"Updated domain {domain}")


def detect(domains, resolvers, credentials, token, api_url, read_only):
    """Set the DNS entries of ``domains`` to the current IP where they differ.

    Args:
        domains: Iterable of domain names to check.
        resolvers: Arguments passed to ``dig +short`` to discover the IP.
        credentials: ``(private_key_path, login)`` used to request a fresh
            token. When empty or containing a falsy item, ``token`` is used.
        token: Pre-existing bearer token, used when no credentials are given.
        api_url: Base URL of the API.
        read_only: When true, changes are computed but not uploaded.

    Raises:
        OSError: If the current IP cannot be determined.
        requests.RequestException: If requesting a token fails.
    """
    ip = _get_ip(resolvers)

    # ``all(())`` is vacuously true, so also require a non-empty sequence
    # before unpacking it into _get_token.
    if credentials and all(credentials):
        token = _get_token(*credentials, api_url)

    updated_domains = {}
    for data in _get_domain_data(domains, token, api_url):
        dns_entries = data["dnsEntries"]
        # NOTE(review): every entry's content is replaced by the IP, whatever
        # its record type — confirm the managed domains only hold address
        # records.
        updated_entries = [{**dns_entry, "content": ip} for dns_entry in dns_entries]
        if dns_entries == updated_entries:
            # Already pointing at the current IP; nothing to upload.
            continue
        updated_domains[data["domain"]] = {"dnsEntries": updated_entries}

    _update_domains(updated_domains, api_url, token, read_only)