Merge pull request 'cleanup work' (#75) from xuu-20200723/clean-up-dn42-schema.py into master

Reviewed-on: https://git.dn42.dev/dn42/registry/pulls/75
Reviewed-by: schema-checker <schema-checker@noreply.dn42.us>
This commit is contained in:
Simon Marsh 2020-07-24 18:45:48 +00:00
commit 3d3ac2924f

View file

@ -1,21 +1,25 @@
#!/usr/bin/env python3
"DN42 Schema Checker"
from __future__ import print_function
import re
import os
import sys
import time
import argparse
import log
import glob
import random
from pprint import pprint
import urllib.parse
import http.client
import json
import log
SCHEMA_NAMESPACE = "dn42."
class SchemaDOM:
"schema"
def __init__(self, fn):
self.name = None
self.ref = None
@ -27,30 +31,30 @@ class SchemaDOM:
def __parse_schema(self, f):
schema = {}
for k, v, l in f.dom:
if k == 'ref':
self.ref = v
elif k == 'schema':
self.name = v
for key, val, _ in f.dom:
if key == "ref":
self.ref = val
elif key == "schema":
self.name = val
if k != 'key':
if key != "key":
continue
v = v.split()
key = v.pop(0)
val = val.split()
key = val.pop(0)
schema[key] = set()
for i in v:
for i in val:
if i == ">":
break
schema[key].add(i)
for k, v in schema.items():
if 'schema' in v:
if "schema" in v:
self.type = k
if 'primary' in v:
if "primary" in v:
self.primary = k
schema[k].add("oneline")
if "multiline" in v:
@ -66,51 +70,63 @@ class SchemaDOM:
if "deprecate" in v:
schema[k].remove("deprecate")
if 'oneline' not in v:
if "oneline" not in v:
schema[k].add("multiline")
if 'single' not in v:
if "single" not in v:
schema[k].add("multiple")
return schema
def check_file(self, f, lookups=None):
"check file"
status = "PASS"
if not f.valid:
log.error("%s Line 0: File does not parse" % (f.src))
status = "FAIL"
for k, v in self.schema.items():
if 'required' in v and k not in f.keys:
log.error(
"%s Line 0: Key [%s] not found and is required." % (f.src, k))
if "required" in v and k not in f.keys:
log.error("%s Line 0: Key [%s] not found and is required." % (f.src, k))
status = "FAIL"
elif 'recommend' in v and k not in f.keys:
elif "recommend" in v and k not in f.keys:
log.notice(
"%s Line 0: Key [%s] not found and is recommended." % (f.src, k))
"%s Line 0: Key [%s] not found and is recommended." % (f.src, k)
)
status = "NOTE"
if 'schema' in v and SCHEMA_NAMESPACE + f.dom[0][0] != self.ref:
if "schema" in v and SCHEMA_NAMESPACE + f.dom[0][0] != self.ref:
log.error(
"%s Line 1: Key [%s] not found and is required as the first line." % (f.src, k))
"%s Line 1: Key [%s] not found and is required as the first line."
% (f.src, k)
)
status = "FAIL"
if 'single' in v and k in f.keys and len(f.keys[k]) > 1:
log.warning("%s Line %d: Key [%s] first defined here and has repeated keys." % (
f.src, f.keys[k][0], k))
if "single" in v and k in f.keys and len(f.keys[k]) > 1:
log.warning(
"%s Line %d: Key [%s] first defined here and has repeated keys."
% (f.src, f.keys[k][0], k)
)
for l in f.keys[k][1:]:
log.error(
"%s Line %d: Key [%s] can only appear once." % (f.src, l, k))
"%s Line %d: Key [%s] can only appear once." % (f.src, l, k)
)
status = "FAIL"
if 'oneline' in v and k in f.multi:
if "oneline" in v and k in f.multi:
for l in f.keys[k]:
log.error(
"%s Line %d: Key [%s] can not have multiple lines." % (f.src, l, k))
"%s Line %d: Key [%s] can not have multiple lines."
% (f.src, l, k)
)
status = "FAIL"
for k, v, l in f.dom:
if k == self.primary and not f.src.endswith(v.replace("/","_").replace(" ","")):
log.error("%s Line %d: Primary [%s: %s] does not match filename." % (f.src, l, k, v))
if k == self.primary and not f.src.endswith(
v.replace("/", "_").replace(" ", "")):
log.error(
"%s Line %d: Primary [%s: %s] does not match filename."
% (f.src, l, k, v)
)
status = "FAIL"
if k.startswith("x-"):
@ -121,9 +137,11 @@ class SchemaDOM:
status = "FAIL"
continue
else:
if 'deprecate' in self.schema[k]:
if "deprecate" in self.schema[k]:
log.info(
"%s Line %d: Key [%s] was found and is deprecated." % (f.src, l, k))
"%s Line %d: Key [%s] was found and is deprecated."
% (f.src, l, k)
)
status = "INFO"
if lookups is not None:
@ -136,20 +154,22 @@ class SchemaDOM:
if (ref, val) in lookups:
found = True
if not found:
log.error("%s Line %d: Key %s references object %s in %s but does not exist." % (
f.src, l, k, val, refs))
log.error(
"%s Line %d: Key %s references object %s in %s but does not exist."
% (f.src, l, k, val, refs)
)
status = "FAIL"
if status != "FAIL":
ck = sanity_check(f)
if ck == "FAIL":
status = ck
print("CHECK\t%-54s\t%s\tMNTNERS: %s" %(f.src, status, ','.join(f.mntner)))
print("CHECK\t%-54s\t%s\tMNTNERS: %s" % (f.src, status, ",".join(f.mntner)))
return status
class FileDOM:
"file"
def __init__(self, fn):
self.valid = True
self.dom = []
@ -159,7 +179,7 @@ class FileDOM:
self.schema = None
self.src = fn
with open(fn, mode='r', encoding='utf-8') as f:
with open(fn, mode="r", encoding="utf-8") as f:
dom = []
keys = {}
multi = {}
@ -167,7 +187,7 @@ class FileDOM:
last_multi = None
for lineno, i in enumerate(f.readlines(), 1):
if re.match(r'[ \t]', i):
if re.match(r"[ \t]", i):
if len(dom) == 0:
log.error("File %s does not parse properly" % (fn))
self.valid = False
@ -187,7 +207,7 @@ class FileDOM:
if len(i) < 2:
continue
dom.append([i[0].strip(), ':'.join(i[1:]).strip(), lineno - 1])
dom.append([i[0].strip(), ":".join(i[1:]).strip(), lineno - 1])
if i[0].strip() not in keys:
keys[i[0].strip()] = []
@ -196,7 +216,7 @@ class FileDOM:
last_multi = None
if dom[-1][0] == 'mnt-by':
if dom[-1][0] == "mnt-by":
mntner.append(dom[-1][1])
self.dom = dom
@ -221,6 +241,7 @@ class FileDOM:
return s
def get(self, key, index=0, default=None):
"get value"
if key not in self.keys:
return default
if index >= len(self.keys[key]) or index <= -len(self.keys[key]):
@ -230,7 +251,7 @@ class FileDOM:
def main(infile, schema):
"main command"
log.debug("Check File: %s" % (infile))
f = FileDOM(infile)
@ -248,7 +269,9 @@ def main(infile, schema):
s = SchemaDOM(f.schema)
return s.check_file(f)
def check_schemas(path):
"check schemas"
schemas = {}
for fn in glob.glob(path + "/*"):
s = SchemaDOM(fn)
@ -257,45 +280,56 @@ def check_schemas(path):
ok = True
c = schemas[SCHEMA_NAMESPACE + "schema"]
for s in schemas.keys():
for s in schemas:
ck = c.check_file(s)
if not ck:
ok = False
return ok
def scan_index(infile, mntner=None):
"scan index"
idx = {}
schemas = {}
with open(infile, 'r') as f:
with open(infile, "r") as f:
for line in f.readlines():
line = line.split()
idx[(line[0], line[1])] = line[2:]
if line[0] == SCHEMA_NAMESPACE + 'schema':
if line[0] == SCHEMA_NAMESPACE + "schema":
s = SchemaDOM(line[2])
log.info("read schema: %s" % (s.name))
schemas[s.ref] = s
return __scan_index(idx, schemas, mntner)
def scan_files(path, mntner=None, use_file=None):
"scan files"
arr = __index_files(path, use_file)
idx = {}
schemas = {}
for dom in arr:
line = (dom.schema, dom.src.split("/")[-1].replace("_", "/"), dom.src, ",".join(dom.mntner), dom)
line = (
dom.schema,
dom.src.split("/")[-1].replace("_", "/"),
dom.src,
",".join(dom.mntner),
dom,
)
idx[(line[0], line[1])] = line[2:]
if line[0] == SCHEMA_NAMESPACE + 'schema':
if line[0] == SCHEMA_NAMESPACE + "schema":
s = SchemaDOM(line[2])
schemas[s.ref] = s
return __scan_index(idx, schemas, mntner, use_file)
def __scan_index(idx, schemas, mntner, use_file):
def __scan_index(idx, schemas, mntner, use_file=None):
ok = True
for k, v in idx.items():
if use_file is not None and use_file != v[0]:
@ -315,7 +349,6 @@ def __scan_index(idx, schemas, mntner, use_file):
if mntner is not None and mntner not in mlist:
continue
c = v[2]
ck = s.check_file(c, idx.keys())
@ -325,7 +358,8 @@ def __scan_index(idx, schemas, mntner, use_file):
ok = ck
return ok
def __index_files(path, use_file):
def __index_files(path, use_file=None):
xlat = {
"dns/": SCHEMA_NAMESPACE + "domain",
"inetnum/": SCHEMA_NAMESPACE + "inetnum",
@ -345,9 +379,9 @@ def __index_files(path, use_file):
"schema/": SCHEMA_NAMESPACE + "schema",
}
for root, dirs, files in os.walk(path):
for root, _, files in os.walk(path):
ignore = True
for t in xlat.keys():
for t in xlat:
if root + "/" == os.path.join(path, t):
ignore = False
break
@ -364,32 +398,32 @@ def __index_files(path, use_file):
dom = FileDOM(use_file)
yield dom
def index_files(path):
"index files"
idx = __index_files(path)
for i in idx:
print("%s\t%s\t%s\t%s" % i)
def http_get(server, url, query=None, headers=None):
import urllib.parse
import http.client
import json
def http_get(server, url, query=None, headers=None):
"http get"
if headers is None:
headers = {}
if 'User-Agent' not in headers:
headers['User-Agent'] = "curl"
if 'Accept' not in headers:
headers['Accept'] = 'application/json'
if "User-Agent" not in headers:
headers["User-Agent"] = "curl"
if "Accept" not in headers:
headers["Accept"] = "application/json"
if query is None:
query = {}
http_client = http.client.HTTPSConnection(server)
full_url = url + '?' + urllib.parse.urlencode(query)
full_url = url + "?" + urllib.parse.urlencode(query)
log.debug("GET " + full_url)
http_client.request('GET', full_url, headers=headers)
http_client.request("GET", full_url, headers=headers)
req = http_client.getresponse()
log.debug("HTTP Response: %d %s" % (req.status, req.reason))
@ -406,61 +440,73 @@ def http_get(server, url, query=None, headers=None):
return req.read()
def find(fields=None, filter=None):
def find(fields=None, filters=None):
"find"
server = "registry.dn42.us"
url = "/v1/reg/reg.objects"
if fields is None:
fields = []
if filter is None:
filter = {}
query = {"fields": ",".join(fields), "filter": ",".join([k + "=" + v for k, v in filter.items()])}
if filters is None:
filters = {}
query = {
"fields": ",".join(fields),
"filter": ",".join([k + "=" + v for k, v in filters.items()]),
}
return http_get(server, url, query)
def to_num(ip):
ip = [int(i) for i in ip.split('.')]
"ip to number"
ip = [int(i) for i in ip.split(".")]
return ip[3] + ip[2] * 256 + ip[1] * 256 ** 2 + ip[0] * 256 ** 3
def to_ip(num):
return '.'.join([str(i) for i in [num >> 24, (num >> 16) & 0xFF, (num >> 8) & 0xFF, num & 0xFF]])
"number to ip"
return ".".join(
[str(i) for i in [num >> 24, (num >> 16) & 0xFF, (num >> 8) & 0xFF, num & 0xFF]]
)
def pretty_ip(addr):
"pretty ip"
if addr.startswith("00000000000000000000ffff"):
addr = addr[-8:]
addr = int(addr, 16)
return to_ip(addr)
return ":".join([addr[i:i + 4] for i in range(0, len(addr), 4)])
# addr = ["%x" % (int(addr[i:i+4],16)) for i in range(0, len(addr), 4)]
#
# last_seg = 8
# for i, seg in enumerate(addr[::-1]):
# if seg == "0": last_seg = 7 - i
# else: break
#
# return ":".join(addr[:last_seg]) + ("::" if last_seg < 8 else "")
def expand_ipv6(addr):
"expand ip6"
addr = addr.lower()
if "::" in addr:
if addr.count('::') > 1:
if addr.count("::") > 1:
return False
addr = addr.replace('::', ':' * (9 - addr.count(':')))
if addr.count(':') != 7:
addr = addr.replace("::", ":" * (9 - addr.count(":")))
if addr.count(":") != 7:
return False
return ''.join((i.zfill(4) for i in addr.split(":")))
return "".join((i.zfill(4) for i in addr.split(":")))
def ip4_to_ip6(ip):
return "::ffff:%04x:%04x" % (ip >> 16, ip & 0xffff)
"ip4 to ip6"
return "::ffff:%04x:%04x" % (ip >> 16, ip & 0xFFFF)
def inetrange(inet):
ip, mask = inet.split('/')
"inet range"
ip, mask = inet.split("/")
mask = int(mask)
ip = to_num(ip) & (0xFFFFFFFF << 32 - mask)
ip6 = ip4_to_ip6(ip)
return inet6range("%s/%d" % (ip6, mask + 96))
def inet6range(inet):
ip, mask = inet.split('/')
"inet6 range"
ip, mask = inet.split("/")
mask = int(mask)
log.debug(ip)
@ -470,20 +516,27 @@ def inet6range(inet):
return ip, ip, mask
offset = int(ip[mask // 4], 16)
return "%s%x%s" % (
ip[:mask // 4],
offset & (0xf0 >> mask % 4),
"0" * (31 - mask // 4)
), "%s%x%s" % (
ip[:mask // 4],
offset | (0xf >> mask % 4),
"f" * (31 - mask // 4)
), mask
return (
"%s%x%s"
% (ip[: mask // 4], offset & (0xF0 >> mask % 4), "0" * (31 - mask // 4)),
"%s%x%s"
% (ip[: mask // 4], offset | (0xF >> mask % 4), "f" * (31 - mask // 4)),
mask,
)
def test_policy(obj_type, name, mntner):
"test policy"
log.debug([obj_type, name, mntner])
if obj_type in ["organisation", "mntner", "person", "role", "as-set", "schema", "dns"]:
if obj_type in ["organisation",
"mntner",
"person",
"role",
"as-set",
"schema",
"dns",
]:
if obj_type == "organisation" and not name.startswith("ORG-"):
log.error("%s does not start with 'ORG-'" % (name))
return "FAIL"
@ -507,35 +560,35 @@ def test_policy(obj_type, name, mntner):
log.notice("%s does not currently exist" % (name))
return "PASS"
status = 'FAIL'
status = "FAIL"
for o in lis:
for n in o:
log.debug(n)
log.debug(mntner)
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
status = "PASS"
return status
log.error("%s does not have mnt for object" % (mntner))
return status
elif args["type"] in ["inetnum","inet6num"]:
elif obj_type in ["inetnum", "inet6num"]:
log.info("Checking inetnum type")
lis = find(["mnt-by"], {"@type": "net", "cidr": name})
log.debug(lis)
if len(lis) > 0:
status = 'FAIL'
status = "FAIL"
for o in lis:
for n in o:
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
status = "PASS"
log.notice("%s has mnt for current object" % (mntner))
return status
log.error("%s does not have mnt for current object" % (mntner))
return status
if args["type"] == "inetnum":
if obj_type == "inetnum":
Lnet, Hnet, mask = inetrange(name)
else:
Lnet, Hnet, mask = inet6range(name)
@ -543,8 +596,15 @@ def test_policy(obj_type, name, mntner):
mask = "%03d" % (mask)
log.info([Lnet, Hnet, mask])
lis = find(["inetnum","inet6num","policy","@netlevel","mnt-by","mnt-lower"],
{"@type": "net", "@netmin": "le=" + Lnet, "@netmax": "ge=" + Hnet, "@netmask": "lt=" + mask})
lis = find(
["inetnum", "inet6num", "policy", "@netlevel", "mnt-by", "mnt-lower"],
{
"@type": "net",
"@netmin": "le=" + Lnet,
"@netmax": "ge=" + Hnet,
"@netmask": "lt=" + mask,
},
)
log.debug(lis)
policy = {}
@ -566,7 +626,7 @@ def test_policy(obj_type, name, mntner):
elif select <= k:
select = k
if select == None:
if select is None:
pass
elif policy.get(select, {}).get("policy", "closed") == "open":
@ -578,31 +638,38 @@ def test_policy(obj_type, name, mntner):
log.notice("%s has mnt in parent object" % (mntner))
return "PASS"
elif args["type"] in ["route","route6"]:
elif obj_type in ["route", "route6"]:
log.info("Checking route type")
lis = find(["mnt-by"], {"@type": "route", args["type"]: name})
lis = find(["mnt-by"], {"@type": "route", obj_type: name})
log.debug(lis)
if len(lis) > 0:
status = 'FAIL'
status = "FAIL"
for o in lis:
for n in o:
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
status = "PASS"
log.notice("%s has mnt for current object" % (mntner))
return status
log.error("%s does not have mnt for current object" % (mntner))
return status
if args["type"] == "route":
if obj_type == "route":
Lnet, Hnet, mask = inetrange(name)
else:
Lnet, Hnet, mask = inet6range(name)
mask = "%03d" % (mask)
log.info([Lnet, Hnet, mask])
lis = find(["inetnum","inet6num","policy","@netlevel","mnt-by","mnt-lower"],
{"@type": "net", "@netmin": "le=" + Lnet, "@netmax": "ge=" + Hnet, "@netmask": "le=" + mask})
lis = find(
["inetnum", "inet6num", "policy", "@netlevel", "mnt-by", "mnt-lower"],
{
"@type": "net",
"@netmin": "le=" + Lnet,
"@netmax": "ge=" + Hnet,
"@netmask": "le=" + mask,
},
)
log.debug(lis)
policy = {}
@ -624,7 +691,7 @@ def test_policy(obj_type, name, mntner):
elif select <= k:
select = k
if select == None:
if select is None:
pass
elif policy.get(select, {}).get("policy", "closed") == "open":
@ -636,7 +703,7 @@ def test_policy(obj_type, name, mntner):
log.notice("%s has mnt in parent object" % (mntner))
return "PASS"
elif args["type"] == "aut-num":
elif obj_type == "aut-num":
if not name.startswith("AS"):
log.error("%s does not start with AS" % (name))
return "FAIL"
@ -646,11 +713,11 @@ def test_policy(obj_type, name, mntner):
log.debug(lis)
if len(lis) > 0:
status = 'FAIL'
status = "FAIL"
for o in lis:
for n in o:
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
status = "PASS"
log.notice("%s has mnt for current object" % (mntner))
return status
log.error("%s does not have mnt for current object" % (mntner))
@ -658,8 +725,10 @@ def test_policy(obj_type, name, mntner):
# 2. Check if the as-block has an open policy
asn = "AS{:0>9}".format(name[2:])
lis = find(["as-block","policy","@as-min","@as-max","mnt-by","mnt-lower"],
{"@type": "as-block","@as-min":"le=" + asn,"@as-max": "ge=" + asn})
lis = find(
["as-block", "policy", "@as-min", "@as-max", "mnt-by", "mnt-lower"],
{"@type": "as-block", "@as-min": "le=" + asn, "@as-max": "ge=" + asn},
)
log.info(lis)
policy = {}
@ -690,7 +759,7 @@ def test_policy(obj_type, name, mntner):
log.notice("%s has mnt in parent object" % (mntner))
return "PASS"
elif args["type"] == "as-block":
elif obj_type == "as-block":
Lname, Hname = name.split("-")
Lname, Hname = Lname.strip(), Hname.strip()
@ -703,11 +772,11 @@ def test_policy(obj_type, name, mntner):
log.debug(lis)
if len(lis) > 0:
status = 'FAIL'
status = "FAIL"
for o in lis:
for n in o:
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
status = "PASS"
log.notice("%s has mnt for current object" % (mntner))
return status
log.notice("%s does not have mnt for current object" % (mntner))
@ -720,7 +789,10 @@ def test_policy(obj_type, name, mntner):
if Lasn > Hasn:
log.error("%s should come before %s" % (Lname, Hname))
lis = find(["as-block","policy","@as-min","@as-max","mnt-by","mnt-lower"], {"@type": "as-block","@as-min":"le=" + Lasn,"@as-max": "ge=" + Hasn})
lis = find(
["as-block", "policy", "@as-min", "@as-max", "mnt-by", "mnt-lower"],
{"@type": "as-block", "@as-min": "le=" + Lasn, "@as-max": "ge=" + Hasn},
)
log.debug(lis)
policy = {}
@ -752,108 +824,147 @@ def test_policy(obj_type, name, mntner):
if mntner in mntners:
log.notice("%s has mnt in parent object" % (mntner))
return "PASS"
pass
log.error("%s does not pass checks for %s %s" % (mntner, obj_type, name))
return "FAIL"
def sanity_check(dom):
# log.info(dom.keys)
# log.info(dom.dom)
"sanity check"
ck = "PASS"
if dom.schema == "dn42.inetnum":
cidr = dom.get("cidr")
Lnet, Hnet, mask = inetrange(cidr)
Lnet, Hnet, _ = inetrange(cidr)
cidr_range = pretty_ip(Lnet) + "-" + pretty_ip(Hnet)
file_range = dom.get("inetnum")
file_range = re.sub(r"\s+", "", file_range, flags=re.UNICODE)
if cidr_range != file_range:
log.error("inetnum range [%s] does not match: [%s]" %(file_range, cidr_range))
log.error(
"inetnum range [%s] does not match: [%s]" % (file_range, cidr_range)
)
ck = "FAIL"
if dom.schema == "dn42.inet6num":
cidr = dom.get("cidr")
log.info(cidr)
Lnet, Hnet, mask = inet6range(cidr)
Lnet, Hnet, _ = inet6range(cidr)
cidr_range = pretty_ip(Lnet) + "-" + pretty_ip(Hnet)
file_range = dom.get("inet6num")
file_range = re.sub(r"\s+", "", file_range, flags=re.UNICODE)
if cidr_range != file_range:
log.error("inetnum range [%s] does not match: [%s]" %(file_range, cidr_range))
log.error(
"inetnum range [%s] does not match: [%s]" % (file_range, cidr_range)
)
ck = "FAIL"
return ck
def get_args():
"""Get and parse command line arguments"""
parser = argparse.ArgumentParser(
description='Check Schema. Checks Schema of file for validity')
parser.add_argument('--merge-output',
help="Merge stderr into stdout (helps when reading output with pagers) [Default OFF]", action="store_true")
parser.add_argument('-v', '--verbose',
help="Enable verbose output [Default OFF]", action="store_true")
parser.add_argument('-vv', '--doubleVerbose',
help="Enable full verbose output [Default OFF]", action="store_true")
description="Check Schema. Checks Schema of file for validity"
)
parser.add_argument(
"--merge-output",
help="Merge stderr into stdout (helps when reading output with pagers) [Default OFF]",
action="store_true",
)
parser.add_argument(
"-v",
"--verbose",
help="Enable verbose output [Default OFF]",
action="store_true",
)
parser.add_argument(
"-vv",
"--doubleVerbose",
help="Enable full verbose output [Default OFF]",
action="store_true",
)
subparsers = parser.add_subparsers(help='sub-command help', dest="command")
subparsers = parser.add_subparsers(help="sub-command help", dest="command")
parser_file = subparsers.add_parser(
'check-file', help='Process a specific file')
parser_file.add_argument('-s', '--use-schema', nargs='?',
help="Override schema to validate [Default None]", action="store")
parser_file = subparsers.add_parser("check-file", help="Process a specific file")
parser_file.add_argument(
'infile', nargs="?", help="File to check", type=str)
"-s",
"--use-schema",
nargs="?",
help="Override schema to validate [Default None]",
action="store",
)
parser_file.add_argument("infile", nargs="?", help="File to check", type=str)
parser_schema = subparsers.add_parser(
'check-schemas', help='Validate all schemas')
parser_schema.add_argument(
'path', nargs="?", help="Path for schemas", type=str)
parser_schema = subparsers.add_parser("check-schemas", help="Validate all schemas")
parser_schema.add_argument("path", nargs="?", help="Path for schemas", type=str)
parser_index = subparsers.add_parser('index', help='Generate index')
parser_index.add_argument(
'path', nargs="?", help="Path for dn42 data", type=str)
parser_index = subparsers.add_parser("index", help="Generate index")
parser_index.add_argument("path", nargs="?", help="Path for dn42 data", type=str)
parser_scanindex = subparsers.add_parser(
'scan-index', help='Validate files in index')
"scan-index", help="Validate files in index"
)
parser_scanindex.add_argument(
'infile', nargs="?", help="Index file to scan", type=str)
parser_scanindex.add_argument('-m', '--use-mntner', nargs='?',
help="Only scan files that has MNT [Default None]", action="store")
"infile", nargs="?", help="Index file to scan", type=str
)
parser_scanindex.add_argument(
"-m",
"--use-mntner",
nargs="?",
help="Only scan files that has MNT [Default None]",
action="store",
)
parser_scan = subparsers.add_parser('scan', help='Validate files in index')
parser_scan = subparsers.add_parser("scan", help="Validate files in index")
parser_scan.add_argument("path", nargs="?", help="Path for dn42 data", type=str)
parser_scan.add_argument(
'path', nargs="?", help="Path for dn42 data", type=str)
parser_scan.add_argument('-m', '--use-mntner', nargs='?',
help="Only scan files that has a matching MNT [Default None]", action="store")
parser_scan.add_argument('-f', '--use-file', nargs='?',
help="Only scan file given [Default None]", action="store")
"-m",
"--use-mntner",
nargs="?",
help="Only scan files that has a matching MNT [Default None]",
action="store",
)
parser_scan.add_argument(
"-f",
"--use-file",
nargs="?",
help="Only scan file given [Default None]",
action="store",
)
parser_fmt = subparsers.add_parser('fmt', help='Format file')
parser_fmt = subparsers.add_parser("fmt", help="Format file")
parser_fmt.add_argument(
'infile', nargs="?", help="Path for dn42 data file", type=str)
parser_fmt.add_argument('-i', '--in-place',
help="Format file in place", action="store_true")
"infile", nargs="?", help="Path for dn42 data file", type=str
)
parser_fmt.add_argument(
"-i", "--in-place", help="Format file in place", action="store_true"
)
parser_sane = subparsers.add_parser('sanity-check', help='Check the file for sane-ness')
parser_sane = subparsers.add_parser(
"sanity-check", help="Check the file for sane-ness"
)
parser_sane.add_argument(
'infile', nargs="?", help="Path for dn42 data file", type=str)
"infile", nargs="?", help="Path for dn42 data file", type=str
)
parser_pol = subparsers.add_parser('policy', help='Format file')
parser_pol.add_argument('type', nargs="?", type=str, help="dn42 object type")
parser_pol.add_argument('name', nargs="?", type=str, help="dn42 object name")
parser_pol.add_argument('mntner', nargs="?", type=str, help="dn42 object mntner")
parser_pol = subparsers.add_parser("policy", help="Format file")
parser_pol.add_argument("type", nargs="?", type=str, help="dn42 object type")
parser_pol.add_argument("name", nargs="?", type=str, help="dn42 object name")
parser_pol.add_argument("mntner", nargs="?", type=str, help="dn42 object mntner")
parser_mroute = subparsers.add_parser('match-routes', help='Match routes to inetnums')
parser_mroute = subparsers.add_parser(
"match-routes", help="Match routes to inetnums"
)
_ = parser_mroute
return vars(parser.parse_args())
if __name__ == '__main__':
args = get_args()
def run(args):
"run"
if args["merge_output"]:
log.OUTPUT = sys.stdout
@ -884,10 +995,15 @@ if __name__ == '__main__':
scan_index(args["infile"], args["use_mntner"])
elif args["command"] == "scan":
import time
log.notice("## Scan Started at %s" %(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())))
log.notice(
"## Scan Started at %s"
% (time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()))
)
ck = scan_files(args["path"], args["use_mntner"], args["use_file"])
log.notice("## Scan Completed at %s" %(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())))
log.notice(
"## Scan Completed at %s"
% (time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()))
)
if ck == "INFO":
sys.exit(2)
@ -897,7 +1013,7 @@ if __name__ == '__main__':
elif args["command"] == "fmt":
dom = FileDOM(args["infile"])
if args["in_place"]:
with open(args["infile"], 'w+') as f:
with open(args["infile"], "w+") as f:
f.write(str(dom))
else:
print(str(dom))
@ -918,7 +1034,10 @@ if __name__ == '__main__':
status = test_policy(args["type"], args["name"], args["mntner"])
print("POLICY %-12s\t%-8s\t%20s\t%s" %(args["mntner"], args["type"], args["name"], status))
print(
"POLICY %-12s\t%-8s\t%20s\t%s"
% (args["mntner"], args["type"], args["name"], status)
)
if status != "PASS":
sys.exit(1)
@ -930,7 +1049,10 @@ if __name__ == '__main__':
sys.exit(1)
elif args["command"] == "match-routes":
lis = find(["mnt-by","cidr","route","@netlevel", "@netmin", "@netmax", "@uri"], {"@family":"ipv4"})
lis = find(
["mnt-by", "cidr", "route", "@netlevel", "@netmin", "@netmax", "@uri"],
{"@family": "ipv4"},
)
def field(x, field):
for i in x:
@ -979,23 +1101,43 @@ if __name__ == '__main__':
rlvl = int(o["@netlevel"][0])
if ilvl + 1 != rlvl:
print("\nNo Parent > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]), \
"Nearest INET ", inet["cidr"][0], " ", ilvl, " ", ",".join(inet["mnt-by"]))
print(
"\nNo Parent > ",
o["route"][0],
" ",
rlvl,
" ",
",".join(o["mnt-by"]),
"Nearest INET ",
inet["cidr"][0],
" ",
ilvl,
" ",
",".join(inet["mnt-by"]),
)
first = True
continue
if inet["@netmin"][0] > o["@netmin"][0] or inet["@netmax"][0] < o["@netmax"][0]:
print("\nNo Parent > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]), \
"Nearest INET ", inet["cidr"][0], " ", ilvl, " ", ",".join(inet["mnt-by"]))
print(
"\nNo Parent > ",
o["route"][0],
" ",
rlvl,
" ",
",".join(o["mnt-by"]),
"Nearest INET ",
inet["cidr"][0],
" ",
ilvl,
" ",
",".join(inet["mnt-by"]),
)
first = True
continue
continue
if first:
first = False
print(inet["cidr"]," ", ilvl, ",".join(inet["mnt-by"]))
print(" > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]))
else:
print(" > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]))
if __name__ == "__main__":
run(get_args())