Resolve "Automatic token generation"

This commit is contained in:
sonny 2021-07-18 13:40:03 +00:00
parent a30d4b70ea
commit 57a85158b3
7 changed files with 338 additions and 27 deletions

View file

@ -2,6 +2,7 @@ import click
from transip_client.main import detect
DEFAULT_DNS = "myip.opendns.com"
DEFAULT_DNS_NAME = "@resolver1.opendns.com"
DEFAULT_API_URL = "https://api.transip.nl/v6"
@ -9,13 +10,39 @@ DEFAULT_API_URL = "https://api.transip.nl/v6"
@click.command()
@click.argument("domains", envvar="DOMAINS", nargs=-1)
@click.argument("token", envvar="TOKEN")
@click.option("--token", envvar="TOKEN")
@click.option("--login", envvar="LOGIN")
@click.option("--private-key-path", envvar="PRIVATE_KEY_PATH")
@click.option("--dns", envvar="DNS", default=DEFAULT_DNS)
@click.option("--dns-name", envvar="DNS_NAME", default=DEFAULT_DNS_NAME)
@click.option("--api-url", envvar="API_URL", default=DEFAULT_API_URL)
@click.option("--read-only/--write", envvar="READ_ONLY", default=False)
def run(domains, token, dns, dns_name, api_url, read_only):
def run(domains, token, login, private_key_path, dns, dns_name, api_url, read_only):
if not domains:
raise ValueError("No domain(s) specified")
detect(domains, (dns, dns_name), api_url, token, read_only)
token_retrieval = any(
(
login,
private_key_path,
)
)
if token_retrieval and not all((login, private_key_path)):
raise ValueError(
"Both a login name and the path to a private key need to be specified"
)
elif not token_retrieval and not token:
raise ValueError(
"Either a token or a login name with a path to a private key need"
" to be specified"
)
detect(
domains,
(dns, dns_name),
(private_key_path, login),
token,
api_url,
read_only,
)

View file

@ -1,11 +1,18 @@
import base64
import json
import logging
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.hashes import SHA512
logger = logging.getLogger(__name__)
@ -22,6 +29,43 @@ def _get_ip(resolvers):
return output.decode("utf-8").strip()
def _get_token(private_key_path, login, api_url):
request = requests.Request(
"POST",
f"{api_url}/auth",
json={
"login": login,
"nonce": str(int(time.time() * 1000)),
"read_only": False,
"expiration_time": "30 minutes",
"label": "Custom token",
"global_key": True,
},
)
prepped_request = request.prepare()
with open(private_key_path, "rb") as file:
private_key = serialization.load_pem_private_key(
file.read(), password=None, backend=default_backend()
)
signature = private_key.sign(prepped_request.body, PKCS1v15(), SHA512())
signature = base64.b64encode(signature)
prepped_request.headers["Signature"] = signature.decode("ascii")
with requests.Session() as session:
response = session.send(prepped_request)
if not response.ok:
response.raise_for_status()
response_data = response.json()
return response_data["token"]
def _get_domain(domain, token, api_url):
headers = {"Authorization": f"Bearer {token}"}
@ -79,11 +123,15 @@ def _update_domains(updated_domains, api_url, token, read_only):
logger.info(f"Updated domain {domain}")
def detect(domains, resolvers, api_url, token, read_only):
def detect(domains, resolvers, credentials, token, api_url, read_only):
ip = _get_ip(resolvers)
domain_data = _get_domain_data(domains, token, api_url)
updated_domains = {}
if all(credentials):
token = _get_token(*credentials, api_url)
domain_data = _get_domain_data(domains, token, api_url)
for data in domain_data:
dns_entries = data["dnsEntries"]
domain = data["domain"]

View file

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpgIBAAKCAQEAz7ojmkmZ2QA2L0QW/K2BLoxLs6eGWw/LBV5E+beTPz9rejla
Qj/Dq8T7qnBvM8TtLH3Bwwu6H15oqnVis0OhAZTuAFwQuUrzw9g83S+KHxBGATVb
eWQxwkf9ma8GIX+xFnby3XGvVXZjKZgUMZAeFK7c/eXxnRJ8QnN/aoXibiFP0bgS
jjGpsoS6FdjRkau7BbdOJg0fRcTIjkdd11r5pg1oRGAApwU6g+HtjQWYLiNoIxba
8Vv0SSdK9V/hgk/aqLXGmCCjvLqvEK0KxK9y035Sn6C4AnohdiSO5SRYZgipM9F0
RJzc20qCcTL6CpIS4SupA6121eno/bxAQY4q8wIDAQABAoIBAQC3KM9fqWoIFtGw
F3+VSH9RRc8yF5K2FFTU5Ow4q48gA5GG8a8OHx8vA79L51uF8CuYQUJp8psoMZxk
QKDIo+cBeAnrM0Jjvxz1IGN6PAKzpSu0wRFpFdlyDvwjWFo1i1vgDP3UF/ubhYmm
ETws/4AmiJC/JtNFxhjeluxQRsECjLDf52iqO/LbyxQUAElIkMC24ni2SHnLZfQ3
KIBS3MdVULGrAO6dF805vF/CbPajzgsDZzuzcOFjYetHm+qvUV8Tp34M00Mzf5Ar
7joBLCHgeFWfwPIY0l7LnRhs8Z79vAi/slMi/zrQnFi/5aiLE86y195LXeXh59If
BS7eq+YhAoGBAPmGmVR282oMhDFkzR6N2vkKbRCg8p6QMYK3L6/vYYVbf+S4ePH+
GJo41iG76YIx77UPR5wE6+KRStmc6SkvXPy8mJzNVRzMX7xIgtLbNfx37HSqIkKy
CN3rppQ8VTxsaEM8LwKvAQ/V1xxvtmhct08oEQCPqeQHHdrOEvFwl7HbAoGBANUd
56NRDcKmF0mnoCgJpTZxVImpbOcrislQHKJvVjz8JegBcv+JBoX4p/g62FXh0CIJ
fr7KyKsWpH77zniNFu9xgwEs3RaK+z/+GsHsH4IWBkgj4ABweGOiCw+oeN0WdA8w
4okF2VYZbzXbpH3ULrwAvKnElGFcBboSY9yLwHLJAoGBAIGsKz6z2mfAPWqV4esA
+Uz22BsOKUex06kEnemmU13EYUBxhZjs3cg3xUAesYkRfmrvl91CyXsi2m0gmCLp
FD/bmsvSAWtH4nCsliAR/4pGoEE4sTlL4EPD1PuwJvORutVGD4Arhje+f12tyHOP
y0t9nOhXwIhaEm/FLB8AzjSFAoGBAK3NSKZ5KLawi1dnHAbq7tC6lg36nTTd3r6U
1fVmxTbRD/zoiad6UVaa1ilrnBhWI05O3g2tBP/6ZEanBthrf+Pgd81SkC+dQpAK
pDm4Xm3Rlmfo0fqpvpTKhyjK5V6wvA/Tdzv2CCvebELJEJoJm9943mO5TKUlzgnU
i5pGYrl5AoGBAN+b0liV/tDP7NT2pwSTqoW+8itEhBu0QxiHr/QM2941C4+tV4g9
wDC2CkQftSm1zx3unnDvU/WXSeJN4GWI7RmQNdsj84mPZVMfFqhvS8F7czbBMHV8
OHJdQYlyGPHP9WeP1H0K/zuFEcU6sY/Prl0fRIqhTroI6xEaEd+1bZx7
-----END RSA PRIVATE KEY-----

View file

@ -1,7 +1,9 @@
import json
import os
from unittest import TestCase
from unittest.mock import call, patch
from pathlib import Path
from click.testing import CliRunner
from requests import HTTPError
@ -20,6 +22,9 @@ class RunTestCase(TestCase):
patcher = patch("transip_client.main.requests.put")
self.mocked_put = patcher.start()
patcher = patch("transip_client.main.requests.Session.send")
self.mocked_session = patcher.start()
self.runner = CliRunner()
def test_simple(self):
@ -46,7 +51,7 @@ class RunTestCase(TestCase):
}
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(run, ["foobar.com", "TOKEN"])
result = self.runner.invoke(run, ["foobar.com"], env={"TOKEN": "token"})
self.assertEqual(
logger.output, ["INFO:transip_client.main:Updated domain foobar.com"]
@ -56,7 +61,7 @@ class RunTestCase(TestCase):
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
expected_json = json.dumps(
@ -75,7 +80,7 @@ class RunTestCase(TestCase):
self.mocked_put.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
def test_error_response(self):
@ -83,7 +88,7 @@ class RunTestCase(TestCase):
self.mocked_get.return_value.raise_for_status.side_effect = HTTPError
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(run, ["foobar.com", "TOKEN"])
result = self.runner.invoke(run, ["foobar.com"], env={"TOKEN": "token"})
error_log = logger.output[0]
@ -92,7 +97,7 @@ class RunTestCase(TestCase):
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
@ -120,13 +125,13 @@ class RunTestCase(TestCase):
],
}
result = self.runner.invoke(run, ["foobar.com", "TOKEN"])
result = self.runner.invoke(run, ["foobar.com"], env={"TOKEN": "token"})
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
@ -154,13 +159,15 @@ class RunTestCase(TestCase):
],
}
result = self.runner.invoke(run, ["foobar.com", "TOKEN", "--read-only"])
result = self.runner.invoke(
run, ["foobar.com", "--read-only"], env={"TOKEN": "token"}
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
self.mocked_put.assert_not_called()
@ -190,7 +197,9 @@ class RunTestCase(TestCase):
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run, ["foobar.com", "TOKEN", "--api-url", "https://other-provider.com"]
run,
["foobar.com", "--api-url", "https://other-provider.com"],
env={"TOKEN": "token"},
)
self.assertEqual(
@ -201,7 +210,7 @@ class RunTestCase(TestCase):
self.mocked_get.assert_called_with(
f"https://other-provider.com/domains/foobar.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
expected_json = json.dumps(
@ -220,7 +229,7 @@ class RunTestCase(TestCase):
self.mocked_put.assert_called_with(
f"https://other-provider.com/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
def test_env_var(self):
@ -248,7 +257,12 @@ class RunTestCase(TestCase):
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run, ["foobar.com", "TOKEN"], env={"API_URL": "https://new-api.com"}
run,
["foobar.com"],
env={
"TOKEN": "token",
"API_URL": "https://new-api.com",
},
)
self.assertEqual(
@ -259,7 +273,7 @@ class RunTestCase(TestCase):
self.mocked_get.assert_called_with(
"https://new-api.com/domains/foobar.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
expected_json = json.dumps(
@ -278,7 +292,7 @@ class RunTestCase(TestCase):
self.mocked_put.assert_called_with(
"https://new-api.com/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
)
def test_multi_arg_env_var(self):
@ -328,7 +342,7 @@ class RunTestCase(TestCase):
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run, ["TOKEN"], env={"DOMAINS": "foobar.com foofoo.com"}
run, [], env={"TOKEN": "token", "DOMAINS": "foobar.com foofoo.com"}
)
self.assertIsNone(result.exception)
@ -345,13 +359,13 @@ class RunTestCase(TestCase):
expected_calls = [
call(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
),
call().raise_for_status(),
call().json(),
call(
f"{DEFAULT_API_URL}/domains/foofoo.com/dns",
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
),
call().raise_for_status(),
call().json(),
@ -377,13 +391,13 @@ class RunTestCase(TestCase):
call(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
),
call().raise_for_status(),
call(
f"{DEFAULT_API_URL}/domains/foofoo.com/dns",
data=expected_json,
headers={"Authorization": "Bearer TOKEN"},
headers={"Authorization": "Bearer token"},
),
call().raise_for_status(),
]
@ -391,10 +405,101 @@ class RunTestCase(TestCase):
self.mocked_put.assert_has_calls(expected_calls, any_order=True)
def test_no_domains(self):
result = self.runner.invoke(run, ["TOKEN"])
result = self.runner.invoke(run, [], env={"TOKEN": "token"})
self.assertEqual(result.exit_code, 1)
self.assertEqual(str(result.exception), "No domain(s) specified")
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()
def test_no_token_or_login_with_private_key_path(self):
result = self.runner.invoke(run, ["foobar.com"])
self.assertEqual(result.exit_code, 1)
self.assertEqual(
str(result.exception),
"Either a token or a login name with a path to a private key need"
" to be specified"
)
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()
def test_login_without_private_key_path(self):
result = self.runner.invoke(run, ["foobar.com"], env={"LOGIN": "foo"})
self.assertEqual(result.exit_code, 1)
self.assertEqual(
str(result.exception),
"Both a login name and the path to a private key need to be specified"
)
self.mocked_get.assert_not_called()
self.mocked_put.assert_not_called()
def test_login_with_private_key_path(self):
self.mocked_dns.return_value = b"111.420\n"
self.mocked_get.return_value.json.return_value = {
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.421",
}
],
"_links": [
{
"rel": "self",
"link": "https://api.transip.nl/v6/domains/foobar.com/dns",
},
{
"rel": "domain",
"link": "https://api.transip.nl/v6/domains/foobar.com",
},
],
}
self.mocked_session.return_value.json.return_value = {"token": "FOOBAR"}
private_key_path = (
Path(os.path.dirname(__file__)) / "files" / "test-private-key.pem"
)
with self.assertLogs("transip_client.main", level="INFO") as logger:
result = self.runner.invoke(
run,
["foobar.com"],
env={"LOGIN": "foo", "PRIVATE_KEY_PATH": str(private_key_path)}
)
self.assertEqual(
logger.output, ["INFO:transip_client.main:Updated domain foobar.com"]
)
self.assertEqual(result.exit_code, 0)
self.mocked_get.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
headers={"Authorization": "Bearer FOOBAR"},
)
expected_json = json.dumps(
{
"dnsEntries": [
{
"name": "@",
"expire": 60,
"type": "A",
"content": "111.420",
}
]
}
)
self.mocked_put.assert_called_with(
f"{DEFAULT_API_URL}/domains/foobar.com/dns",
data=expected_json,
headers={"Authorization": "Bearer FOOBAR"},
)