From bb37cb92235c8b64a8ad2a0ed1c926171061ad07 Mon Sep 17 00:00:00 2001 From: lare Date: Sat, 11 Nov 2023 20:35:10 +0100 Subject: [PATCH] [validate-dns] print the error if an unknown error occurs, so that the script doesn't completely exit upon such errors - add warning after summary if at least one 'error' occured - align line beginnings --- validate-my-dns.py | 92 ++++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 40 deletions(-) diff --git a/validate-my-dns.py b/validate-my-dns.py index 23fba5b31..06f2a8407 100755 --- a/validate-my-dns.py +++ b/validate-my-dns.py @@ -83,13 +83,13 @@ def get_domain_by_mntner(mntner): # handle edge case where if "\t" in nserver[0]: print( - f"WARN: nserver line is not following expected schema. atempting to guess: {line}") + f"WARN: nserver line is not following expected schema. atempting to guess: {line}") errors += 1 nserver = nserver[0].split("\t") # has more than one space between nserver fqdn and its ip if len(nserver) > 2: print( - f"WARN: nserver line is not following expected schema. atempting to guess: {line}") + f"WARN: nserver line is not following expected schema. atempting to guess: {line}") errors += 1 nserver = [nserver[0], line[20:].replace(nserver[0], "").strip()] # ignore registry-sync nservers @@ -161,7 +161,7 @@ def get_domain_by_mntner(mntner): elif int(line.split("/")[1]) <= 24: # TODO: implement creation of multiple zones for every /24 within print( - f"WARN: currently only ipv4 subnets with length >=24 or 16 or 8 are possible to be checked: relavent inetnum {line}") + f"WARN: currently only ipv4 subnets with length >=24 or 16 or 8 are possible to be checked: relavent inetnum {line}") break elif line.startswith("nserver"): _tmp = _parse_nserver(line) @@ -233,7 +233,7 @@ def get_domain_by_mntner(mntner): for nserver in domains[domain]["nserver"]: # if the nserver isn't specified: ... if domains[domain]["nserver"][nserver] == None: - # print(f"INFO: the nserver {nserver} isn't specified in {domain}, looking into the parent domain of it") + # print(f"INFO: the nserver {nserver} isn't specified in {domain}, looking into the parent domain of it") for i in range(len(nserver.split(".")), 1, -1): # check if the nserver is already in loaded database, starts with more specific if ".".join(nserver.split(".")[-i:]) in domains: @@ -243,12 +243,25 @@ def get_domain_by_mntner(mntner): except KeyError: # reaches here if the domain for the nserver specified in the inet{6}num/domain is found, but the nserver itself not. print( - f"Warn: the nserver {nserver} specified in {domain} wasn't found") + f"Warn: the nserver {nserver} specified in {domain} wasn't found") break return domains +# if an unknown error occurs: notify the user +def _handle_unknown_error(e: Exception, nserver: str, domain_name: str): + global errors + print(f"-----------------") + print(f"Error: unknown/unexpected error occured while querying {nserver} for {domain_name}") + print(f"Error: '{e.__class__.__module__}.{e.__class__.__name__}': '{str(e)}'") + print(f"Note: please check your nameserver, your network and any related configuration") + print(f"-----------------") + errors += 1 + time.sleep(3) + return False + + def get_soa(domain_name, nserver): """query dns server for SOA""" global errors, summary @@ -259,7 +272,7 @@ def get_soa(domain_name, nserver): response = dns.query.udp_with_fallback( request, nserver, timeout=TIMEOUT) except dns.exception.Timeout: - print(f"WARN: querying {nserver} for SOA on {domain_name} timed out") + print(f"WARN: querying {nserver} for SOA on {domain_name} timed out") summary[domain_name][SUMMARY.TIMEOUT] += 1 errors += 1 return False @@ -270,16 +283,12 @@ def get_soa(domain_name, nserver): summary[domain_name][SUMMARY.SERVFAIL] += 1 return False except Exception as e: - print( - f"ERROR: unknown error occured while querying {nserver} for {domain_name}: '{e}'") - errors += 1 - time.sleep(3) - return False + return _handle_unknown_error(e) # raise e if response[0].rcode() != 0: # HANDLE QUERY FAILED (SERVER ERROR OR NO SOA RECORD) print( - f"WARN: query for a SOA on {domain_name} failed on {nserver}, returncode: {dns.rcode.to_text(response[0].rcode())}") + f"WARN: query for a SOA on {domain_name} failed on {nserver}, returncode: {dns.rcode.to_text(response[0].rcode())}") errors += 1 if dns.rcode.to_text(response[0].rcode()) == "REFUSED": summary[domain_name][SUMMARY.REFUSED] += 1 @@ -321,7 +330,7 @@ def get_ns(domain_name, nserver): response = dns.query.udp_with_fallback( request, nserver, timeout=TIMEOUT) except dns.exception.Timeout: - print(f"WARN: querying {nserver} for NS on {domain_name} timed out") + print(f"WARN: querying {nserver} for NS on {domain_name} timed out") summary[domain_name][SUMMARY.TIMEOUT] += 1 errors += 1 return False @@ -330,10 +339,12 @@ def get_ns(domain_name, nserver): f"ERROR: server replied with different different ip than requested: error: {e}") errors += 1 return False + except Exception as e: + return _handle_unknown_error(e, nserver=nserver, domain_name=domain_name) if response[0].rcode() != 0: # HANDLE QUERY FAILED (SERVER ERROR OR NO NS RECORD) print( - f"WARN: query for a NS on {domain_name} failed on {nserver}, returncode: {dns.rcode.to_text(response[0].rcode())}") + f"WARN: query for a NS on {domain_name} failed on {nserver}, returncode: {dns.rcode.to_text(response[0].rcode())}") errors += 1 if dns.rcode.to_text(response[0].rcode()) == "REFUSED": summary[domain_name][SUMMARY.REFUSED] += 1 @@ -365,7 +376,7 @@ def get_dnskey(domain_name, nserver): """query dns server for DNSKEY""" global errors, summary if nserver == None: - print("WARN: nserver specified was 'None'") + print("WARN: nserver specified was 'None'") errors += 1 return False try: @@ -375,7 +386,7 @@ def get_dnskey(domain_name, nserver): request, nserver, timeout=TIMEOUT) except dns.exception.Timeout: print( - f"WARN: querying {nserver} for DNSKEY on {domain_name} timed out") + f"WARN: querying {nserver} for DNSKEY on {domain_name} timed out") summary[domain_name][SUMMARY.TIMEOUT] += 1 errors += 1 return False @@ -386,15 +397,17 @@ def get_dnskey(domain_name, nserver): return False except ConnectionRefusedError: print( - f"WARN: {nserver} refused the connection") + f"WARN: {nserver} refused the connection") summary[domain_name][SUMMARY.REFUSED] += 1 errors += 1 return False + except Exception as e: + return _handle_unknown_error(e, nserver=nserver, domain_name=domain_name ) if response[0].rcode() != 0: # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) print( - f"WARN: query for a DNSKEY on {domain_name} failed on {nserver}, returncode: {dns.rcode.to_text(response[0].rcode())}") + f"WARN: query for a DNSKEY on {domain_name} failed on {nserver}, returncode: {dns.rcode.to_text(response[0].rcode())}") errors += 1 if dns.rcode.to_text(response[0].rcode()) == "REFUSED": summary[domain_name][SUMMARY.REFUSED] += 1 @@ -491,7 +504,7 @@ def check_dnssec(domain_name, domain_data): no_ds_rdatas = domain_data["ds-rdata"] == [] # if no_ds_rdatas: # print( - # f"NOTE: {domain_name} doesn't have ds-rdata configured, not checking it") + # f"NOTE: {domain_name} doesn't have ds-rdata configured, not checking it") # return True for nserver in domain_data["nserver"]: @@ -499,7 +512,7 @@ def check_dnssec(domain_name, domain_data): # if the nserver is not set (i.e. not loaded from other dns file or "wrong" fqdn) if domain_data["nserver"][nserver] == None: print( - f"INFO: ip address(es) for nserver '{nserver}' in '{domain_name}' isn't specified/loaded") + f"INFO: ip address(es) for nserver '{nserver}' in '{domain_name}' isn't specified/loaded") continue for nsaddr in domain_data["nserver"][nserver]: @@ -512,13 +525,16 @@ def check_dnssec(domain_name, domain_data): # if it timed out: tell the user except dns.exception.Timeout: print( - f"WARN: querying {nserver} ({nsaddr}) for {domain_name} timed out") + f"WARN: querying {nserver} ({nsaddr}) for {domain_name} timed out") summary[domain_name][SUMMARY.TIMEOUT] += 1 continue + except Exception as e: + _handle_unknown_error(e, nserver=f"{nserver} ({nsaddr})", domain_name=domain_name) + continue if no_ds_rdatas: print( - f"INFO: query for {domain_name} SOA on {nserver} ({nsaddr}) succeded, not checking DNSSEC") + f"INFO: query for {domain_name} SOA on {nserver} ({nsaddr}) succeded, not checking DNSSEC") continue # get DNSKEY for zone request = dns.message.make_query( @@ -529,7 +545,7 @@ def check_dnssec(domain_name, domain_data): if response[0].rcode() != 0: # HANDLE QUERY FAILED (SERVER ERROR OR NO DNSKEY RECORD) print( - f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response[0].rcode()}") + f"WARN: query for a DNSKEY on {domain_name} failed on {nserver} ({nsaddr}), returncode: {response[0].rcode()}") errors += 1 continue # answer should contain two RRSET: DNSKEY and RRSIG(DNSKEY) @@ -556,7 +572,7 @@ def check_dnssec(domain_name, domain_data): except dns.dnssec.ValidationFailure: # BE SUSPICIOUS print( - f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}") + f"WARN: DNSSEC validation failed on {domain_name} failed on {nserver} ({nsaddr}), answer: {answer}") summary[domain_name][SUMMARY.DNSSEC_FAIL] += 1 errors += 1 except AttributeError as e: @@ -565,7 +581,7 @@ def check_dnssec(domain_name, domain_data): else: # WE'RE GOOD, THERE'S A VALID DNSSEC SELF-SIGNED KEY FOR example.com print( - f"INFO: DNSSEC validation succeded on {domain_name} failed on {nserver} ({nsaddr})") + f"INFO: DNSSEC validation succeded on {domain_name} failed on {nserver} ({nsaddr})") summary[domain_name][SUMMARY.SUCCESS] += 1 success = True @@ -581,20 +597,13 @@ def main(mntner): # get all domains/inet(6)nums of the mntner domains = get_domain_by_mntner(mntner=mntner) - # global _tmp_continue, _tmp_found - # _tmp_found = False - # _tmp_continue = "10.in-addr.arpa" def check_dns(domain_name): global errors, summary - # global _tmp_found, _tmp_continue - # if domain_name == _tmp_continue: - # _tmp_found = True - # if not _tmp_found: - # return + summary[domain_name] = [0, 0, 0, 0, 0, 0, 0, 0] # check if the domain doesn't have DS data if domains[domain_name]["ds-rdata"] == []: - print(f"NOTE: {domain_name} doesn't have any ds-rdata specified") + print(f"NOTE: {domain_name} doesn't have any ds-rdata specified") # continue for nserver in domains[domain_name]["nserver"]: @@ -638,7 +647,7 @@ def main(mntner): break else: print( - f"WARN: master nserver '{master_ns}' returned by {ip}({nserver}) not in the list of the specified nservers of {domain_name}") + f"WARN: master nserver '{master_ns}' returned by {ip}({nserver}) not in the list of the specified nservers of {domain_name}") summary[domain_name][SUMMARY.WRONG_SOA] += 1 errors += 1 @@ -649,7 +658,7 @@ def main(mntner): # print(f"DEBUG: response {_ns}") if not f"{nserver}." in _ns: print( - f"WARN: returned nservers returned by {ip}({nserver}) for {domain_name} does not include it self") + f"WARN: returned nservers returned by {ip}({nserver}) for {domain_name} does not include it self") summary[domain_name][SUMMARY.WRONG_NS] += 1 errors += 1 @@ -661,13 +670,13 @@ def main(mntner): break else: print( - f"INFO: {_nserver} was not listed in the NS records by {ip}({nserver}) for {domain_name}") + f"INFO: {_nserver} was not listed in the NS records by {ip}({nserver}) for {domain_name}") summary[domain_name][SUMMARY.WRONG_NS] += 1 # check if there are any left if len(_ns) > 0: for _server in _ns: print( - f"INFO: {_server} in response for NS records by {ip}({nserver}) for {domain_name} but not in the dns/inet(6)num file") + f"INFO: {_server} in response for NS records by {ip}({nserver}) for {domain_name} but not in the dns/inet(6)num file") summary[domain_name][SUMMARY.WRONG_NS] += 1 # don't check dnssec if not configured @@ -700,7 +709,7 @@ def main(mntner): # print(f"DEBUG: generated: {ds_candidates}") if found: print( - f"INFO: correct ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}") + f"INFO: correct ds-rdata specified and matching DNSKEY returned by {ip} for {domain_name}") summary[domain_name][SUMMARY.SUCCESS] += 1 else: print( @@ -736,12 +745,15 @@ def main(mntner): if len(_domain) > _max_domain_length: _max_domain_length = len(_domain) - print("\n\nSummary:\n") + print("```\n\nSummary:\n") print(f"{'domain name'.ljust(_max_domain_length)} | success | dnssec fail | wrong NS | wrong SOA | NXDOMAIN | REFUSED | SERVFAIL | timeout") print(f"-{'-'.rjust(_max_domain_length, '-') }-|---------|-------------|----------|-----------|----------|---------| -------- | -------") for domain in summary: print(f" {domain.rjust(1).ljust(_max_domain_length)} | {str(summary[domain][SUMMARY.SUCCESS]).rjust(7)} | {str(summary[domain][SUMMARY.DNSSEC_FAIL]).rjust(11)} | {str(summary[domain][SUMMARY.WRONG_NS]).rjust(8)} | {str(summary[domain][SUMMARY.WRONG_SOA]).rjust(9)} | {str(summary[domain][SUMMARY.NXDOMAIN]).rjust(8)} | {str(summary[domain][SUMMARY.REFUSED]).rjust(7)} | {str(summary[domain][SUMMARY.SERVFAIL]).rjust(8)} | {str(summary[domain][SUMMARY.TIMEOUT]).rjust(7)}") + print("```\n") + if errors > 0: + print("WARN: at least one 'error' occured while checking. check the table and output above") # print(summary)