Adapter refactor
All checks were successful
ci/woodpecker/push/tests Pipeline was successful

Allows using different ways to retrieve hosts IP address
This commit is contained in:
Sonny Bakker 2025-05-03 13:12:13 +02:00 committed by sonny
parent 97cf1a8f5c
commit 84558826fb
15 changed files with 861 additions and 710 deletions

View file

@ -1,24 +1,32 @@
import os
import subprocess
from logging.config import dictConfig
from pathlib import Path
from dotenv import load_dotenv
from ruamel.yaml import YAML
yml = YAML(typ="safe", pure=True)
env_path = Path("..") / ".env"
load_dotenv(dotenv_path=env_path)
default_config_path = Path(__file__).resolve().parent / "logging" / "default.yml"
logging_config_path = os.environ.get("LOGGING_CONFIG", default_config_path)
with open(logging_config_path, "r") as f:
logging_config = yml.load(f.read())
dictConfig(logging_config)
def get_current_version():
if "VERSION" in os.environ:
return os.environ["VERSION"]
try:
output = subprocess.check_output(
["git", "describe", "--tags"], universal_newlines=True
)
output = subprocess.check_output(["git", "describe", "--tags"], text=True)
return output.strip()
except (subprocess.CalledProcessError, OSError):
return ""
@ -32,6 +40,7 @@ try:
sentry_init(
dsn=os.environ.get("SENTRY_DSN"),
environment=os.environ.get("ENVIRONMENT", "production"),
send_default_pii=False,
release=VERSION,
)

View file

@ -0,0 +1,86 @@
import logging
import subprocess
import requests
logger = logging.getLogger(__name__)
class Adapter:
def get_ip(self) -> str:
raise NotImplementedError
class HTTPAdapter(Adapter):
service: str
def handle_response(self, response: requests.Response) -> str:
raise NotImplementedError
def get_params(self) -> dict:
raise NotImplementedError
def get_ip(self) -> str:
try:
response = requests.get(self.service, params=self.get_params(), timeout=5)
response.raise_for_status()
except requests.RequestException as e:
raise OSError(f"Unable to retrieve current IP from {self.service}") from e
ip = self.handle_response(response)
if not ip:
raise OSError(f"Unable to determine IP from response from {self.service}")
return ip
class IpifyAdapter(HTTPAdapter):
service: str = "https://api.ipify.org"
def get_params(self) -> dict:
return {"format": "text"}
def handle_response(self, response: requests.Response) -> str:
return response.text
class DNSAdapter(Adapter):
resolvers: list[str]
dns: str
def try_resolver(self, resolver: str) -> str:
output = subprocess.check_output(
["dig", "+short", self.dns, resolver], stderr=subprocess.STDOUT, text=True
)
return output.strip()
def get_ip(self) -> str:
for resolver in self.resolvers:
try:
ip = self.try_resolver(resolver)
except subprocess.CalledProcessError as e:
if e.returncode == 9:
continue
raise OSError("Unable to retrieve current IP") from e
if not ip:
logger.warning(f"No IP returned from {resolver}")
continue
return ip
raise OSError("Exhausted all known IP resolvers, unable to retrieve IP")
class OpenDNSAdapter(DNSAdapter):
dns = "myip.opendns.com"
resolvers = [
"@resolver1.opendns.com",
"@resolver2.opendns.com",
"@resolver3.opendns.com",
"@resolver4.opendns.com",
]

View file

@ -1,54 +1,40 @@
from pathlib import Path
import click
from transip_client.main import detect
DEFAULT_SERVICE = "https://api.ipify.org"
DEFAULT_API_URL = "https://api.transip.nl/v6"
@click.command()
@click.argument("domains", envvar="DOMAINS", nargs=-1)
@click.option("--token", envvar="TOKEN")
@click.option("--login", envvar="LOGIN")
@click.option("--private-key-path", envvar="PRIVATE_KEY_PATH")
@click.option("--service", envvar="SERVICE", default=DEFAULT_SERVICE)
@click.option("--api-url", envvar="API_URL", default=DEFAULT_API_URL)
@click.option("--read-only/--write", envvar="READ_ONLY", default=False)
@click.argument("login")
@click.argument("private-key-path")
@click.argument("domains", nargs=-1)
@click.option(
"--adapter-class",
default="transip_client.adapters.OpenDNSAdapter",
)
@click.option("--read-only/--write", default=False)
def run(
domains: list[str],
token: str,
login: str,
private_key_path: str,
service: str,
api_url: str,
adapter_class: str,
read_only: bool,
) -> None:
if not domains:
raise ValueError("No domain(s) specified")
token_retrieval = any(
(
login,
private_key_path,
)
)
if not Path(private_key_path).exists():
raise ValueError(f"Unknown private key path: {private_key_path}")
if token_retrieval and not all((login, private_key_path)):
if not all((login, private_key_path)):
raise ValueError(
"Both a login name and the path to a private key need to be specified"
)
elif not token_retrieval and not token:
raise ValueError(
"Either a token or a login name with a path to a private key need"
" to be specified"
)
detect(
domains,
service,
(private_key_path, login),
token,
api_url,
adapter_class,
read_only,
)

View file

@ -0,0 +1,14 @@
version: 1
formatters:
simple:
format: '%(asctime)s %(levelname)s %(name)s %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
handlers:
console:
level: DEBUG
formatter: simple
class: logging.StreamHandler
loggers:
transip_client:
handlers: [console]
level: INFO

View file

@ -0,0 +1,14 @@
version: 1
formatters:
simple:
format: '%(asctime)s %(levelname)s %(name)s %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
handlers:
console:
level: DEBUG
formatter: simple
class: logging.StreamHandler
loggers:
transip_client:
handlers: [console]
level: DEBUG

View file

@ -0,0 +1,21 @@
version: 1
formatters:
simple:
format: '%(asctime)s %(levelname)s %(name)s %(message)s'
datefmt: '%Y-%m-%d %H:%M:%S'
handlers:
console:
level: DEBUG
formatter: simple
class: logging.StreamHandler
file:
level: INFO
formatter: simple
class: logging.handlers.RotatingFileHandler
filename: /app/transip_client/logging/logs/app.log
backupCount: 5
maxBytes: 50000000 # 50 mB
loggers:
transip_client:
handlers: [console, file]
level: INFO

View file

@ -13,20 +13,15 @@ from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import SHA512
from transip_client.utils import import_string
logger = logging.getLogger(__name__)
def _get_ip(service: str) -> str:
try:
response = requests.get(service, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
raise OSError(f"Unable to retrieve current IP from {service}") from e
return response.text
API_URL = "https://api.transip.nl/v6"
# Note that tokens cannot be recreated when an existing token is present
def _get_token(private_key_path: str, login: str, api_url: str) -> str:
request = requests.Request(
"POST",
@ -36,7 +31,7 @@ def _get_token(private_key_path: str, login: str, api_url: str) -> str:
"nonce": str(int(time.time() * 1000)),
"read_only": False,
"expiration_time": "30 minutes",
"label": "Custom token",
"label": "Trans IP client",
"global_key": True,
},
)
@ -53,6 +48,8 @@ def _get_token(private_key_path: str, login: str, api_url: str) -> str:
prepped_request.headers["Signature"] = signature.decode("ascii")
logger.info(f"Retrieving token from {api_url} for {login}")
with requests.Session() as session:
response = session.send(prepped_request)
@ -65,6 +62,8 @@ def _get_token(private_key_path: str, login: str, api_url: str) -> str:
def _get_domain(domain: str, token: str, api_url: str) -> requests.Response:
logger.info(f"Retrieving domain information for {domain} from {api_url}")
headers = {"Authorization": f"Bearer {token}"}
return requests.get(f"{api_url}/domains/{domain}/dns", headers=headers)
@ -94,8 +93,9 @@ def _get_domain_data(
def _update_domain(
domain: str, payload: dict, api_url: str, token: str
) -> requests.Response:
headers = {"Authorization": f"Bearer {token}"}
logger.info(f"Updating domain {domain} at {api_url}")
headers = {"Authorization": f"Bearer {token}"}
return requests.put(
f"{api_url}/domains/{domain}/dns", data=json.dumps(payload), headers=headers
)
@ -123,23 +123,22 @@ def _update_domains(
logger.exception(f"Unable to update domain {domain}")
continue
logger.info(f"Updated domain {domain}")
logger.info(f"Updated domain {domain} at {api_url}")
def detect(
domains: list[str],
service: str,
credentials: tuple[str, str],
token: str,
api_url: str,
adapter_class: str,
read_only: bool,
) -> None:
ip = _get_ip(service)
_adapter_class = import_string(adapter_class)
adapter = _adapter_class()
ip = adapter.get_ip()
if all(credentials):
token = _get_token(*credentials, api_url)
token = _get_token(*credentials, API_URL)
domain_data = _get_domain_data(domains, token, api_url)
domain_data = _get_domain_data(domains, token, API_URL)
updated_domains = {}
for data in domain_data:
@ -156,8 +155,9 @@ def detect(
)
if dns_entries == updated_entries:
logger.info(f"No changes detected for {domain}, skipping...")
continue
updated_domains[domain] = {"dnsEntries": updated_entries}
_update_domains(updated_domains, api_url, token, read_only)
_update_domains(updated_domains, API_URL, token, read_only)

View file

@ -0,0 +1,129 @@
from subprocess import CalledProcessError
from unittest import TestCase
from unittest.mock import MagicMock, patch
from requests.exceptions import Timeout
from requests.models import HTTPError
from transip_client.adapters import IpifyAdapter, OpenDNSAdapter
class IpifyAdapterTestCase(TestCase):
adapter_class = IpifyAdapter
def setUp(self) -> None:
patcher = patch("transip_client.adapters.requests.get")
self.mocked_requests_get = patcher.start()
self.addCleanup(patcher.stop)
def test_simple(self):
self.mocked_requests_get.return_value = MagicMock(text="8.8.8.8")
adapter = self.adapter_class()
self.assertEqual(adapter.get_ip(), "8.8.8.8")
def test_service_unavailable(self):
self.mocked_requests_get.side_effect = Timeout
adapter = self.adapter_class()
with self.assertRaises(OSError) as exception_manager:
adapter.get_ip()
exception = exception_manager.exception
self.assertEqual(
str(exception), f"Unable to retrieve current IP from {adapter.service}"
)
def test_error_response(self):
request_mock = MagicMock()
request_mock.raise_for_status.side_effect = HTTPError
self.mocked_requests_get.return_value = request_mock
adapter = self.adapter_class()
with self.assertRaises(OSError) as exception_manager:
adapter.get_ip()
exception = exception_manager.exception
self.assertEqual(
str(exception), f"Unable to retrieve current IP from {adapter.service}"
)
def test_no_text_content(self):
self.mocked_requests_get.return_value = MagicMock(text="")
adapter = self.adapter_class()
with self.assertRaises(OSError) as exception_manager:
adapter.get_ip()
exception = exception_manager.exception
self.assertEqual(
str(exception),
f"Unable to determine IP from response from {adapter.service}",
)
class OpenDNSAdapterTestCase(TestCase):
def setUp(self) -> None:
patcher = patch("transip_client.adapters.subprocess.check_output")
self.mocked_check_output = patcher.start()
self.addCleanup(patcher.stop)
def test_simple(self):
self.mocked_check_output.return_value = "8.8.8.8\n"
adapter = OpenDNSAdapter()
self.assertEqual(adapter.get_ip(), "8.8.8.8")
def test_retry_next_resolvers(self):
self.mocked_check_output.side_effect = (
CalledProcessError(9, "dig +short myip.opendns.com @resolver1.opendns.com"),
CalledProcessError(9, "dig +short myip.opendns.com @resolver2.opendns.com"),
CalledProcessError(9, "dig +short myip.opendns.com @resolver3.opendns.com"),
"9.9.9.9\n",
)
adapter = OpenDNSAdapter()
self.assertEqual(adapter.get_ip(), "9.9.9.9")
def test_resolvers_exhausted(self):
self.mocked_check_output.side_effect = (
CalledProcessError(9, "dig +short myip.opendns.com @resolver1.opendns.com"),
CalledProcessError(9, "dig +short myip.opendns.com @resolver2.opendns.com"),
CalledProcessError(9, "dig +short myip.opendns.com @resolver3.opendns.com"),
CalledProcessError(9, "dig +short myip.opendns.com @resolver4.opendns.com"),
)
adapter = OpenDNSAdapter()
with self.assertRaises(OSError) as exception_manager:
adapter.get_ip()
self.assertEqual(
str(exception_manager.exception),
"Exhausted all known IP resolvers, unable to retrieve IP",
)
def test_command_error(self):
self.mocked_check_output.side_effect = (
CalledProcessError(1, "dig +short myip.opendns.com @resolver1.opendns.com"),
)
adapter = OpenDNSAdapter()
with self.assertRaises(OSError) as exception_manager:
adapter.get_ip()
self.assertEqual(
str(exception_manager.exception), "Unable to retrieve current IP"
)

View file

@ -0,0 +1,359 @@
import json
from pathlib import Path
from unittest import TestCase
from unittest.mock import MagicMock, call, patch
from click.testing import CliRunner
from requests import HTTPError
from transip_client.cli import run
from transip_client.main import API_URL
class RunTestCase(TestCase):
def setUp(self):
patcher = patch("transip_client.main.requests.get")
self.mocked_get = patcher.start()
self.addCleanup(patcher.stop)
patcher = patch("transip_client.main.requests.put")
self.mocked_put = patcher.start()
self.addCleanup(patcher.stop)
patcher = patch("transip_client.main.requests.Session.send")
self.mocked_session = patcher.start()
self.addCleanup(patcher.stop)
patcher = patch("transip_client.main.import_string")
self.mocked_import_string = patcher.start()
self.addCleanup(patcher.stop)
self.runner = CliRunner()
self.private_key_path = (
Path(__file__).resolve().parent / "files" / "test-private-key.pem"
)
def test_simple(self):
mocked_adapter = MagicMock(get_ip=lambda: "111.420")
self.mocked_import_string.return_value = MagicMock(return_value=mocked_adapter)
self.mocked_get.side_effect = [
MagicMock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
self.mocked_session.return_value.json.return_value = {"token": "token"}
result = self.runner.invoke(
run,
["my-user", str(self.private_key_path), "foobar.com"],
catch_exceptions=False,
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
expected_json = json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
)
self.mocked_put.assert_called_with(
f"{API_URL}/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer token"},
)
def test_error_response(self):
mocked_adapter = MagicMock(get_ip=lambda: "111.420")
self.mocked_import_string.return_value = MagicMock(return_value=mocked_adapter)
self.mocked_session.return_value.json.return_value = {"token": "token"}
self.mocked_get.side_effect = [HTTPError]
result = self.runner.invoke(
run,
["my-user", str(self.private_key_path), "foobar.com"],
catch_exceptions=False,
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
def test_update_error_response(self):
mocked_adapter = MagicMock(get_ip=lambda: "111.420")
self.mocked_import_string.return_value = MagicMock(return_value=mocked_adapter)
self.mocked_session.return_value.json.return_value = {"token": "token"}
self.mocked_get.side_effect = [
MagicMock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
MagicMock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/boofar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/boofar.com",
},
],
}
),
]
def raise_exception() -> None:
raise HTTPError
self.mocked_put.side_effect = (
MagicMock(raise_for_status=raise_exception),
MagicMock(status=200),
)
result = self.runner.invoke(
run,
["my-user", str(self.private_key_path), "foobar.com", "boofar.com"],
catch_exceptions=False,
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_has_calls(
(
call(
f"{API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
),
call(
f"{API_URL}/domains/boofar.com/dns",
headers={"Authorization": "Bearer token"},
),
)
)
self.assertCountEqual(
self.mocked_put.mock_calls,
(
call(
f"{API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
data=json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
),
),
call(
f"{API_URL}/domains/boofar.com/dns",
headers={"Authorization": "Bearer token"},
data=json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
),
),
),
)
def test_matching_ip(self):
mocked_adapter = MagicMock(get_ip=lambda: "111.420")
self.mocked_import_string.return_value = MagicMock(return_value=mocked_adapter)
self.mocked_get.side_effect = [
MagicMock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
self.mocked_session.return_value.json.return_value = {"token": "token"}
result = self.runner.invoke(
run,
["my-user", str(self.private_key_path), "foobar.com"],
catch_exceptions=False,
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
def test_readonly(self):
mocked_adapter = MagicMock(get_ip=lambda: "111.420")
self.mocked_import_string.return_value = MagicMock(return_value=mocked_adapter)
self.mocked_get.side_effect = [
MagicMock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
self.mocked_session.return_value.json.return_value = {"token": "token"}
result = self.runner.invoke(
run,
["my-user", str(self.private_key_path), "foobar.com", "--read-only"],
catch_exceptions=False,
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
def test_no_domains(self):
result = self.runner.invoke(
run,
["my-user", str(self.private_key_path)],
catch_exceptions=True,
)
self.assertEqual(result.exit_code, 1)
self.assertEqual(str(result.exception), "No domain(s) specified")
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()
def test_unknown_private_key_path(self):
result = self.runner.invoke(
run,
["my-user", str("/tmp/foo"), "foobar.com"],
catch_exceptions=True,
)
self.assertEqual(result.exit_code, 1)
self.assertEqual(str(result.exception), "Unknown private key path: /tmp/foo")
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()

View file

@ -1,528 +0,0 @@
import json
import os
from pathlib import Path
from unittest import TestCase
from unittest.mock import Mock, call, patch
from click.testing import CliRunner
from requests import HTTPError
from transip_client.cli import DEFAULT_API_URL, DEFAULT_SERVICE, run
class RunTestCase(TestCase):
def setUp(self):
patcher = patch("transip_client.main.requests.get")
self.mocked_get = patcher.start()
patcher = patch("transip_client.main.requests.put")
self.mocked_put = patcher.start()
patcher = patch("transip_client.main.requests.Session.send")
self.mocked_session = patcher.start()
self.runner = CliRunner()
def test_simple(self):
self.mocked_get.side_effect = [
Mock(text="111.420"),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run, "foobar.com", env={"TOKEN": "token"}, catch_exceptions=False
)
self.assertEqual(
logger.output, ["INFO:transip_client.main:Updated domain foobar.com"]
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
expected_json = json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
)
self.mocked_put.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer token"},
)
def test_error_response(self):
self.mocked_get.side_effect = [Mock(text="111.420"), HTTPError]
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run, ["foobar.com"], env={"TOKEN": "token"}, catch_exceptions=False
)
error_log = logger.output[0]
self.assertIn("Failed retrieving information for foobar.com", error_log)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
def test_matching_ip(self):
self.mocked_get.side_effect = [
Mock(text="111.420"),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
result = self.runner.invoke(run, ["foobar.com"], env={"TOKEN": "token"})
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
def test_readonly(self):
self.mocked_get.side_effect = [
Mock(text="111.420"),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
result = self.runner.invoke(
run, ["foobar.com", "--read-only"], env={"TOKEN": "token"}
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
def test_different_api_url(self):
self.mocked_get.side_effect = [
Mock(text="111.420"),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run,
["foobar.com", "--api-url", "https://other-provider.com"],
env={"TOKEN": "token"},
)
self.assertEqual(
logger.output, ["INFO:transip_client.main:Updated domain foobar.com"]
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
"https://other-provider.com/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
expected_json = json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
)
self.mocked_put.assert_called_with(
"https://other-provider.com/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer token"},
)
def test_env_var(self):
self.mocked_get.side_effect = [
Mock(text="111.420"),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run,
["foobar.com"],
env={
"TOKEN": "token",
"API_URL": "https://new-api.com",
},
)
self.assertEqual(
logger.output, ["INFO:transip_client.main:Updated domain foobar.com"]
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
"https://new-api.com/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
)
expected_json = json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
)
self.mocked_put.assert_called_with(
"https://new-api.com/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer token"},
)
def test_multi_arg_env_var(self):
self.mocked_get.side_effect = [
Mock(text="111.420"),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
},
),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foofoo.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foofoo.com",
},
],
}
),
]
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run, [], env={"TOKEN": "token", "DOMAINS": "foobar.com foofoo.com"}
)
self.assertIsNone(result.exception)
self.assertEqual(
logger.output,
[
"INFO:transip_client.main:Updated domain foobar.com",
"INFO:transip_client.main:Updated domain foofoo.com",
],
)
self.assertEqual(result.exit_code, 0)
expected_calls = [
call(DEFAULT_SERVICE, timeout=10),
call(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer token"},
),
call(
f"{DEFAULT_API_URL}/domains/foofoo.com/dns",
headers={"Authorization": "Bearer token"},
),
]
# use any_order because of the asynchronous requests
self.mocked_get.assert_has_calls(expected_calls, any_order=True)
expected_json = json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
)
expected_calls = [
call(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer token"},
),
call(
f"{DEFAULT_API_URL}/domains/foofoo.com/dns",
data=expected_json,
headers={"Authorization": "Bearer token"},
),
]
self.mocked_put.assert_has_calls(expected_calls, any_order=True)
def test_no_domains(self):
result = self.runner.invoke(run, [], env={"TOKEN": "token"})
self.assertEqual(result.exit_code, 1)
self.assertEqual(str(result.exception), "No domain(s) specified")
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()
def test_no_token_or_login_with_private_key_path(self):
result = self.runner.invoke(run, ["foobar.com"])
self.assertEqual(result.exit_code, 1)
self.assertEqual(
str(result.exception),
"Either a token or a login name with a path to a private key need"
" to be specified",
)
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()
def test_login_without_private_key_path(self):
result = self.runner.invoke(run, ["foobar.com"], env={"LOGIN": "foo"})
self.assertEqual(result.exit_code, 1)
self.assertEqual(
str(result.exception),
"Both a login name and the path to a private key need to be specified",
)
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()
def test_login_with_private_key_path(self):
self.mocked_get.side_effect = [
Mock(text="111.420"),
Mock(
json=lambda: {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
),
]
self.mocked_session.return_value.json.return_value = {"token": "FOOBAR"}
private_key_path = (
Path(os.path.dirname(__file__)) / "files" / "test-private-key.pem"
)
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run,
["foobar.com"],
env={"LOGIN": "foo", "PRIVATE_KEY_PATH": str(private_key_path)},
)
self.assertEqual(
logger.output, ["INFO:transip_client.main:Updated domain foobar.com"]
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer FOOBAR"},
)
expected_json = json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
)
self.mocked_put.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer FOOBAR"},
)

10
transip_client/utils.py Normal file
View file

@ -0,0 +1,10 @@
from importlib import import_module
from typing import Type
from transip_client.adapters import Adapter
def import_string(path: str) -> Type[Adapter]:
module_path, class_name = path.rsplit(".", 1)
module = import_module(module_path)
return getattr(module, class_name)