diff --git a/.github/workflows/proxy-test.yml b/.github/workflows/proxy-test.yml new file mode 100644 index 0000000..8d5087c --- /dev/null +++ b/.github/workflows/proxy-test.yml @@ -0,0 +1,107 @@ +name: Proxy tests + +on: + push: + pull_request: + branches: [ devel ] + schedule: + # * is a special character in YAML so you have to quote this string + - cron: '0 2 * * 6' + +jobs: + proxy_tests: + name: "proxy_tests" + runs-on: ubuntu-latest + strategy: + fail-fast: false + steps: + + - name: "checkout GIT" + uses: actions/checkout@v2 + + - name: "[ PREPARE ] create network" + run: | + docker network create acme + + - name: "[ PREPARE ] proxy container" + run: | + docker pull mosajjal/pproxy:latest + docker run -d -it --name=proxy --network acme --rm -p 8080:8080 mosajjal/pproxy:latest -vv & + + - name: "[ PREPARE ] Sleep for 10s" + uses: juliangruber/sleep-action@v1 + with: + time: 10s + + - name: "[ PREPARE ] Build docker-compose (apache2_wsgi)" + working-directory: examples/Docker/ + run: | + sudo mkdir -p data + docker-compose up -d + docker-compose logs + + - name: "Test http://acme-srv/directory is accessable" + run: docker run -i --rm --network acme curlimages/curl -f http://acme-srv/directory + + - name: "[ PREPARE ] setup openssl ca_handler" + run: | + sudo cp examples/ca_handler/openssl_ca_handler.py examples/Docker/data/ca_handler.py + sudo mkdir -p examples/Docker/data/acme_ca/certs + sudo cp test/ca/sub-ca-key.pem test/ca/sub-ca-crl.pem test/ca/sub-ca-cert.pem test/ca/root-ca-cert.pem examples/Docker/data/acme_ca/ + sudo cp .github/openssl_ca_handler.py_acme_srv_default_handler.cfg examples/Docker/data/acme_srv.cfg + sudo chmod 777 examples/Docker/data/acme_srv.cfg + sudo sed -i "s/debug: True/debug: True\nproxy_server_list: {\"acme-sh.acme\$\": \"socks5:\/\/proxy.acme:8080\", \"acme-sh.\$\": \"http\:\/\/proxy.acme:8080\"}/g" examples/Docker/data/acme_srv.cfg + cd examples/Docker/ + docker-compose restart + docker-compose logs + + - name: "Test http://acme-srv/directory is accessable again" + run: docker run -i --rm --network acme curlimages/curl -f http://acme-srv/directory + + - name: "[ PREPARE ] prepare acme.sh container" + run: | + docker run --rm -id -v "$(pwd)/acme-sh":/acme.sh --network acme --name=acme-sh neilpang/acme.sh:latest daemon + + - name: "[ ENROLL ] acme.sh - http challenge validation" + run: | + docker exec -i acme-sh acme.sh --server http://acme-srv --accountemail 'acme-sh@example.com' --issue -d acme-sh.acme -d acme-sh. --standalone --debug 3 --output-insecure --force + openssl verify -CAfile examples/Docker/data/acme_ca/root-ca-cert.pem -untrusted examples/Docker/data/acme_ca/sub-ca-cert.pem acme-sh/acme-sh.acme/acme-sh.acme.cer + + - name: "[ CHECK ] proxy logs" + run: | + docker logs proxy | grep socks5 | grep -- "->" + docker logs proxy | grep http | grep -- "->" + docker stop proxy + docker run -d -it --name=proxy --network acme --rm -p 8080:8080 mosajjal/pproxy:latest -vv & + + - name: "[ ENROLL ] acme.sh - alpn challenge validation" + run: | + docker exec -i acme-sh acme.sh --server http://acme-srv --accountemail 'acme-sh@example.com' --issue -d acme-sh.acme --alpn -d acme-sh. --alpn --standalone --debug 3 --output-insecure --force + openssl verify -CAfile examples/Docker/data/acme_ca/root-ca-cert.pem -untrusted examples/Docker/data/acme_ca/sub-ca-cert.pem acme-sh/acme-sh.acme/acme-sh.acme.cer + + - name: "[ CHECK ] proxy logs" + run: | + docker logs proxy | grep socks5 | grep -- "->" + docker logs proxy | grep http | grep -- "->" + docker stop proxy + docker run -d -it --name=proxy --network acme --rm -p 8080:8080 mosajjal/pproxy:latest -vv & + + - name: "[ stop ] proxy container" + run: | + docker stop proxy + + - name: "[ * ] collecting test logs" + if: ${{ failure() }} + run: | + mkdir -p ${{ github.workspace }}/artifact/upload + sudo cp -rp examples/Docker/data/ ${{ github.workspace }}/artifact/data/ + cd examples/Docker + docker-compose logs > ${{ github.workspace }}/artifact/docker-compose.log + sudo tar -C ${{ github.workspace }}/artifact/ -cvzf ${{ github.workspace }}/artifact/upload/artifact.tar.gz docker-compose.log data + + - name: "[ * ] uploading artificates" + uses: actions/upload-artifact@v2 + if: ${{ failure() }} + with: + name: proxy.tar.gz + path: ${{ github.workspace }}/artifact/upload/ diff --git a/acme_srv/challenge.py b/acme_srv/challenge.py index d7b6b0c..34f70e6 100644 --- a/acme_srv/challenge.py +++ b/acme_srv/challenge.py @@ -3,7 +3,7 @@ """ Challenge class """ from __future__ import print_function import json -from acme_srv.helper import generate_random_string, parse_url, load_config, jwk_thumbprint_get, url_get, sha256_hash, sha256_hash_hex, b64_encode, b64_url_encode, txt_get, fqdn_resolve, uts_now, uts_to_date_utc, servercert_get, cert_san_get, cert_extensions_get, fqdn_in_san_check +from acme_srv.helper import generate_random_string, parse_url, load_config, jwk_thumbprint_get, url_get, sha256_hash, sha256_hash_hex, b64_encode, b64_url_encode, txt_get, fqdn_resolve, uts_now, uts_to_date_utc, servercert_get, cert_san_get, cert_extensions_get, fqdn_in_san_check, proxy_check from acme_srv.db_handler import DBstore from acme_srv.message import Message @@ -21,6 +21,7 @@ class Challenge(object): self.challenge_validation_disable = False self.tnauthlist_support = False self.dns_server_list = None + self.proxy_server_list = {} def __enter__(self): """ Makes ACMEHandler a Context Manager """ @@ -136,15 +137,21 @@ class Challenge(object): try: self.dns_server_list = json.loads(config_dic['Challenge']['dns_server_list']) except BaseException as err_: - self.logger.warning('Challenge._config_load() failed with error: {0}'.format(err_)) + self.logger.warning('Challenge._config_load() dns_server_list failed with error: {0}'.format(err_)) if 'Order' in config_dic: self.tnauthlist_support = config_dic.getboolean('Order', 'tnauthlist_support', fallback=False) - if 'Directory' in config_dic: + if 'Directory' in config_dic: if 'url_prefix' in config_dic['Directory']: self.path_dic = {k: config_dic['Directory']['url_prefix'] + v for k, v in self.path_dic.items()} + if 'DEFAULT' in config_dic and 'proxy_server_list' in config_dic['DEFAULT']: + try: + self.proxy_server_list = json.loads(config_dic['DEFAULT']['proxy_server_list']) + except BaseException as err_: + self.logger.warning('Challenge._config_load() proxy_server_list failed with error: {0}'.format(err_)) + self.logger.debug('Challenge._config_load() ended.') def _name_get(self, url): @@ -252,13 +259,18 @@ class Challenge(object): self.logger.debug('fqdn_resolve() ended with: {0}/{1}'.format(response, invalid)) # we are expecting a certifiate extension which is the sha256 hexdigest of token in a byte structure - # which is base 64 encoded '0420' has been taken from acme_srv.sh sources + # which is base64 encoded '0420' has been taken from acme_srv.sh sources sha256_digest = sha256_hash_hex(self.logger, '{0}.{1}'.format(token, jwk_thumbprint)) extension_value = b64_encode(self.logger, bytearray.fromhex('0420{0}'.format(sha256_digest))) self.logger.debug('computed value: {0}'.format(extension_value)) if not invalid: - cert = servercert_get(self.logger, fqdn) + # check if we need to set a proxy + if self.proxy_server_list: + proxy_server = proxy_check(self.logger, fqdn, self.proxy_server_list) + else: + proxy_server = None + cert = servercert_get(self.logger, fqdn, 443, proxy_server) if cert: san_list = cert_san_get(self.logger, cert, recode=False) fqdn_in_san = fqdn_in_san_check(self.logger, san_list, fqdn) @@ -316,9 +328,12 @@ class Challenge(object): (response, invalid) = fqdn_resolve(fqdn, self.dns_server_list) self.logger.debug('fqdn_resolve() ended with: {0}/{1}'.format(response, invalid)) if not invalid: - req = url_get(self.logger, 'http://{0}/.well-known/acme-challenge/{1}'.format(fqdn, token), self.dns_server_list, verify=False) - # make challenge validation unsuccessful - # req = url_get(self.logger, 'http://{0}/.well-known/acme-challenge/{1}'.format('test.test', 'foo.bar.some.not.existing.ressource')) + # check if we need to set a proxy + if self.proxy_server_list: + proxy_server = proxy_check(self.logger, fqdn, self.proxy_server_list) + else: + proxy_server = None + req = url_get(self.logger, 'http://{0}/.well-known/acme-challenge/{1}'.format(fqdn, token), dns_server_list=self.dns_server_list, proxy_server=proxy_server, verify=False) if req: response_got = req.splitlines()[0] response_expected = '{0}.{1}'.format(token, jwk_thumbprint) diff --git a/acme_srv/helper.py b/acme_srv/helper.py index 19c7d26..4de810e 100644 --- a/acme_srv/helper.py +++ b/acme_srv/helper.py @@ -15,13 +15,14 @@ import textwrap from datetime import datetime from string import digits, ascii_letters import socket +import ssl +import logging +import hashlib +import socks try: from urllib.parse import urlparse except ImportError: from urlparse import urlparse -import logging -import hashlib -import ssl from urllib3.util import connection from jwcrypto import jwk, jws from dateutil.parser import parse @@ -587,6 +588,33 @@ def patched_create_connection(address, *args, **kwargs): # pylint: disable=W0212 return connection._orig_create_connection((hostname, port), *args, **kwargs) +def proxy_check(logger, fqdn, proxy_server_list): + """ check proxy server """ + logger.debug('proxy_check({0})'.format(fqdn)) + + # remove leading *. + proxy_server_list_new = { k.replace('*.', ''): v for k, v in proxy_server_list.items() } + + proxy = None + for regex in sorted(proxy_server_list_new.keys(), reverse=True): + if regex != '*': + if regex.startswith('*.'): + regex_compiled = re.compile(regex.replace('*.', '')) + else: + regex_compiled = re.compile(regex) + if bool(regex_compiled.search(fqdn)): + # parameter is in - set flag accordingly and stop loop + proxy = proxy_server_list_new[regex] + logger.debug('proxy_check() match found: fqdn: {0}, regex: {1}'.format(fqdn, regex)) + break + + if '*' in proxy_server_list_new.keys() and not proxy: + logger.debug('proxy_check() wildcard match found: fqdn: {0}'.format(fqdn)) + proxy = proxy_server_list_new['*'] + + logger.debug('proxy_check() ended with {0}'.format(proxy)) + return proxy + def url_get_with_own_dns(logger, url): """ request by using an own dns resolver """ logger.debug('url_get_with_own_dns({0})'.format(url)) @@ -609,22 +637,29 @@ def allowed_gai_family(): family = socket.AF_INET # force IPv4 return family -def url_get(logger, url, dns_server_list=None, verify=True): +def url_get(logger, url, dns_server_list=None, proxy_server=None, verify=True): """ http get """ logger.debug('url_get({0})'.format(url)) - if dns_server_list: + + # configure proxy servers if specified + if proxy_server: + proxy_list = {'http': proxy_server, 'https': proxy_server} + else: + proxy_list = {} + if dns_server_list and not proxy_server: result = url_get_with_own_dns(logger, url) else: try: - req = requests.get(url, headers={'Connection':'close', 'Accept-Encoding': 'gzip', 'User-Agent': 'acme2certifier/{0}'.format(__version__)}) + req = requests.get(url, headers={'Connection':'close', 'Accept-Encoding': 'gzip', 'User-Agent': 'acme2certifier/{0}'.format(__version__)}, proxies=proxy_list) result = req.text except BaseException as err_: + logger.debug('url_get({0}): error'.format(err_)) # force fallback to ipv4 logger.debug('url_get({0}): fallback to v4'.format(url)) old_gai_family = urllib3_cn.allowed_gai_family try: urllib3_cn.allowed_gai_family = allowed_gai_family - req = requests.get(url, verify=verify, headers={'Connection':'close', 'Accept-Encoding': 'gzip', 'User-Agent': 'acme2certifier/{0}'.format(__version__)}) + req = requests.get(url, verify=verify, headers={'Connection':'close', 'Accept-Encoding': 'gzip', 'User-Agent': 'acme2certifier/{0}'.format(__version__)}, proxies=proxy_list) result = req.text except BaseException as err_: result = None @@ -685,21 +720,47 @@ def datestr_to_date(datestr, tformat='%Y-%m-%dT%H:%M:%S'): result = None return result -def servercert_get(logger, hostname, port=443): +def proxystring_convert(logger, proxy_server): + """ convert proxy string """ + logger.debug('proxystring_convert({0})'.format(proxy_server)) + proxy_proto_dic = {'http': socks.PROXY_TYPE_HTTP, 'socks4': socks.PROXY_TYPE_SOCKS4, 'socks5': socks.PROXY_TYPE_SOCKS5} + (proxy_proto, proxy) = proxy_server.split('://') + (proxy_addr, proxy_port) = proxy.split(':') + if proxy_proto and proxy_addr and proxy_port: + try: + proto_string = proxy_proto_dic[proxy_proto] + except BaseException: + logger.error('proxystring_convert(): unknown proxy protocol: {0}'.format(proxy_proto)) + proto_string = None + else: + proto_string = None + + try: + proxy_port = int(proxy_port) + except BaseException as err_: + logger.error('proxystring_convert(): unknown proxy port: {0}'.format(proxy_port)) + proxy_port = None + + logger.debug('proxystring_convert() ended with {0}, {1}, {2}'.format(proto_string, proxy_addr, proxy_port)) + return(proto_string, proxy_addr, proxy_port) + +def servercert_get(logger, hostname, port=443, proxy_server=None): """ get server certificate from an ssl connection """ logger.debug('servercert_get({0}:{1})'.format(hostname, port)) pem_cert = None - context = ssl.create_default_context() - # disable cert validation - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - with socket.create_connection((hostname, port)) as sock: - with context.wrap_socket(sock, server_hostname=hostname) as sslsock: - der_cert = sslsock.getpeercert(True) - # from binary DER format to PEM + sock = socks.socksocket() + if proxy_server: + (proxy_proto, proxy_addr, proxy_port) = proxystring_convert(logger, proxy_server) + if proxy_proto and proxy_addr and proxy_port: + logger.debug('servercert_get() configure proxy') + sock.setproxy(proxy_proto, proxy_addr, port=proxy_port) + sock.connect((hostname, port)) + with(ssl.wrap_socket(sock, cert_reqs=ssl.CERT_NONE)) as sslsock: + der_cert = sslsock.getpeercert(True) + # from binary DER format to PEM + if der_cert: pem_cert = ssl.DER_cert_to_PEM_cert(der_cert) - return pem_cert def validate_csr(logger, order_dic, _csr): diff --git a/docs/acme_srv.md b/docs/acme_srv.md index 02f48e2..71634c7 100644 --- a/docs/acme_srv.md +++ b/docs/acme_srv.md @@ -7,6 +7,7 @@ | Section | Option | Description | Values | default| | :-------| :------| :-----------| :------| :------| | `DEFAULT` | `debug` | Debug mode| True/False| False| +| `DEFAULT` | `proxy_server_list` | [Proxy-server configuration](proxy_support.md)| {"bar.local$": "http​://10.0.0.1:3128", "foo.local$": "socks5://10.0.0.1:1080"}| None| | `Account` | `ecc_only` | mandates the usage of ECC for account key generation | True/False | False| | `Account` | `inner_header_nonce_allow` | allow nonce header on inner JWS during key-rollover | True/False | False| | `Account` | `tos_check_disable` | turn off "Terms of Service" acceptance check | True/False | False| diff --git a/docs/proxy_support.md b/docs/proxy_support.md new file mode 100644 index 0000000..f2a0ebb --- /dev/null +++ b/docs/proxy_support.md @@ -0,0 +1,25 @@ + + +# Proxy support in acme2certifier + +Proxy got introduced along with acme2certifer version 0.18. + +As of today both http and socks5 proxies are being supported for: + +- validation of http and tls-alpn challenges + +Proxies will be configured in `acme_srv/acme_srv.cfg` and need to be set per destination: + +```cfg +[DEFAULT] +debug: True +proxy_server_list: {"bar.local$": "socks5://proxy.dmn:1080", "foo.local$": "socks5://proxy.dmn:1080"} +``` + +Destination can be: + +- a tld like `.local` +- a domain name like `bar.local` +- an fqdn like `foo.bar.local` + +The usage of wildcards (`host*.bar.local`) and regular expressions (`^hostname.bar.local$`) is also supported. To configure a proxy for all outbound connections please use a single asterisk `{"*": "socks5://proxy.dmn:1080"}` diff --git a/requirements.txt b/requirements.txt index 9303c0d..06c8e23 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,4 +6,5 @@ certsrv[ntlm] pytz configparser python-dateutil -requests==2.25.1 +requests==2.25.1[use_chardet_on_py3] +pysocks diff --git a/test/test_challenge.py b/test/test_challenge.py index 3d8aaf6..5c845d9 100644 --- a/test/test_challenge.py +++ b/test/test_challenge.py @@ -660,7 +660,7 @@ class TestACMEHandler(unittest.TestCase): mock_json.side_effect = Exception('exc_mock_json') with self.assertLogs('test_a2c', level='INFO') as lcm: self.challenge._config_load() - self.assertIn('WARNING:test_a2c:Challenge._config_load() failed with error: exc_mock_json', lcm.output) + self.assertIn('WARNING:test_a2c:Challenge._config_load() dns_server_list failed with error: exc_mock_json', lcm.output) self.assertFalse(self.challenge.challenge_validation_disable) self.assertFalse(self.challenge.tnauthlist_support) self.assertFalse(self.challenge.dns_server_list) @@ -677,20 +677,57 @@ class TestACMEHandler(unittest.TestCase): self.assertFalse(self.challenge.dns_server_list) self.assertEqual({'authz_path': 'url_prefix//acme/authz/','chall_path': 'url_prefix//acme/chall/'}, self.challenge.path_dic) - def test_076__name_get(self): + @patch('acme_srv.challenge.load_config') + def test_076_config_load(self, mock_load_cfg): + """ test _config_load one DNS """ + parser = configparser.ConfigParser() + parser['DEFAULT'] = {'proxy_server_list': '{"key1.bar.local": "val1.bar.local"}'} + mock_load_cfg.return_value = parser + self.challenge._config_load() + self.assertFalse(self.challenge.challenge_validation_disable) + self.assertFalse(self.challenge.tnauthlist_support) + self.assertEqual({'key1.bar.local': 'val1.bar.local'}, self.challenge.proxy_server_list) + + @patch('acme_srv.challenge.load_config') + def test_077_config_load(self, mock_load_cfg): + """ test _config_load one DNS """ + parser = configparser.ConfigParser() + parser['DEFAULT'] = {'proxy_server_list': '{"key1.bar.local": "val1.bar.local", "key2.bar.local": "val2.bar.local"}'} + mock_load_cfg.return_value = parser + self.challenge._config_load() + self.assertFalse(self.challenge.challenge_validation_disable) + self.assertFalse(self.challenge.tnauthlist_support) + self.assertEqual({'key1.bar.local': 'val1.bar.local', 'key2.bar.local': 'val2.bar.local'}, self.challenge.proxy_server_list) + + @patch('json.loads') + @patch('acme_srv.challenge.load_config') + def test_078_config_load(self, mock_load_cfg, mock_json): + """ test _config_load two DNS """ + parser = configparser.ConfigParser() + parser['DEFAULT'] = {'proxy_server_list': '{"key1.bar.local": "val1.bar.local"}'} + mock_load_cfg.return_value = parser + mock_json.side_effect = Exception('exc_mock_json') + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.challenge._config_load() + self.assertIn('WARNING:test_a2c:Challenge._config_load() proxy_server_list failed with error: exc_mock_json', lcm.output) + self.assertFalse(self.challenge.challenge_validation_disable) + self.assertFalse(self.challenge.tnauthlist_support) + self.assertFalse(self.challenge.proxy_server_list) + + def test_079__name_get(self): """ test name get no touch""" url = 'foo' self.assertEqual('foo', self.challenge._name_get(url)) @patch('acme_srv.challenge.parse_url') - def test_077__name_get(self, mock_parse): + def test_080__name_get(self, mock_parse): """ test name get urlparse""" mock_parse.return_value = {'path': 'path'} url = 'foo' self.assertEqual('path', self.challenge._name_get(url)) @patch('acme_srv.challenge.parse_url') - def test_078__name_get(self, mock_parse): + def test_081__name_get(self, mock_parse): """ test name get challenge_path replace """ mock_parse.return_value = {'path': 'foo/my_path'} self.challenge.path_dic = {'chall_path': 'foo/'} @@ -698,7 +735,7 @@ class TestACMEHandler(unittest.TestCase): self.assertEqual('my_path', self.challenge._name_get(url)) @patch('acme_srv.challenge.parse_url') - def test_079__name_get(self, mock_parse): + def test_082__name_get(self, mock_parse): """ test name get challenge_path replace """ mock_parse.return_value = {'path': 'foo/my/path'} self.challenge.path_dic = {'chall_path': 'foo/'} @@ -707,7 +744,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge._update_authz') @patch('acme_srv.challenge.Challenge._update') - def test_080__validate(self, mock_update, mock_aupdate): + def test_083__validate(self, mock_update, mock_aupdate): """ test validate """ challenge_name = 'challenge_name' payload = 'payload' @@ -721,7 +758,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge._check') @patch('acme_srv.challenge.Challenge._update_authz') @patch('acme_srv.challenge.Challenge._update') - def test_081__validate(self, mock_update, mock_aupdate, mock_check): + def test_084__validate(self, mock_update, mock_aupdate, mock_check): """ test validate check returned ch:False/inv:False """ challenge_name = 'challenge_name' payload = 'payload' @@ -735,7 +772,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge._check') @patch('acme_srv.challenge.Challenge._update_authz') @patch('acme_srv.challenge.Challenge._update') - def test_082__validate(self, mock_update, mock_aupdate, mock_check): + def test_085__validate(self, mock_update, mock_aupdate, mock_check): """ test validate check returned ch:False/inv:True """ challenge_name = 'challenge_name' payload = 'payload' @@ -749,7 +786,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge._check') @patch('acme_srv.challenge.Challenge._update_authz') @patch('acme_srv.challenge.Challenge._update') - def test_083__validate(self, mock_update, mock_aupdate, mock_check): + def test_086__validate(self, mock_update, mock_aupdate, mock_check): """ test validate check returned ch:True/inv:False """ challenge_name = 'challenge_name' payload = 'payload' @@ -763,7 +800,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge._check') @patch('acme_srv.challenge.Challenge._update_authz') @patch('acme_srv.challenge.Challenge._update') - def test_084__validate(self, mock_update, mock_aupdate, mock_check): + def test_087__validate(self, mock_update, mock_aupdate, mock_check): """ test validate check returned ch:True/inv:True """ challenge_name = 'challenge_name' payload = 'payload' @@ -777,7 +814,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge._check') @patch('acme_srv.challenge.Challenge._update_authz') @patch('acme_srv.challenge.Challenge._update') - def test_085__validate(self, mock_update, mock_aupdate, mock_check): + def test_088__validate(self, mock_update, mock_aupdate, mock_check): """ test validate check returned ch:True/inv:False """ challenge_name = 'challenge_name' payload = {'keyAuthorization': 'keyAuthorization'} @@ -790,7 +827,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge._name_get') @patch('acme_srv.challenge.Challenge._info') - def test_086_get(self, mock_info, mock_name): + def test_089_get(self, mock_info, mock_name): """ test get """ mock_info.return_value = 'chall_info' mock_name.return_value = 'foo' @@ -800,7 +837,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge.new_set') @patch('acme_srv.challenge.Challenge._existing_challenge_validate') @patch('acme_srv.challenge.Challenge._challengelist_search') - def test_087_challengeset_get(self, mock_chsearch, mock_val, mock_set): + def test_090_challengeset_get(self, mock_chsearch, mock_val, mock_set): """ test challengeset_get - no challenge_list returned """ mock_chsearch.return_value = [] mock_val.return_value = True @@ -812,7 +849,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge.new_set') @patch('acme_srv.challenge.Challenge._existing_challenge_validate') @patch('acme_srv.challenge.Challenge._challengelist_search') - def test_088_challengeset_get(self, mock_chsearch, mock_val, mock_set): + def test_091_challengeset_get(self, mock_chsearch, mock_val, mock_set): """ test challengeset_get - challenge_list returned """ mock_chsearch.return_value = [{'name': 'name1', 'foo': 'bar'}] mock_val.return_value = True @@ -824,7 +861,7 @@ class TestACMEHandler(unittest.TestCase): @patch('acme_srv.challenge.Challenge.new_set') @patch('acme_srv.challenge.Challenge._existing_challenge_validate') @patch('acme_srv.challenge.Challenge._challengelist_search') - def test_089_challengeset_get(self, mock_chsearch, mock_val, mock_set): + def test_092_challengeset_get(self, mock_chsearch, mock_val, mock_set): """ test challengeset_get - challenge_list returned autzstatus pending """ mock_chsearch.return_value = [{'name': 'name1', 'foo': 'bar'}] mock_val.return_value = True diff --git a/test/test_helper.py b/test/test_helper.py index 89c4e59..1c5022f 100644 --- a/test/test_helper.py +++ b/test/test_helper.py @@ -29,7 +29,7 @@ class TestACMEHandler(unittest.TestCase): patch.dict('sys.modules', modules).start() import logging logging.basicConfig(level=logging.CRITICAL) - from acme_srv.helper import b64decode_pad, b64_decode, b64_encode, b64_url_encode, b64_url_recode, ca_handler_get, convert_string_to_byte, convert_byte_to_string, decode_message, decode_deserialize, get_url, generate_random_string, signature_check, validate_email, uts_to_date_utc, date_to_uts_utc, load_config, cert_serial_get, cert_san_get, cert_dates_get, build_pem_file, date_to_datestr, datestr_to_date, dkeys_lower, csr_cn_get, cert_pubkey_get, csr_pubkey_get, url_get, url_get_with_own_dns, dns_server_list_load, csr_san_get, csr_extensions_get, fqdn_resolve, fqdn_in_san_check, sha256_hash, sha256_hash_hex, cert_der2pem, cert_pem2der, cert_extensions_get, csr_dn_get, logger_setup, logger_info, print_debug, jwk_thumbprint_get, allowed_gai_family, patched_create_connection, validate_csr, servercert_get, txt_get + from acme_srv.helper import b64decode_pad, b64_decode, b64_encode, b64_url_encode, b64_url_recode, ca_handler_get, convert_string_to_byte, convert_byte_to_string, decode_message, decode_deserialize, get_url, generate_random_string, signature_check, validate_email, uts_to_date_utc, date_to_uts_utc, load_config, cert_serial_get, cert_san_get, cert_dates_get, build_pem_file, date_to_datestr, datestr_to_date, dkeys_lower, csr_cn_get, cert_pubkey_get, csr_pubkey_get, url_get, url_get_with_own_dns, dns_server_list_load, csr_san_get, csr_extensions_get, fqdn_resolve, fqdn_in_san_check, sha256_hash, sha256_hash_hex, cert_der2pem, cert_pem2der, cert_extensions_get, csr_dn_get, logger_setup, logger_info, print_debug, jwk_thumbprint_get, allowed_gai_family, patched_create_connection, validate_csr, servercert_get, txt_get, proxystring_convert, proxy_check self.logger = logging.getLogger('test_a2c') self.allowed_gai_family = allowed_gai_family self.b64_decode = b64_decode @@ -70,6 +70,7 @@ class TestACMEHandler(unittest.TestCase): self.logger_info = logger_info self.patched_create_connection = patched_create_connection self.print_debug = print_debug + self.proxy_check = proxy_check self.servercert_get = servercert_get self.signature_check = signature_check self.txt_get = txt_get @@ -80,6 +81,7 @@ class TestACMEHandler(unittest.TestCase): self.validate_csr = validate_csr self.sha256_hash = sha256_hash self.sha256_hash_hex = sha256_hash_hex + self.proxystring_convert = proxystring_convert def test_001_helper_b64decode_pad(self): """ test b64decode_pad() method with a regular base64 encoded string """ @@ -1220,11 +1222,13 @@ klGUNHG98CtsmlhrivhSTJWqSIOfyKGF """ patched_create_connection """ self.assertTrue(self.validate_csr(self.logger, 'oder_dic', 'csr')) + @patch('acme_srv.helper.proxystring_convert') @patch('ssl.DER_cert_to_PEM_cert') - @patch('ssl.create_default_context') - @patch('socket.create_connection') - def test_175_servercert_get(self, mock_sock, mock_context, mock_cert): + @patch('ssl.wrap_socket') + @patch('socks.socksocket') + def test_175_servercert_get(self, mock_sock, mock_context, mock_cert, mock_convert): """ test servercert get """ + mock_convert.return_value = ('proxy_proto', 'proxy_addr', 'proxy_port') mock_sock = Mock() mock_context = Mock() mock_cert.return_value = 'foo' @@ -1256,11 +1260,95 @@ klGUNHG98CtsmlhrivhSTJWqSIOfyKGF self.assertFalse(self.txt_get(self.logger, 'foo')) self.assertIn('ERROR:test_a2c:txt_get() error: mock_resolve', lcm.output) - #@patch('configparser.RawConfigParser') - #def test_190_load_config(self, mock_cfg): - # """ test load config """ - # mock_cfg = configparser.ConfigParser() - # self.assertTrue(self.load_config()) + def test_179_proxystring_convert(self): + """ convert proxy_string http """ + self.assertEqual((3, 'proxy', 8080), self.proxystring_convert(self.logger, 'http://proxy:8080')) + + def test_180_proxystring_convert(self): + """ convert proxy_string socks4 """ + self.assertEqual((1, 'proxy', 8080), self.proxystring_convert(self.logger, 'socks4://proxy:8080')) + + def test_181_proxystring_convert(self): + """ convert proxy_string socks5 """ + self.assertEqual((2, 'proxy', 8080), self.proxystring_convert(self.logger, 'socks5://proxy:8080')) + + def test_182_proxystring_convert(self): + """ convert proxy_string unknown protocol """ + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.assertEqual((None, 'proxy', 8080), self.proxystring_convert(self.logger, 'unk://proxy:8080')) + self.assertIn('ERROR:test_a2c:proxystring_convert(): unknown proxy protocol: unk', lcm.output) + + def test_183_proxystring_convert(self): + """ convert proxy_string unknown protocol """ + with self.assertLogs('test_a2c', level='INFO') as lcm: + self.assertEqual((3, 'proxy', None), self.proxystring_convert(self.logger, 'http://proxy:ftp')) + self.assertIn('ERROR:test_a2c:proxystring_convert(): unknown proxy port: ftp', lcm.output) + + def test_184_proxy_check(self): + """ check proxy for empty list """ + fqdn = 'foo.bar.local' + proxy_list = {} + self.assertFalse(self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_182_proxy_check(self): + """ check proxy - no match """ + fqdn = 'foo.bar.local' + proxy_list = {'foo1.bar.local': 'proxy_match'} + self.assertFalse(self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_185_proxy_check(self): + """ check proxy - single entry """ + fqdn = 'foo.bar.local' + proxy_list = {'foo.bar.local': 'proxy_match'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_186_proxy_check(self): + """ check proxy - multiple entry """ + fqdn = 'foo.bar.local' + proxy_list = {'bar.bar.local': 'proxy_nomatch', 'foo.bar.local': 'proxy_match'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_187_proxy_check(self): + """ check proxy - multiple entrie domain match""" + fqdn = 'foo.bar.local' + proxy_list = {'bar.bar.local': 'proxy_nomatch', 'bar.local$': 'proxy_match'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_188_proxy_check(self): + """ check proxy for empty list multiple entrie domain match""" + fqdn = 'foo.bar.local' + proxy_list = {'bar.local$': 'proxy_nomatch', 'foo.bar.local$': 'proxy_match'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_189_proxy_check(self): + """ check proxy - multiple entrie domain match""" + fqdn = 'foo.bar.local' + proxy_list = {'bar.local$': 'proxy_match', 'foo1.bar.local$': 'proxy_nomatch'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_190_proxy_check(self): + """ check proxy - wildcard """ + fqdn = 'foo.bar.local' + proxy_list = {'foo1.bar.local$': 'proxy_nomatch', '*.bar.local$': 'proxy_match'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_191_proxy_check(self): + """ check proxy - wildcard """ + fqdn = 'foo.bar.local' + proxy_list = {'.local$': 'proxy_nomatch', '*.bar.local$': 'proxy_match'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_192_proxy_check(self): + """ check proxy - wildcard """ + fqdn = 'local' + proxy_list = {'local$': 'proxy_match', '*.bar.local$': 'proxy_no_match'} + self.assertEqual('proxy_match', self.proxy_check(self.logger, fqdn, proxy_list)) + + def test_193_proxy_check(self): + """ check proxy - wildcard """ + fqdn = 'foo.bar.local' + proxy_list = {'*': 'wildcard', 'notlocal$': 'proxy_no_match', '*.notbar.local$': 'proxy_no_match'} + self.assertEqual('wildcard', self.proxy_check(self.logger, fqdn, proxy_list)) if __name__ == '__main__': unittest.main()