diff --git a/utils/schema-check/dn42-schema.py b/utils/schema-check/dn42-schema.py index 1fe098294..acf9cf3b7 100755 --- a/utils/schema-check/dn42-schema.py +++ b/utils/schema-check/dn42-schema.py @@ -1,21 +1,25 @@ #!/usr/bin/env python3 +"DN42 Schema Checker" from __future__ import print_function import re import os import sys +import time import argparse -import log import glob -import random -from pprint import pprint +import urllib.parse +import http.client +import json +import log SCHEMA_NAMESPACE = "dn42." class SchemaDOM: + "schema" def __init__(self, fn): self.name = None self.ref = None @@ -27,30 +31,30 @@ class SchemaDOM: def __parse_schema(self, f): schema = {} - for k, v, l in f.dom: - if k == 'ref': - self.ref = v - elif k == 'schema': - self.name = v + for key, val, _ in f.dom: + if key == "ref": + self.ref = val + elif key == "schema": + self.name = val - if k != 'key': + if key != "key": continue - v = v.split() - key = v.pop(0) + val = val.split() + key = val.pop(0) schema[key] = set() - for i in v: + for i in val: if i == ">": break schema[key].add(i) for k, v in schema.items(): - if 'schema' in v: + if "schema" in v: self.type = k - if 'primary' in v: + if "primary" in v: self.primary = k schema[k].add("oneline") if "multiline" in v: @@ -66,51 +70,63 @@ class SchemaDOM: if "deprecate" in v: schema[k].remove("deprecate") - if 'oneline' not in v: + if "oneline" not in v: schema[k].add("multiline") - if 'single' not in v: + if "single" not in v: schema[k].add("multiple") return schema def check_file(self, f, lookups=None): + "check file" status = "PASS" if not f.valid: - log.error("%s Line 0: File does not parse" % (f.src)) - status = "FAIL" + log.error("%s Line 0: File does not parse" % (f.src)) + status = "FAIL" for k, v in self.schema.items(): - if 'required' in v and k not in f.keys: - log.error( - "%s Line 0: Key [%s] not found and is required." % (f.src, k)) + if "required" in v and k not in f.keys: + log.error("%s Line 0: Key [%s] not found and is required." % (f.src, k)) status = "FAIL" - elif 'recommend' in v and k not in f.keys: + elif "recommend" in v and k not in f.keys: log.notice( - "%s Line 0: Key [%s] not found and is recommended." % (f.src, k)) + "%s Line 0: Key [%s] not found and is recommended." % (f.src, k) + ) status = "NOTE" - if 'schema' in v and SCHEMA_NAMESPACE + f.dom[0][0] != self.ref: + if "schema" in v and SCHEMA_NAMESPACE + f.dom[0][0] != self.ref: log.error( - "%s Line 1: Key [%s] not found and is required as the first line." % (f.src, k)) + "%s Line 1: Key [%s] not found and is required as the first line." + % (f.src, k) + ) status = "FAIL" - if 'single' in v and k in f.keys and len(f.keys[k]) > 1: - log.warning("%s Line %d: Key [%s] first defined here and has repeated keys." % ( - f.src, f.keys[k][0], k)) + if "single" in v and k in f.keys and len(f.keys[k]) > 1: + log.warning( + "%s Line %d: Key [%s] first defined here and has repeated keys." + % (f.src, f.keys[k][0], k) + ) for l in f.keys[k][1:]: log.error( - "%s Line %d: Key [%s] can only appear once." % (f.src, l, k)) + "%s Line %d: Key [%s] can only appear once." % (f.src, l, k) + ) status = "FAIL" - if 'oneline' in v and k in f.multi: + if "oneline" in v and k in f.multi: for l in f.keys[k]: log.error( - "%s Line %d: Key [%s] can not have multiple lines." % (f.src, l, k)) + "%s Line %d: Key [%s] can not have multiple lines." + % (f.src, l, k) + ) status = "FAIL" for k, v, l in f.dom: - if k == self.primary and not f.src.endswith(v.replace("/","_").replace(" ","")): - log.error("%s Line %d: Primary [%s: %s] does not match filename." % (f.src, l, k, v)) + if k == self.primary and not f.src.endswith( + v.replace("/", "_").replace(" ", "")): + log.error( + "%s Line %d: Primary [%s: %s] does not match filename." + % (f.src, l, k, v) + ) status = "FAIL" if k.startswith("x-"): @@ -121,9 +137,11 @@ class SchemaDOM: status = "FAIL" continue else: - if 'deprecate' in self.schema[k]: + if "deprecate" in self.schema[k]: log.info( - "%s Line %d: Key [%s] was found and is deprecated." % (f.src, l, k)) + "%s Line %d: Key [%s] was found and is deprecated." + % (f.src, l, k) + ) status = "INFO" if lookups is not None: @@ -136,20 +154,22 @@ class SchemaDOM: if (ref, val) in lookups: found = True if not found: - log.error("%s Line %d: Key %s references object %s in %s but does not exist." % ( - f.src, l, k, val, refs)) + log.error( + "%s Line %d: Key %s references object %s in %s but does not exist." + % (f.src, l, k, val, refs) + ) status = "FAIL" if status != "FAIL": ck = sanity_check(f) if ck == "FAIL": status = ck - print("CHECK\t%-54s\t%s\tMNTNERS: %s" %(f.src, status, ','.join(f.mntner))) + print("CHECK\t%-54s\t%s\tMNTNERS: %s" % (f.src, status, ",".join(f.mntner))) return status class FileDOM: - + "file" def __init__(self, fn): self.valid = True self.dom = [] @@ -159,7 +179,7 @@ class FileDOM: self.schema = None self.src = fn - with open(fn, mode='r', encoding='utf-8') as f: + with open(fn, mode="r", encoding="utf-8") as f: dom = [] keys = {} multi = {} @@ -167,9 +187,9 @@ class FileDOM: last_multi = None for lineno, i in enumerate(f.readlines(), 1): - if re.match(r'[ \t]', i): + if re.match(r"[ \t]", i): if len(dom) == 0: - log.error("File %s does not parse properly" % (fn) ) + log.error("File %s does not parse properly" % (fn)) self.valid = False return @@ -187,7 +207,7 @@ class FileDOM: if len(i) < 2: continue - dom.append([i[0].strip(), ':'.join(i[1:]).strip(), lineno - 1]) + dom.append([i[0].strip(), ":".join(i[1:]).strip(), lineno - 1]) if i[0].strip() not in keys: keys[i[0].strip()] = [] @@ -196,7 +216,7 @@ class FileDOM: last_multi = None - if dom[-1][0] == 'mnt-by': + if dom[-1][0] == "mnt-by": mntner.append(dom[-1][1]) self.dom = dom @@ -216,11 +236,12 @@ class FileDOM: s += i[0] + ":" + " " * (length - len(i[0])) + l[0] + "\n" for m in l[1:]: - s += " " * (length + 1) + m + "\n" + s += " " * (length + 1) + m + "\n" return s def get(self, key, index=0, default=None): + "get value" if key not in self.keys: return default if index >= len(self.keys[key]) or index <= -len(self.keys[key]): @@ -230,7 +251,7 @@ class FileDOM: def main(infile, schema): - + "main command" log.debug("Check File: %s" % (infile)) f = FileDOM(infile) @@ -248,54 +269,67 @@ def main(infile, schema): s = SchemaDOM(f.schema) return s.check_file(f) + def check_schemas(path): + "check schemas" schemas = {} - for fn in glob.glob(path+"/*"): + for fn in glob.glob(path + "/*"): s = SchemaDOM(fn) log.info("read schema: %s" % (s.name)) schemas[s.ref] = s ok = True c = schemas[SCHEMA_NAMESPACE + "schema"] - for s in schemas.keys(): + for s in schemas: ck = c.check_file(s) if not ck: ok = False return ok + def scan_index(infile, mntner=None): + "scan index" idx = {} schemas = {} - with open(infile, 'r') as f: + with open(infile, "r") as f: for line in f.readlines(): line = line.split() idx[(line[0], line[1])] = line[2:] - if line[0] == SCHEMA_NAMESPACE + 'schema': + if line[0] == SCHEMA_NAMESPACE + "schema": s = SchemaDOM(line[2]) log.info("read schema: %s" % (s.name)) schemas[s.ref] = s return __scan_index(idx, schemas, mntner) + def scan_files(path, mntner=None, use_file=None): + "scan files" arr = __index_files(path, use_file) idx = {} schemas = {} for dom in arr: - line = (dom.schema, dom.src.split("/")[-1].replace("_", "/"), dom.src, ",".join(dom.mntner), dom) + line = ( + dom.schema, + dom.src.split("/")[-1].replace("_", "/"), + dom.src, + ",".join(dom.mntner), + dom, + ) idx[(line[0], line[1])] = line[2:] - if line[0] == SCHEMA_NAMESPACE + 'schema': + if line[0] == SCHEMA_NAMESPACE + "schema": s = SchemaDOM(line[2]) schemas[s.ref] = s return __scan_index(idx, schemas, mntner, use_file) -def __scan_index(idx, schemas, mntner, use_file): + +def __scan_index(idx, schemas, mntner, use_file=None): ok = True for k, v in idx.items(): if use_file is not None and use_file != v[0]: @@ -304,7 +338,7 @@ def __scan_index(idx, schemas, mntner, use_file): s = schemas.get(k[0], None) if s is None: log.error("No schema found for %s" % (k[1])) - print("CHECK\t%-54s\tFAIL\tMNTNERS: UNKNOWN" %(v[2].src)) + print("CHECK\t%-54s\tFAIL\tMNTNERS: UNKNOWN" % (v[2].src)) ok = "FAIL" else: @@ -315,7 +349,6 @@ def __scan_index(idx, schemas, mntner, use_file): if mntner is not None and mntner not in mlist: continue - c = v[2] ck = s.check_file(c, idx.keys()) @@ -325,34 +358,35 @@ def __scan_index(idx, schemas, mntner, use_file): ok = ck return ok -def __index_files(path, use_file): + +def __index_files(path, use_file=None): xlat = { - "dns/": SCHEMA_NAMESPACE + "domain", - "inetnum/": SCHEMA_NAMESPACE + "inetnum", - "inet6num/": SCHEMA_NAMESPACE + "inet6num", - "route/": SCHEMA_NAMESPACE + "route", - "route6/": SCHEMA_NAMESPACE + "route6", - "aut-num/": SCHEMA_NAMESPACE + "aut-num", - "as-set/": SCHEMA_NAMESPACE + "as-set", - "as-block/": SCHEMA_NAMESPACE + "as-block", + "dns/": SCHEMA_NAMESPACE + "domain", + "inetnum/": SCHEMA_NAMESPACE + "inetnum", + "inet6num/": SCHEMA_NAMESPACE + "inet6num", + "route/": SCHEMA_NAMESPACE + "route", + "route6/": SCHEMA_NAMESPACE + "route6", + "aut-num/": SCHEMA_NAMESPACE + "aut-num", + "as-set/": SCHEMA_NAMESPACE + "as-set", + "as-block/": SCHEMA_NAMESPACE + "as-block", "organisation/": SCHEMA_NAMESPACE + "organisation", - "mntner/": SCHEMA_NAMESPACE + "mntner", - "person/": SCHEMA_NAMESPACE + "person", - "role/": SCHEMA_NAMESPACE + "role", - "tinc-key/": SCHEMA_NAMESPACE + "tinc-key", - "tinc-keyset/": SCHEMA_NAMESPACE + "tinc-keyset", - "registry/": SCHEMA_NAMESPACE + "registry", - "schema/": SCHEMA_NAMESPACE + "schema", + "mntner/": SCHEMA_NAMESPACE + "mntner", + "person/": SCHEMA_NAMESPACE + "person", + "role/": SCHEMA_NAMESPACE + "role", + "tinc-key/": SCHEMA_NAMESPACE + "tinc-key", + "tinc-keyset/": SCHEMA_NAMESPACE + "tinc-keyset", + "registry/": SCHEMA_NAMESPACE + "registry", + "schema/": SCHEMA_NAMESPACE + "schema", } - for root, dirs, files in os.walk(path): + for root, _, files in os.walk(path): ignore = True - for t in xlat.keys(): - if root+"/" == os.path.join(path, t): - ignore = False - break + for t in xlat: + if root + "/" == os.path.join(path, t): + ignore = False + break if ignore: - continue + continue for f in files: if f[0] == ".": @@ -361,35 +395,35 @@ def __index_files(path, use_file): yield dom if use_file is not None: - dom = FileDOM(use_file) - yield dom + dom = FileDOM(use_file) + yield dom + def index_files(path): + "index files" idx = __index_files(path) for i in idx: print("%s\t%s\t%s\t%s" % i) -def http_get(server, url, query=None, headers=None): - import urllib.parse - import http.client - import json +def http_get(server, url, query=None, headers=None): + "http get" if headers is None: headers = {} - if 'User-Agent' not in headers: - headers['User-Agent'] = "curl" - if 'Accept' not in headers: - headers['Accept'] = 'application/json' + if "User-Agent" not in headers: + headers["User-Agent"] = "curl" + if "Accept" not in headers: + headers["Accept"] = "application/json" if query is None: query = {} http_client = http.client.HTTPSConnection(server) - full_url = url + '?' + urllib.parse.urlencode(query) + full_url = url + "?" + urllib.parse.urlencode(query) log.debug("GET " + full_url) - http_client.request('GET', full_url, headers=headers) + http_client.request("GET", full_url, headers=headers) req = http_client.getresponse() log.debug("HTTP Response: %d %s" % (req.status, req.reason)) @@ -398,7 +432,7 @@ def http_get(server, url, query=None, headers=None): return {} r = req.read() if not isinstance(r, str): - r = r.decode("utf-8") + r = r.decode("utf-8") return json.loads(r) if req.status > 299: @@ -406,61 +440,73 @@ def http_get(server, url, query=None, headers=None): return req.read() -def find(fields=None, filter=None): + +def find(fields=None, filters=None): + "find" server = "registry.dn42.us" - url = "/v1/reg/reg.objects" + url = "/v1/reg/reg.objects" if fields is None: fields = [] - if filter is None: - filter = {} - query = {"fields": ",".join(fields), "filter": ",".join([k + "=" + v for k, v in filter.items()])} + if filters is None: + filters = {} + query = { + "fields": ",".join(fields), + "filter": ",".join([k + "=" + v for k, v in filters.items()]), + } return http_get(server, url, query) + def to_num(ip): - ip = [int(i) for i in ip.split('.')] + "ip to number" + ip = [int(i) for i in ip.split(".")] return ip[3] + ip[2] * 256 + ip[1] * 256 ** 2 + ip[0] * 256 ** 3 + def to_ip(num): - return '.'.join([str(i) for i in [num >> 24, (num >> 16) & 0xFF, (num >> 8) & 0xFF, num & 0xFF]]) + "number to ip" + return ".".join( + [str(i) for i in [num >> 24, (num >> 16) & 0xFF, (num >> 8) & 0xFF, num & 0xFF]] + ) + def pretty_ip(addr): + "pretty ip" if addr.startswith("00000000000000000000ffff"): addr = addr[-8:] addr = int(addr, 16) return to_ip(addr) - return ":".join([addr[i:i+4] for i in range(0, len(addr), 4)]) + return ":".join([addr[i:i + 4] for i in range(0, len(addr), 4)]) -# addr = ["%x" % (int(addr[i:i+4],16)) for i in range(0, len(addr), 4)] -# -# last_seg = 8 -# for i, seg in enumerate(addr[::-1]): -# if seg == "0": last_seg = 7 - i -# else: break -# -# return ":".join(addr[:last_seg]) + ("::" if last_seg < 8 else "") def expand_ipv6(addr): + "expand ip6" addr = addr.lower() if "::" in addr: - if addr.count('::') > 1: + if addr.count("::") > 1: return False - addr = addr.replace('::', ':' * (9 - addr.count(':'))) - if addr.count(':') != 7: + addr = addr.replace("::", ":" * (9 - addr.count(":"))) + if addr.count(":") != 7: return False - return ''.join((i.zfill(4) for i in addr.split(":"))) + return "".join((i.zfill(4) for i in addr.split(":"))) + def ip4_to_ip6(ip): - return "::ffff:%04x:%04x" % (ip >> 16, ip & 0xffff) + "ip4 to ip6" + return "::ffff:%04x:%04x" % (ip >> 16, ip & 0xFFFF) + def inetrange(inet): - ip, mask = inet.split('/') + "inet range" + ip, mask = inet.split("/") mask = int(mask) ip = to_num(ip) & (0xFFFFFFFF << 32 - mask) ip6 = ip4_to_ip6(ip) return inet6range("%s/%d" % (ip6, mask + 96)) + def inet6range(inet): - ip, mask = inet.split('/') + "inet6 range" + ip, mask = inet.split("/") mask = int(mask) log.debug(ip) @@ -470,81 +516,95 @@ def inet6range(inet): return ip, ip, mask offset = int(ip[mask // 4], 16) - return "%s%x%s" % ( - ip[:mask // 4], - offset & (0xf0 >> mask % 4), - "0" * (31 - mask // 4) - ), "%s%x%s" % ( - ip[:mask // 4], - offset | (0xf >> mask % 4), - "f" * (31 - mask // 4) - ), mask + return ( + "%s%x%s" + % (ip[: mask // 4], offset & (0xF0 >> mask % 4), "0" * (31 - mask // 4)), + "%s%x%s" + % (ip[: mask // 4], offset | (0xF >> mask % 4), "f" * (31 - mask // 4)), + mask, + ) + def test_policy(obj_type, name, mntner): + "test policy" log.debug([obj_type, name, mntner]) - if obj_type in ["organisation", "mntner", "person", "role", "as-set", "schema", "dns"]: + if obj_type in ["organisation", + "mntner", + "person", + "role", + "as-set", + "schema", + "dns", + ]: if obj_type == "organisation" and not name.startswith("ORG-"): - log.error("%s does not start with 'ORG-'" %(name)) + log.error("%s does not start with 'ORG-'" % (name)) return "FAIL" elif obj_type == "mntner" and not name.endswith("-MNT"): - log.error("%s does not end with '-MNT'" %(name)) + log.error("%s does not end with '-MNT'" % (name)) return "FAIL" elif obj_type == "dns" and not name.endswith(".dn42"): - log.error("%s does not end with '.dn42'" %(name)) + log.error("%s does not end with '.dn42'" % (name)) return "FAIL" elif obj_type == "dns" and len(name.strip(".").split(".")) != 2: - log.error("%s is not a second level domain" %(name)) + log.error("%s is not a second level domain" % (name)) return "FAIL" elif obj_type in ["person", "role"] and not name.endswith("-DN42"): - log.error("%s does not end with '-DN42'" %(name)) + log.error("%s does not end with '-DN42'" % (name)) return "FAIL" lis = find(["mnt-by"], {"@type": obj_type, "@name": name}) log.debug(lis) if len(lis) == 0: - log.notice("%s does not currently exist" %(name)) + log.notice("%s does not currently exist" % (name)) return "PASS" - status = 'FAIL' + status = "FAIL" for o in lis: for n in o: log.debug(n) log.debug(mntner) if n[0] == "mnt-by" and n[1] == mntner: - status = 'PASS' + status = "PASS" return status - log.error("%s does not have mnt for object" %(mntner)) + log.error("%s does not have mnt for object" % (mntner)) return status - elif args["type"] in ["inetnum","inet6num"]: + elif obj_type in ["inetnum", "inet6num"]: log.info("Checking inetnum type") lis = find(["mnt-by"], {"@type": "net", "cidr": name}) log.debug(lis) if len(lis) > 0: - status = 'FAIL' + status = "FAIL" for o in lis: for n in o: if n[0] == "mnt-by" and n[1] == mntner: - status = 'PASS' - log.notice("%s has mnt for current object" %(mntner)) + status = "PASS" + log.notice("%s has mnt for current object" % (mntner)) return status - log.error("%s does not have mnt for current object" %(mntner)) + log.error("%s does not have mnt for current object" % (mntner)) return status - if args["type"] == "inetnum": + if obj_type == "inetnum": Lnet, Hnet, mask = inetrange(name) else: Lnet, Hnet, mask = inet6range(name) - mask = "%03d" %(mask) + mask = "%03d" % (mask) log.info([Lnet, Hnet, mask]) - lis = find(["inetnum","inet6num","policy","@netlevel","mnt-by","mnt-lower"], - {"@type": "net", "@netmin": "le=" + Lnet, "@netmax": "ge=" + Hnet, "@netmask": "lt=" + mask}) + lis = find( + ["inetnum", "inet6num", "policy", "@netlevel", "mnt-by", "mnt-lower"], + { + "@type": "net", + "@netmin": "le=" + Lnet, + "@netmax": "ge=" + Hnet, + "@netmask": "lt=" + mask, + }, + ) log.debug(lis) policy = {} @@ -563,46 +623,53 @@ def test_policy(obj_type, name, mntner): if select is None: select = k - elif select<=k: + elif select <= k: select = k - if select == None: + if select is None: pass - elif policy.get(select,{}).get("policy","closed") == "open": + elif policy.get(select, {}).get("policy", "closed") == "open": log.notice("Policy is open for parent object") return "PASS" # 3. Check if mntner or mnt-lower for any as-block in the tree. elif mntner in mntners: - log.notice("%s has mnt in parent object" %(mntner)) + log.notice("%s has mnt in parent object" % (mntner)) return "PASS" - elif args["type"] in ["route","route6"]: + elif obj_type in ["route", "route6"]: log.info("Checking route type") - lis = find(["mnt-by"], {"@type": "route", args["type"]: name}) + lis = find(["mnt-by"], {"@type": "route", obj_type: name}) log.debug(lis) if len(lis) > 0: - status = 'FAIL' + status = "FAIL" for o in lis: for n in o: if n[0] == "mnt-by" and n[1] == mntner: - status = 'PASS' - log.notice("%s has mnt for current object" %(mntner)) + status = "PASS" + log.notice("%s has mnt for current object" % (mntner)) return status - log.error("%s does not have mnt for current object" %(mntner)) + log.error("%s does not have mnt for current object" % (mntner)) return status - if args["type"] == "route": + if obj_type == "route": Lnet, Hnet, mask = inetrange(name) else: Lnet, Hnet, mask = inet6range(name) - mask = "%03d" %(mask) + mask = "%03d" % (mask) log.info([Lnet, Hnet, mask]) - lis = find(["inetnum","inet6num","policy","@netlevel","mnt-by","mnt-lower"], - {"@type": "net", "@netmin": "le=" + Lnet, "@netmax": "ge=" + Hnet, "@netmask": "le=" + mask}) + lis = find( + ["inetnum", "inet6num", "policy", "@netlevel", "mnt-by", "mnt-lower"], + { + "@type": "net", + "@netmin": "le=" + Lnet, + "@netmax": "ge=" + Hnet, + "@netmask": "le=" + mask, + }, + ) log.debug(lis) policy = {} @@ -621,24 +688,24 @@ def test_policy(obj_type, name, mntner): if select is None: select = k - elif select<=k: + elif select <= k: select = k - if select == None: + if select is None: pass - elif policy.get(select,{}).get("policy","closed") == "open": + elif policy.get(select, {}).get("policy", "closed") == "open": log.notice("Policy is open for parent object") return "PASS" # 3. Check if mntner or mnt-lower for any as-block in the tree. elif mntner in mntners: - log.notice("%s has mnt in parent object" %(mntner)) + log.notice("%s has mnt in parent object" % (mntner)) return "PASS" - elif args["type"] == "aut-num": + elif obj_type == "aut-num": if not name.startswith("AS"): - log.error("%s does not start with AS" %(name)) + log.error("%s does not start with AS" % (name)) return "FAIL" # 1. Check if they already have an object @@ -646,20 +713,22 @@ def test_policy(obj_type, name, mntner): log.debug(lis) if len(lis) > 0: - status = 'FAIL' + status = "FAIL" for o in lis: for n in o: if n[0] == "mnt-by" and n[1] == mntner: - status = 'PASS' - log.notice("%s has mnt for current object" %(mntner)) + status = "PASS" + log.notice("%s has mnt for current object" % (mntner)) return status - log.error("%s does not have mnt for current object" %(mntner)) + log.error("%s does not have mnt for current object" % (mntner)) return status # 2. Check if the as-block has an open policy asn = "AS{:0>9}".format(name[2:]) - lis = find(["as-block","policy","@as-min","@as-max","mnt-by","mnt-lower"], - {"@type": "as-block","@as-min":"le=" + asn,"@as-max": "ge=" + asn}) + lis = find( + ["as-block", "policy", "@as-min", "@as-max", "mnt-by", "mnt-lower"], + {"@type": "as-block", "@as-min": "le=" + asn, "@as-max": "ge=" + asn}, + ) log.info(lis) policy = {} @@ -673,29 +742,29 @@ def test_policy(obj_type, name, mntner): if o[0].startswith("mnt-"): mntners.append(o[1]) - k = (obj["@as-min"],obj["@as-max"]) + k = (obj["@as-min"], obj["@as-max"]) policy[k] = obj if select is None: select = k - elif select[0]<=k[0] or select[1]>=k[1]: + elif select[0] <= k[0] or select[1] >= k[1]: select = k - if policy.get(select,{}).get("policy","closed") == "open": + if policy.get(select, {}).get("policy", "closed") == "open": log.notice("Policy is open for parent object") return "PASS" # 3. Check if mntner or mnt-lower for any as-block in the tree. elif mntner in mntners: - log.notice("%s has mnt in parent object" %(mntner)) + log.notice("%s has mnt in parent object" % (mntner)) return "PASS" - elif args["type"] == "as-block": + elif obj_type == "as-block": Lname, Hname = name.split("-") Lname, Hname = Lname.strip(), Hname.strip() if not Lname.startswith("AS") or not Hname.startswith("AS"): - log.error("%s does not start with AS for min and max" %(name)) + log.error("%s does not start with AS for min and max" % (name)) return "FAIL" # 1. Check if they already have an object @@ -703,14 +772,14 @@ def test_policy(obj_type, name, mntner): log.debug(lis) if len(lis) > 0: - status = 'FAIL' + status = "FAIL" for o in lis: for n in o: if n[0] == "mnt-by" and n[1] == mntner: - status = 'PASS' - log.notice("%s has mnt for current object" %(mntner)) + status = "PASS" + log.notice("%s has mnt for current object" % (mntner)) return status - log.notice("%s does not have mnt for current object" %(mntner)) + log.notice("%s does not have mnt for current object" % (mntner)) return status # 2. Check if the parent as-blocks have an open policy @@ -718,9 +787,12 @@ def test_policy(obj_type, name, mntner): Hasn = "AS{:0>9}".format(Hname[2:]) if Lasn > Hasn: - log.error("%s should come before %s" %(Lname, Hname)) + log.error("%s should come before %s" % (Lname, Hname)) - lis = find(["as-block","policy","@as-min","@as-max","mnt-by","mnt-lower"], {"@type": "as-block","@as-min":"le=" + Lasn,"@as-max": "ge=" + Hasn}) + lis = find( + ["as-block", "policy", "@as-min", "@as-max", "mnt-by", "mnt-lower"], + {"@type": "as-block", "@as-min": "le=" + Lasn, "@as-max": "ge=" + Hasn}, + ) log.debug(lis) policy = {} @@ -734,12 +806,12 @@ def test_policy(obj_type, name, mntner): if o[0].startswith("mnt-"): mntners.append(o[1]) - k = (obj["@as-min"],obj["@as-max"]) + k = (obj["@as-min"], obj["@as-max"]) policy[k] = obj if select is None: select = k - elif select[0]<=k[0] or select[1]>=k[1]: + elif select[0] <= k[0] or select[1] >= k[1]: select = k # Policy Open only applies to aut-nums. as-blocks must be defined by parent mntners only. @@ -750,110 +822,149 @@ def test_policy(obj_type, name, mntner): # 3. Check if mntner or mnt-lower for any as-block in the tree. if mntner in mntners: - log.notice("%s has mnt in parent object" %(mntner)) + log.notice("%s has mnt in parent object" % (mntner)) return "PASS" - pass - log.error("%s does not pass checks for %s %s" %(mntner, obj_type, name)) + log.error("%s does not pass checks for %s %s" % (mntner, obj_type, name)) return "FAIL" + def sanity_check(dom): -# log.info(dom.keys) -# log.info(dom.dom) + "sanity check" ck = "PASS" if dom.schema == "dn42.inetnum": cidr = dom.get("cidr") - Lnet, Hnet, mask = inetrange(cidr) + Lnet, Hnet, _ = inetrange(cidr) cidr_range = pretty_ip(Lnet) + "-" + pretty_ip(Hnet) file_range = dom.get("inetnum") file_range = re.sub(r"\s+", "", file_range, flags=re.UNICODE) if cidr_range != file_range: - log.error("inetnum range [%s] does not match: [%s]" %(file_range, cidr_range)) + log.error( + "inetnum range [%s] does not match: [%s]" % (file_range, cidr_range) + ) ck = "FAIL" if dom.schema == "dn42.inet6num": cidr = dom.get("cidr") log.info(cidr) - Lnet, Hnet, mask = inet6range(cidr) + Lnet, Hnet, _ = inet6range(cidr) cidr_range = pretty_ip(Lnet) + "-" + pretty_ip(Hnet) file_range = dom.get("inet6num") file_range = re.sub(r"\s+", "", file_range, flags=re.UNICODE) if cidr_range != file_range: - log.error("inetnum range [%s] does not match: [%s]" %(file_range, cidr_range)) + log.error( + "inetnum range [%s] does not match: [%s]" % (file_range, cidr_range) + ) ck = "FAIL" return ck + def get_args(): """Get and parse command line arguments""" parser = argparse.ArgumentParser( - description='Check Schema. Checks Schema of file for validity') - parser.add_argument('--merge-output', - help="Merge stderr into stdout (helps when reading output with pagers) [Default OFF]", action="store_true") - parser.add_argument('-v', '--verbose', - help="Enable verbose output [Default OFF]", action="store_true") - parser.add_argument('-vv', '--doubleVerbose', - help="Enable full verbose output [Default OFF]", action="store_true") + description="Check Schema. Checks Schema of file for validity" + ) + parser.add_argument( + "--merge-output", + help="Merge stderr into stdout (helps when reading output with pagers) [Default OFF]", + action="store_true", + ) + parser.add_argument( + "-v", + "--verbose", + help="Enable verbose output [Default OFF]", + action="store_true", + ) + parser.add_argument( + "-vv", + "--doubleVerbose", + help="Enable full verbose output [Default OFF]", + action="store_true", + ) - subparsers = parser.add_subparsers(help='sub-command help', dest="command") + subparsers = parser.add_subparsers(help="sub-command help", dest="command") - parser_file = subparsers.add_parser( - 'check-file', help='Process a specific file') - parser_file.add_argument('-s', '--use-schema', nargs='?', - help="Override schema to validate [Default None]", action="store") + parser_file = subparsers.add_parser("check-file", help="Process a specific file") parser_file.add_argument( - 'infile', nargs="?", help="File to check", type=str) + "-s", + "--use-schema", + nargs="?", + help="Override schema to validate [Default None]", + action="store", + ) + parser_file.add_argument("infile", nargs="?", help="File to check", type=str) - parser_schema = subparsers.add_parser( - 'check-schemas', help='Validate all schemas') - parser_schema.add_argument( - 'path', nargs="?", help="Path for schemas", type=str) + parser_schema = subparsers.add_parser("check-schemas", help="Validate all schemas") + parser_schema.add_argument("path", nargs="?", help="Path for schemas", type=str) - parser_index = subparsers.add_parser('index', help='Generate index') - parser_index.add_argument( - 'path', nargs="?", help="Path for dn42 data", type=str) + parser_index = subparsers.add_parser("index", help="Generate index") + parser_index.add_argument("path", nargs="?", help="Path for dn42 data", type=str) parser_scanindex = subparsers.add_parser( - 'scan-index', help='Validate files in index') + "scan-index", help="Validate files in index" + ) parser_scanindex.add_argument( - 'infile', nargs="?", help="Index file to scan", type=str) - parser_scanindex.add_argument('-m', '--use-mntner', nargs='?', - help="Only scan files that has MNT [Default None]", action="store") + "infile", nargs="?", help="Index file to scan", type=str + ) + parser_scanindex.add_argument( + "-m", + "--use-mntner", + nargs="?", + help="Only scan files that has MNT [Default None]", + action="store", + ) - parser_scan = subparsers.add_parser('scan', help='Validate files in index') + parser_scan = subparsers.add_parser("scan", help="Validate files in index") + parser_scan.add_argument("path", nargs="?", help="Path for dn42 data", type=str) parser_scan.add_argument( - 'path', nargs="?", help="Path for dn42 data", type=str) - parser_scan.add_argument('-m', '--use-mntner', nargs='?', - help="Only scan files that has a matching MNT [Default None]", action="store") - parser_scan.add_argument('-f', '--use-file', nargs='?', - help="Only scan file given [Default None]", action="store") + "-m", + "--use-mntner", + nargs="?", + help="Only scan files that has a matching MNT [Default None]", + action="store", + ) + parser_scan.add_argument( + "-f", + "--use-file", + nargs="?", + help="Only scan file given [Default None]", + action="store", + ) - parser_fmt = subparsers.add_parser('fmt', help='Format file') + parser_fmt = subparsers.add_parser("fmt", help="Format file") parser_fmt.add_argument( - 'infile', nargs="?", help="Path for dn42 data file", type=str) - parser_fmt.add_argument('-i', '--in-place', - help="Format file in place", action="store_true") + "infile", nargs="?", help="Path for dn42 data file", type=str + ) + parser_fmt.add_argument( + "-i", "--in-place", help="Format file in place", action="store_true" + ) - parser_sane = subparsers.add_parser('sanity-check', help='Check the file for sane-ness') + parser_sane = subparsers.add_parser( + "sanity-check", help="Check the file for sane-ness" + ) parser_sane.add_argument( - 'infile', nargs="?", help="Path for dn42 data file", type=str) + "infile", nargs="?", help="Path for dn42 data file", type=str + ) - parser_pol = subparsers.add_parser('policy', help='Format file') - parser_pol.add_argument('type', nargs="?", type=str, help="dn42 object type") - parser_pol.add_argument('name', nargs="?", type=str, help="dn42 object name") - parser_pol.add_argument('mntner', nargs="?", type=str, help="dn42 object mntner") + parser_pol = subparsers.add_parser("policy", help="Format file") + parser_pol.add_argument("type", nargs="?", type=str, help="dn42 object type") + parser_pol.add_argument("name", nargs="?", type=str, help="dn42 object name") + parser_pol.add_argument("mntner", nargs="?", type=str, help="dn42 object mntner") - parser_mroute = subparsers.add_parser('match-routes', help='Match routes to inetnums') + parser_mroute = subparsers.add_parser( + "match-routes", help="Match routes to inetnums" + ) + _ = parser_mroute return vars(parser.parse_args()) -if __name__ == '__main__': - args = get_args() - +def run(args): + "run" if args["merge_output"]: log.OUTPUT = sys.stdout @@ -884,10 +995,15 @@ if __name__ == '__main__': scan_index(args["infile"], args["use_mntner"]) elif args["command"] == "scan": - import time - log.notice("## Scan Started at %s" %(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()))) + log.notice( + "## Scan Started at %s" + % (time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())) + ) ck = scan_files(args["path"], args["use_mntner"], args["use_file"]) - log.notice("## Scan Completed at %s" %(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()))) + log.notice( + "## Scan Completed at %s" + % (time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())) + ) if ck == "INFO": sys.exit(2) @@ -897,7 +1013,7 @@ if __name__ == '__main__': elif args["command"] == "fmt": dom = FileDOM(args["infile"]) if args["in_place"]: - with open(args["infile"], 'w+') as f: + with open(args["infile"], "w+") as f: f.write(str(dom)) else: print(str(dom)) @@ -913,24 +1029,30 @@ if __name__ == '__main__': if args["mntner"] is None: log.fatal("Mntner should be provided") - if args["type"] in ["inetnum","inet6num","route","route6"]: - args["name"] = args["name"].replace("_","/") + if args["type"] in ["inetnum", "inet6num", "route", "route6"]: + args["name"] = args["name"].replace("_", "/") status = test_policy(args["type"], args["name"], args["mntner"]) - print("POLICY %-12s\t%-8s\t%20s\t%s" %(args["mntner"], args["type"], args["name"], status)) + print( + "POLICY %-12s\t%-8s\t%20s\t%s" + % (args["mntner"], args["type"], args["name"], status) + ) if status != "PASS": sys.exit(1) elif args["command"] == "sanity-check": dom = FileDOM(args["infile"]) ck = sanity_check(dom) - print("SANITY %-8s\t%20s\t%s" %(dom.schema.split(".")[1], args["infile"], ck)) + print("SANITY %-8s\t%20s\t%s" % (dom.schema.split(".")[1], args["infile"], ck)) if ck != "PASS": sys.exit(1) elif args["command"] == "match-routes": - lis = find(["mnt-by","cidr","route","@netlevel", "@netmin", "@netmax", "@uri"], {"@family":"ipv4"}) + lis = find( + ["mnt-by", "cidr", "route", "@netlevel", "@netmin", "@netmax", "@uri"], + {"@family": "ipv4"}, + ) def field(x, field): for i in x: @@ -951,12 +1073,12 @@ if __name__ == '__main__': def is_net(x): i = field(x, "cidr") if i is not None: - return True + return True return False def obj(x): d = {} - for k,v in x: + for k, v in x: if k in d: d[k].append(v) else: @@ -979,23 +1101,43 @@ if __name__ == '__main__': rlvl = int(o["@netlevel"][0]) if ilvl + 1 != rlvl: - print("\nNo Parent > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]), \ - "Nearest INET ", inet["cidr"][0], " ", ilvl, " ", ",".join(inet["mnt-by"])) + print( + "\nNo Parent > ", + o["route"][0], + " ", + rlvl, + " ", + ",".join(o["mnt-by"]), + "Nearest INET ", + inet["cidr"][0], + " ", + ilvl, + " ", + ",".join(inet["mnt-by"]), + ) first = True continue if inet["@netmin"][0] > o["@netmin"][0] or inet["@netmax"][0] < o["@netmax"][0]: - print("\nNo Parent > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]), \ - "Nearest INET ", inet["cidr"][0], " ", ilvl, " ", ",".join(inet["mnt-by"])) + print( + "\nNo Parent > ", + o["route"][0], + " ", + rlvl, + " ", + ",".join(o["mnt-by"]), + "Nearest INET ", + inet["cidr"][0], + " ", + ilvl, + " ", + ",".join(inet["mnt-by"]), + ) first = True continue - continue - if first: - first = False - print(inet["cidr"]," ", ilvl, ",".join(inet["mnt-by"])) - print(" > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"])) - else: - print(" > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"])) + +if __name__ == "__main__": + run(get_args())