import base64 import json import logging from concurrent.futures import ThreadPoolExecutor, as_completed from secrets import token_urlsafe from typing import Generator import requests from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 from cryptography.hazmat.primitives.hashes import SHA512 from transip_client.utils import import_string logger = logging.getLogger(__name__) API_URL = "https://api.transip.nl/v6" # Note that tokens cannot be recreated when an existing token is present def _get_token(private_key_path: str, login: str, api_url: str) -> str: request = requests.Request( "POST", f"{api_url}/auth", json={ "login": login, "nonce": token_urlsafe(), "read_only": False, "expiration_time": "30 minutes", "label": "Trans IP client", "global_key": True, }, ) prepped_request = request.prepare() with open(private_key_path, "rb") as file: private_key = serialization.load_pem_private_key( file.read(), password=None, backend=default_backend() ) signature = private_key.sign(prepped_request.body, PKCS1v15(), SHA512()) signature = base64.b64encode(signature) prepped_request.headers["Signature"] = signature.decode("ascii") logger.info(f"Retrieving token from {api_url} for {login}") with requests.Session() as session: response = session.send(prepped_request) if not response.ok: response.raise_for_status() response_data = response.json() return response_data["token"] def _get_domain(domain: str, token: str, api_url: str) -> requests.Response: logger.info(f"Retrieving domain information for {domain} from {api_url}") headers = {"Authorization": f"Bearer {token}"} return requests.get(f"{api_url}/domains/{domain}/dns", headers=headers) def _get_domain_data( domains: list[str], token: str, api_url: str ) -> Generator[dict, None, None]: with ThreadPoolExecutor(max_workers=10) as executor: futures = { executor.submit(_get_domain, domain, token, api_url): domain for domain in domains } for future in as_completed(futures): domain = futures[future] try: response = future.result() response.raise_for_status() except requests.HTTPError: logger.exception(f"Failed retrieving information for {domain}") continue yield {"domain": domain, **response.json()} def _update_domain( domain: str, payload: dict, api_url: str, token: str ) -> requests.Response: logger.info(f"Updating domain {domain} at {api_url}") headers = {"Authorization": f"Bearer {token}"} return requests.put( f"{api_url}/domains/{domain}/dns", data=json.dumps(payload), headers=headers ) def _update_domains( updated_domains: dict, api_url: str, token: str, read_only: bool ) -> None: if read_only: return with ThreadPoolExecutor(max_workers=10) as executor: futures = { executor.submit(_update_domain, domain, entries, api_url, token): domain for domain, entries in updated_domains.items() } for future in as_completed(futures): response = future.result() domain = futures[future] try: response.raise_for_status() except requests.HTTPError: logger.exception(f"Unable to update domain {domain}") continue logger.info(f"Updated domain {domain} at {api_url}") def detect( domains: list[str], credentials: tuple[str, str], adapter_class: str, read_only: bool, ) -> None: _adapter_class = import_string(adapter_class) adapter = _adapter_class() ip = adapter.get_ip() token = _get_token(*credentials, API_URL) domain_data = _get_domain_data(domains, token, API_URL) updated_domains = {} for data in domain_data: dns_entries = data["dnsEntries"] domain = data["domain"] updated_entries = [] for dns_entry in dns_entries: updated_entries.append( { **dns_entry, "content": ip, } ) if dns_entries == updated_entries: logger.info(f"No changes detected for {domain}, skipping...") continue updated_domains[domain] = {"dnsEntries": updated_entries} _update_domains(updated_domains, API_URL, token, read_only)