add scan tools and index builder

This commit is contained in:
Jon Lundy 2020-06-11 21:29:01 -06:00
parent a589d521da
commit fc92089916
No known key found for this signature in database
GPG key ID: C63E6D61F3035024
9 changed files with 453 additions and 64 deletions

View file

@ -3,6 +3,6 @@ remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
nic-hdl: XU0-DN42
mnt-by: XUU-MNT
source: DN42

202
utils/registry/build-index.py Executable file
View file

@ -0,0 +1,202 @@
#!/usr/bin/env python3
"""Builds registry index to be used by scan-index.py"""
import os
import sys
from ipaddress import ip_network, IPv6Network
from dataclasses import dataclass
from typing import TypeVar, Dict, Generator, List, Tuple
from dom.filedom import FileDOM, read_file
from dom.schema import SchemaDOM
def index_files(path: str) -> Generator[FileDOM, None, None]:
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
for f in files:
if f[0] == ".":
continue
dom = read_file(os.path.join(root, f))
yield dom
NET = IPv6Network
NET_LIST = TypeVar('NET_LIST', int, List[NET])
NET_TREE = Dict[NET, NET_LIST]
V6_NET = ip_network("::/0")
V4_NET = ip_network("::0.0.0.0/96")
@dataclass
class NetRecord:
"Network Record"
network: NET
mnters: List[str]
policy: str
status: str
@property
def object_type(self) -> str:
"""object type"""
return "inetnum" if V4_NET.network.supernet_of(self.network) \
else "inet6num"
@property
def object_name(self) -> str:
"""object name"""
return self.network.with_prefixlen.replace("/", "_")
def in_net(i: NET, nets: List[NET]) -> Tuple[bool, NET]:
"find a network within a list of networks"
found = False
net = None
for n in nets:
if n.supernet_of(i):
found = True
net = n
break
return found, net
def find_tree(ip: NET, nets: NET_TREE):
"""Find net in tree"""
net = V6_NET
current = nets[net]
while True:
found, net = in_net(ip, current[1])
if not found:
return True, current[0] + 1
if ip.network == net.network:
return True, current[0] + 2
current = nets[net]
continue
def make_tree(nets: List[NET]) -> Dict[NET, NET_LIST]:
"""build a network tree index"""
root = V6_NET
tree = {root: [-1, []]}
for i in sorted(
sorted(nets, key=lambda x: x.exploded),
key=lambda x: x.prefixlen):
current = tree[root]
while True:
found, n = in_net(i, current[1])
if found:
current = tree[n]
continue
if current[0] >= 0:
current[1].append(i)
tree[i] = [current[0] + 1, []]
break
return tree
def run(path: str = "."):
"""run main script"""
if not os.path.isdir(os.path.join(path, "schema")):
print("schema directory not found in path", file=sys.stderr)
sys.exit(1)
idx = index_files(path)
lookup = {} # type: Dict[str, FileDOM]
schemas = {} # type: Dict[str, SchemaDOM]
files = []
nets = [] # type: List[NetRecord]
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
for (i, dom) in enumerate(idx):
if not dom.valid:
print("E", end="", flush=True)
continue
key, value = dom.index
lookup[key] = value
files.append(dom)
if dom.schema == "schema":
schema = SchemaDOM()
schema.parse(dom)
schemas[schema.ref] = schema
if dom.schema in ["inetnum", "inet6num"]:
nets.append(NetRecord(
dom.get("cidr").as_net6,
dom.mntner,
dom.get("policy", default="closed"),
dom.get("status", default="ASSIGNED"),
))
if i % 120 == 0:
print(
f"Reading Files: files: {len(files)} schemas: {len(schemas)}",
end="\r", flush=True, file=sys.stderr)
print(
f"Reading Files: done! files: {len(files)}, schemas: {len(schemas)}",
file=sys.stderr)
print("Writing .index", file=sys.stderr)
print("Writing .linkindex", file=sys.stderr)
with open(".index", 'w') as out:
with open(".links", 'w') as link_out:
for dom in files:
s = schemas.get(dom.rel)
if s is None:
print(
f"{dom.src} schema not found for {dom.rel}",
file=sys.stderr)
print(dom.rel,
dom.get(s.primary),
dom.src,
",".join(dom.mntner),
sep="\t",
file=out)
for (link, refs) in s.links.items():
d = dom.get(link)
if d is not None:
print(
f"{dom.name}\t{link}\t{d}\t{','.join(refs)}",
file=link_out)
print("Generate .netindex", file=sys.stderr)
tree = make_tree({n.network for n in nets})
netindex = []
for net in nets:
v = tree[net.network]
netindex.append((v[0],
net.network.network_address.exploded,
net.network.broadcast_address.exploded,
net.policy, net.status, ",".join(net.mnters)))
print("Writing .netindex", file=sys.stderr)
with open(".netindex", "w") as out:
for row in sorted(netindex, key=lambda x: x[0]):
print("\t".join([str(i) for i in row]), file=out)
print("done.", file=sys.stderr)
if __name__ == "__main__":
run(sys.argv[1] if len(sys.argv) > 1 else os.getcwd())

View file

@ -2,13 +2,13 @@
import re
from dataclasses import dataclass
from typing import Sequence, NamedTuple, List, Dict, Optional, Union
import ipaddress
from typing import Sequence, NamedTuple, List, Dict, Optional, Tuple, Union
from ipaddress import ip_network, IPv4Network, IPv6Network
import log
@dataclass
@dataclass(frozen=True)
class Value:
"""Dom Value"""
value: str
@ -19,22 +19,34 @@ class Value:
def __str__(self) -> str:
return self.value
@property
def lines(self) -> List[str]:
"""return value split into lines"""
return self.value.splitlines()
@property
def fields(self) -> List[str]:
"""return value split into fields"""
return self.value.split()
def as_ip(self) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""return value as an ip address"""
return ipaddress.ip_address(self.value)
def as_net(self) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network]:
@property
def as_net(self) -> Union[IPv4Network, IPv6Network]:
"""return value as an ip network"""
return ipaddress.ip_network(self.value)
return ip_network(self.value)
@property
def as_net6(self) -> IPv6Network:
"""return value as an ip network"""
net = ip_network(self.value)
if isinstance(net, IPv6Network):
return net
n = net
return ip_network(
f"::FFFF:{n.network_address}/{n.prefixlen + 96}")
@property
def as_key(self) -> str:
"""Format as key name"""
return self.value.replace("/", "_").replace(" ", "")
@ -47,6 +59,7 @@ class Row(NamedTuple):
lineno: int
src: str = None
@property
def loc(self) -> str:
"""format as location"""
s = f"{self.src} Line {self.lineno} "
@ -57,14 +70,14 @@ class Row(NamedTuple):
class FileDOM:
"""Parses a reg file"""
def __init__(self, src: Optional[str] = None):
def __init__(self, src: Optional[str] = None, ns: Optional[str] = "dn42"):
self.valid = False
self.dom = [] # type: List[Row]
self.keys = {} # type: Dict[str, int]
self.multi = {} # type: Dict[str, int]
self.mntner = [] # type: List[str]
self.schema = None # type: Optional[str]
self.src = src
self.ns = ns
def parse(self, input_str: Sequence[str], src: Optional[str] = None):
"""Parse an input string generator"""
@ -73,7 +86,7 @@ class FileDOM:
multi = {}
mntner = []
last_multi = None
self.valid = True
self.valid = False
self.src = self.src if src is None else src
for lineno, i in enumerate(input_str, 1):
@ -81,7 +94,6 @@ class FileDOM:
if re.match(r'[ \t]', i):
if len(dom) == 0:
log.error(f"File {src} does not parse properly")
self.valid = False
return
dom[-1][1] += "\n" + i.strip()
@ -121,11 +133,43 @@ class FileDOM:
if dom[-1][0] == 'mnt-by':
mntner.append(dom[-1][1])
self.dom = [Row(k, Value(v), n) for k, v, n in dom]
self.dom = [Row(k, Value(v), n, self.src) for k, v, n in dom]
self.keys = keys
self.multi = multi
self.mntner = mntner
self.schema = self.dom[0].key
self.valid = True
@property
def schema(self) -> str:
"""return the schema name for file"""
if len(self.dom) < 0:
return "none"
return self.dom[0].key
@property
def name(self) -> str:
"""return the friendly name for file"""
if len(self.dom) < 1:
return "none"
fields = self.dom[0].value.fields
if len(fields) < 1:
return "none"
return fields[0]
@property
def rel(self) -> str:
"generate rel for schema ref"
return f"{self.ns}.{self.schema}"
@property
def index(self) -> Tuple[Tuple[str, str], Tuple[str, str]]:
"""generate index key/value pair"""
name = self.src.split("/")[-1].replace("_", "/")
return ((f"{self.ns}.{self.schema}", name),
(self.src, ",".join(self.mntner)))
def __str__(self):
length = 19
@ -134,7 +178,7 @@ class FileDOM:
length = len(i.key) + 2
s = ""
for i in self.dom:
sp = i.value.lines()
sp = i.value.lines
s += i.key + ":" + " " * (length - len(i.key)) + sp[0] + "\n"
for m in sp[1:]:
@ -173,4 +217,7 @@ class FileDOM:
def read_file(fn: str) -> FileDOM:
"""Parses FileDOM from file"""
with open(fn, mode='r', encoding='utf-8') as f:
return FileDOM().parse(f.readlines())
dom = FileDOM(src=fn)
dom.parse(f.readlines())
return dom

View file

@ -1,22 +1,19 @@
"""Schema DOM"""
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List, Tuple
from enum import Enum, auto
from typing import Optional, List, Tuple, Dict, Set
import log
from .filedom import FileDOM, Row
SCHEMA_NAMESPACE = "dn42."
class Level(Enum):
"""State error level"""
info = 1
warning = 2
error = 3
info = auto()
warning = auto()
error = auto()
@dataclass
@ -39,11 +36,11 @@ class State:
"""print out state info"""
for (level, row, msg) in self.msgs:
if level == Level.info:
log.info(f"{row.loc()} {msg}")
log.info(f"{row.loc} {msg}")
elif level == Level.warning:
log.warning(f"{row.loc()} {msg}")
log.warning(f"{row.loc} {msg}")
elif level == Level.error:
log.error(f"{row.loc()} {msg}")
log.error(f"{row.loc} {msg}")
def info(self, r: Row, s: str):
"""Add warning"""
@ -68,11 +65,16 @@ class SchemaDOM:
self.primary = None
self.type = None
self.src = src
self.schema = {}
self._schema = {} # type: Dict[str, Set[str]]
self._spec = {} # type: Dict[str, str]
self._links = {} # type: Dict[str, List[str]]
@property
def links(self) -> Dict[str, List[str]]:
return self._links
def parse(self, f: FileDOM):
"""Parse a FileDOM into a SchemaDOM"""
self.src = self.src if f.src is None else f.src
schema = {}
@ -85,20 +87,20 @@ class SchemaDOM:
if row.key != 'key':
continue
lines = row.value.fields()
lines = row.value.fields
key = lines.pop(0)
schema[key] = set()
for i in lines:
if i == ">":
break
schema[key].add(i)
if i.startswith("lookup="):
self._links[key] = i.split("=", 2)[1].split(",")
schema = self._process_schema(schema)
self.valid = True
self.schema = schema
self._schema = schema
return schema
def _process_schema(self, schema):
@ -140,20 +142,20 @@ class SchemaDOM:
state = self._check_file_values(state, f, lookups)
state = inetnum_check(state, f)
print("CHECK\t%-54s\t%s\tMNTNERS: %s" %
(f.src, state, ','.join(f.mntner)))
print("CHECK\t%-10s\t%-44s\t%s\tMNTNERS: %s" %
(f.schema, f.src.split("/")[-1], state, ','.join(f.mntner)))
return state
def _check_file_structure(self, state: State, f: FileDOM) -> State:
for k, v in self.schema.items():
for k, v in self._schema.items():
row = Row(k, "", 0, f.src)
if 'required' in v and k not in f.keys:
state.error(row, "not found and is required")
elif 'recommend' in v and k not in f.keys:
state.info(row, "not found and is recommended")
if 'schema' in v and SCHEMA_NAMESPACE + f.dom[0].key != self.ref:
if 'schema' in v and f"{f.ns}.{f.dom[0].key}" != self.ref:
state.error(row, "not found and is required as the first line")
if 'single' in v and k in f.keys and len(f.keys[k]) > 1:
@ -173,7 +175,7 @@ class SchemaDOM:
lookups: Optional[List[Tuple[str, str]]] = None
) -> State:
for row in f.dom:
c = row.value.as_key()
c = row.value.as_key
src = "None" if f.src is None else f.src
if row.key == self.primary and not src.endswith(c):
@ -183,16 +185,17 @@ class SchemaDOM:
if row.key.startswith("x-"):
state.info(row, "is user defined")
continue
elif row.key not in self.schema:
if row.key not in self._schema:
state.error(row, "not in schema")
continue
else:
if 'deprecate' in self.schema[row.key]:
state.info(row, "was found and is deprecated")
if lookups is not None:
state = self._check_file_lookups(state, row, lookups)
if 'deprecate' in self._schema[row.key]:
state.info(row, "was found and is deprecated")
if lookups is not None:
state = self._check_file_lookups(state, row, lookups)
return state
@ -201,18 +204,19 @@ class SchemaDOM:
row: Row,
lookups: List[Tuple[str, str]] = None
) -> State:
for o in self.schema[row.key]:
if o.startswith("lookup="):
refs = o.split("=", 2)[1].split(",")
val = row.value.fields()[0]
found = False
for ref in refs:
if (ref, val) in lookups:
found = True
if not found:
state.error(row,
f"references object {val} " +
f"in {refs} but does not exist.")
if row.key not in self._links:
return state
refs = self._links[row.key]
val = row.value.fields[0]
found = False
for ref in refs:
if (ref, val) in lookups:
found = True
if not found:
state.error(row,
f"{row.key} references object {val} " +
f"in {refs} but does not exist.")
return state
@ -228,7 +232,7 @@ def read_file(src: str) -> SchemaDOM:
def inetnum_check(state: State, dom: FileDOM) -> State:
"""Sanity Check for checking the inet[6]num value"""
if dom.schema == "inetnum" or dom.schema == "inet6num":
cidr = dom.get("cidr").as_net()
cidr = dom.get("cidr").as_net
Lnet = cidr.network_address.exploded
Hnet = cidr.broadcast_address.exploded

View file

@ -82,7 +82,7 @@ class TestFileDOM(unittest.TestCase):
dom = FileDOM()
dom.parse(s.splitlines())
cidr = dom.get("cidr").as_net()
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "fd00::/8")
self.assertEqual(
cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8")
@ -105,7 +105,7 @@ class TestFileDOM(unittest.TestCase):
dom = FileDOM()
dom.parse(s.splitlines())
cidr = dom.get("cidr").as_net()
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "172.20.0.0/14")
self.assertEqual(
cidr.exploded, "172.20.0.0/14")

View file

@ -2,6 +2,7 @@
import inspect
import unittest
from pprint import pprint
from .schema import SchemaDOM
from .filedom import FileDOM
@ -274,11 +275,11 @@ class TestSchema(unittest.TestCase):
files.append(dom)
name = dom.src.split("/")[-1].replace("_", "/")
idx[(f"dn42.{dom.schema}", name)] = (dom.src, ",".join(dom.mntner))
key, value = dom.index
idx[key] = value
for dom in files:
s = schemas["dn42." + dom.schema]
s = schemas[f"{dom.ns}.{dom.schema}"]
state = s.check_file(dom, idx)
self.assertTrue(state)

View file

64
utils/registry/scan-index.py Executable file
View file

@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""Scans Registry at given path for issues using an pregenerated index"""
import os
import sys
from typing import Dict
from dom.filedom import FileDOM, read_file
from dom.schema import SchemaDOM
def index_files(path: str):
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
for f in files:
if f[0] == ".":
continue
dom = read_file(os.path.join(root, f))
yield dom
def run(path: str = ".", index: str = ".index"):
"""run main script"""
lookups = {} # type: Dict[str, FileDOM]
schemas = {} # type: Dict[str, SchemaDOM]
schema_set = set()
with open(index) as fd:
for line in fd.readlines():
sp = line.split()
lookups[(sp[0], sp[1])] = (sp[2], sp[3])
if sp[0] == "dn42.schema":
schema_set.add(sp[2])
for s in schema_set:
dom = read_file(s)
schema = SchemaDOM()
schema.parse(dom)
schemas[schema.ref] = schema
files = index_files(path)
for dom in files:
key, value = dom.index
lookups[key] = value
for dom in files:
s = schemas.get(dom.rel)
if s is None:
print(f"{dom.src} schema not found for {dom.rel}")
status = s.check_file(dom, lookups=lookups)
status.print()
print(status)
if __name__ == "__main__":
run(sys.argv[1] if len(sys.argv) >= 2 else os.getcwd())

71
utils/registry/scan-registry.py Executable file
View file

@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""Scans Registry at given path for issues"""
import os
import sys
from typing import Dict
from dom.filedom import FileDOM, read_file
from dom.schema import SchemaDOM
def index_files(path: str):
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
for f in files:
if f[0] == ".":
continue
dom = read_file(os.path.join(root, f))
yield dom
def run(path: str = "."):
"""run main script"""
idx = index_files(path)
lookups = {} # type: Dict[str, FileDOM]
schemas = {} # type: Dict[str, SchemaDOM]
files = []
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
for (i, dom) in enumerate(idx):
if not dom.valid:
print("E", end="", flush=True)
continue
key, value = dom.index
lookups[key] = value
files.append(dom)
if dom.schema == "schema":
schema = SchemaDOM()
schema.parse(dom)
schemas[schema.ref] = schema
if i % 120 == 0:
print(
f"Reading Files: files: {len(files)} schemas: {len(schemas)}",
end="\r", flush=True, file=sys.stderr)
print(
f"Reading Files: done! files: {len(files)}, schemas: {len(schemas)}",
file=sys.stderr)
for dom in files:
s = schemas.get(dom.rel)
if s is None:
print(f"{dom.src} schema not found for {dom.rel}")
status = s.check_file(dom, lookups)
status.print()
if __name__ == "__main__":
run(sys.argv[1] if len(sys.argv) > 1 else os.getcwd())