From 84b133f124ccd430253d4ffe065c49cc327de11f Mon Sep 17 00:00:00 2001 From: Jonathan Lundy Date: Mon, 13 Jul 2020 13:47:07 -0600 Subject: [PATCH] update rpsl. add whois --- utils/registry/dn42/rpsl/__init__.py | 8 +- utils/registry/dn42/rpsl/config.py | 7 +- .../dn42/rpsl/{filedom.py => file.py} | 14 +- utils/registry/dn42/rpsl/metafile.py | 23 +++ utils/registry/dn42/rpsl/nettree.py | 116 ++++++++++++--- .../dn42/rpsl/{rspldom.py => rspl.py} | 47 ++++-- utils/registry/dn42/rpsl/schema.py | 2 +- utils/registry/dn42/rpsl/spec.py | 21 +++ utils/registry/dn42/rpsl/test_file.py | 134 ++++++++++++++++++ utils/registry/dn42/rpsl/transact.py | 2 +- utils/registry/rpsl_index/__init__.py | 39 +++-- utils/registry/rpsl_init/__init__.py | 1 + utils/registry/rpsl_whois/__init__.py | 47 ++++-- utils/registry/scan-registry.py | 70 --------- 14 files changed, 392 insertions(+), 139 deletions(-) rename utils/registry/dn42/rpsl/{filedom.py => file.py} (95%) create mode 100644 utils/registry/dn42/rpsl/metafile.py rename utils/registry/dn42/rpsl/{rspldom.py => rspl.py} (70%) create mode 100644 utils/registry/dn42/rpsl/spec.py create mode 100644 utils/registry/dn42/rpsl/test_file.py delete mode 100755 utils/registry/scan-registry.py diff --git a/utils/registry/dn42/rpsl/__init__.py b/utils/registry/dn42/rpsl/__init__.py index d855f10c9..6a27b6f25 100644 --- a/utils/registry/dn42/rpsl/__init__.py +++ b/utils/registry/dn42/rpsl/__init__.py @@ -2,18 +2,18 @@ __version__ = "0.3.0" -from .filedom import FileDOM, Row, Value, index_files +from .file import FileDOM, Row, Value, index_files from .schema import SchemaDOM, Level, State from .transact import TransactDOM from .config import Config -from .nettree import NetTree, NetRecord, NetList -from .rspldom import RPSL +from .nettree import NetTree, NetRecord, NetList, as_net6 +from .rspl import RPSL __all__ = [ "FileDOM", "Row", "Value", "index_files", "SchemaDOM", "Level", "State", "TransactDOM", "Config", - "NetTree", "NetRecord", "NetList", + "NetTree", "NetRecord", "NetList", "as_net6", "RPSL", ] diff --git a/utils/registry/dn42/rpsl/config.py b/utils/registry/dn42/rpsl/config.py index f964aeec7..fe2d49e05 100644 --- a/utils/registry/dn42/rpsl/config.py +++ b/utils/registry/dn42/rpsl/config.py @@ -6,7 +6,7 @@ import os.path from dataclasses import dataclass from typing import Dict, Set, Tuple, Optional, TypeVar -from .filedom import FileDOM +from .file import FileDOM C = TypeVar('C', bound='Config') @@ -66,6 +66,11 @@ class Config: "return network parents" return set(self.network_owners.values()) + @property + def network_children(self) -> Set[str]: + "return network children" + return set(self.network_owners.keys()) - self.network_parents + @property def schema_dir(self) -> str: "get schema directory" diff --git a/utils/registry/dn42/rpsl/filedom.py b/utils/registry/dn42/rpsl/file.py similarity index 95% rename from utils/registry/dn42/rpsl/filedom.py rename to utils/registry/dn42/rpsl/file.py index 329bb0104..e5b39ae9a 100644 --- a/utils/registry/dn42/rpsl/filedom.py +++ b/utils/registry/dn42/rpsl/file.py @@ -55,6 +55,15 @@ class Value: """Format as key name""" return self.value.replace("/", "_").replace(" ", "") + @property + def as_spec(self) -> List[str]: + "get the spec definition" + fields = self.fields + i = fields.index(">") + if i is None: + return [] + return fields[i:] + class Row(NamedTuple): """DOM Row""" @@ -183,11 +192,10 @@ class FileDOM: return f"{self.namespace}.{self.schema}" @property - def index(self) -> Tuple[Tuple[str, str], Tuple[str, str]]: + def index(self) -> Tuple[str, str]: """generate index key/value pair""" name = self.src.split("/")[-1].replace("_", "/") - return ((f"{self.namespace}.{self.schema}", name), - (self.src, ",".join(self.mntner))) + return f"{self.namespace}.{self.schema}", name def __str__(self): length = 19 diff --git a/utils/registry/dn42/rpsl/metafile.py b/utils/registry/dn42/rpsl/metafile.py new file mode 100644 index 000000000..ce8afe83e --- /dev/null +++ b/utils/registry/dn42/rpsl/metafile.py @@ -0,0 +1,23 @@ +"Metafile" +from dataclasses import dataclass +from typing import Sequence, Generator + +from .rspl import RPSL +from .file import Value + + +@dataclass +class MetaFile: + "file" + obj_type: str + obj_name: str + + +class MetaDOM: + "metafile dom" + def __init__(self, lis: Sequence[MetaFile], rpsl: RPSL): + self.lis = lis + self.rpsl = rpsl + + def get(self, name: str) -> Generator[Value, None, None]: + "get values" diff --git a/utils/registry/dn42/rpsl/nettree.py b/utils/registry/dn42/rpsl/nettree.py index ab6b0b221..b85e28733 100644 --- a/utils/registry/dn42/rpsl/nettree.py +++ b/utils/registry/dn42/rpsl/nettree.py @@ -1,7 +1,7 @@ "Net Tree" from ipaddress import ip_network, IPv6Network -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Dict, List, Tuple, Optional, Generator, TypeVar NET = IPv6Network @@ -10,16 +10,33 @@ V4_NET = ip_network("::ffff:0.0.0.0/96") NT = TypeVar("NT", bound="NetTree") +def as_net6(value: str) -> IPv6Network: + """return value as an ip network""" + net = ip_network(value) + + if isinstance(net, IPv6Network): + return net + + n = net + return ip_network( + f"::FFFF:{n.network_address}/{n.prefixlen + 96}") + + @dataclass class NetRecord: "Network Record" network: NET policy: str status: str + is_leaf: bool = False @property def object_type(self) -> str: """object type""" + if self.is_leaf: + return "route" if V4_NET.supernet_of(self.network) \ + else "route6" + return "inetnum" if V4_NET.supernet_of(self.network) \ else "inet6num" @@ -35,6 +52,9 @@ class NetRecord: return self.network.with_prefixlen.replace("/", "_") + def __str__(self) -> str: + return f"{self.object_type}/{self.object_name}" + @dataclass class NetList: @@ -44,6 +64,7 @@ class NetList: level: int net: Optional[NetRecord] nets: List[NET] + routes: List[NetRecord] = field(default_factory=list) def in_net(self, i: NET) -> Tuple[bool, NET]: "find a network within a list of networks" @@ -57,39 +78,55 @@ class NetList: return found, net + def in_routes(self, i: NET) -> Tuple[bool, NET]: + "find a network within a list of networks" + found = False + net = None + for n in self.routes: + if n.network.supernet_of(i): + found = True + net = n + break + + return found, net + class NetTree: "Network Tree" - def __init__(self, nets: Optional[List[NET]] = None): + def __init__(self, + nets: Optional[List[NetRecord]] = None, + routes: Optional[List[NetRecord]] = None): self.tree = {} # type: Dict[NET, NetList] + if routes is None: + routes = [] if nets is not None: - self.make_tree(nets) + self.make_tree(nets, routes) def __getitem__(self, key): return self.tree[key] - def find_tree(self, ip: NET) -> Tuple[bool, int]: + def find_tree(self, ip: str) -> Generator[NetList, None, None]: """Find net in tree""" net = V6_NET current = self.tree[net] + needle = as_net6(ip) + yield current while True: - found, net = current.in_net(ip) - if not found: - return True, current.level + 1 + found, net = current.in_net(needle) + if found: + current = self.tree[net] + yield current + continue + break - if ip == net: - return True, current.level + 2 - - current = self.tree[net] - continue - - return False, 0 - - def make_tree(self, nets: List[NetRecord]): + def make_tree(self, + nets: List[NetRecord], + routes: List[NetRecord]): """build a network tree index""" root = V6_NET self.tree = {root: NetList(0, None, -1, None, [])} + index = 0 for index, net in enumerate(sorted( sorted(nets, key=lambda x: x.network), key=lambda x: x.network.prefixlen)): @@ -110,6 +147,23 @@ class NetTree: index, current.index, current.level + 1, net, []) break + for index, net in enumerate(sorted( + sorted(routes, key=lambda x: x.network), + key=lambda x: x.network.prefixlen), index): + + current = self.tree[root] + + while True: + found, n = current.in_net(net.network) + if found: + current = self.tree[n] + continue + + rec = NetRecord(net.network, "-", "-", True) + current.routes.append(rec) + + break + def write_csv(self, fn: str = ".netindex"): "write tree to csv" with open(fn, "w") as f: @@ -135,6 +189,19 @@ class NetTree: v.net.object_type, v.net.object_name, )]) + "\n") + for route in v.routes: + net_addr = route.network.network_address.exploded + net_pfx = route.network.prefixlen + yield ( + "|".join([str(i) for i in ( + f"{0:04d}|{v.index:04d}|{v.level+1:04d}", + net_addr, + net_pfx, + route.policy, + route.status, + route.object_type, + route.object_name, + )]) + "\n") @classmethod def read_csv(cls, fn) -> NT: @@ -146,15 +213,20 @@ class NetTree: if len(sp) != 9: continue net = ip_network(f"{sp[3]}/{sp[4]}") - rec = NetRecord(net, sp[5], sp[6]) - lis = NetList(sp[0], sp[1], sp[2], rec, []) - inttree[sp[0]] = lis - if sp[0] != sp[1]: - inttree[sp[1]].nets.append(net) + is_leaf = sp[7] in ("route", "route6") + rec = NetRecord(net, sp[5], sp[6], is_leaf) + if is_leaf: + inttree[sp[1]].routes.append(rec) + else: + lis = NetList(sp[0], sp[1], sp[2], rec, []) + inttree[sp[0]] = lis + + if sp[0] != sp[1]: + inttree[sp[1]].nets.append(net) nettree = {} for v in inttree.values(): nettree[v.net.network] = v c = cls() - c.tree = NetTree + c.tree = nettree return c diff --git a/utils/registry/dn42/rpsl/rspldom.py b/utils/registry/dn42/rpsl/rspl.py similarity index 70% rename from utils/registry/dn42/rpsl/rspldom.py rename to utils/registry/dn42/rpsl/rspl.py index 8af635cc5..98ea4ef52 100644 --- a/utils/registry/dn42/rpsl/rspldom.py +++ b/utils/registry/dn42/rpsl/rspl.py @@ -1,10 +1,10 @@ "RPSL" import os.path -from typing import Dict, List, Tuple, TypeVar, Optional, Generator +from typing import Dict, List, Tuple, TypeVar, Optional, Sequence -from .filedom import FileDOM -from .nettree import NetTree +from .file import FileDOM +from .nettree import NetTree, NetList from .schema import SchemaDOM, State from .transact import TransactDOM from .config import Config @@ -49,7 +49,7 @@ class RPSL: def append_index(self, dom: FileDOM): "append files to index" - key, value = dom.index + key, value = dom.index, (dom.src, ",".join(dom.mntner)) self._lookup[key] = value def scan_files(self, files: List[FileDOM]) -> State: @@ -67,24 +67,35 @@ class RPSL: def find(self, text: str, - schema: Optional[str] = None) -> Generator[FileDOM, None, None]: + schema: Optional[str] = None) -> Sequence[str]: "Find files that match text and schema" keys = [(schema, text)] if schema is None: keys = self._lookup.get(text, []) + return [self._files[i] for i in keys] + + def related( + self, + key: Tuple[str, str]) -> Sequence[str]: + "Get files related to file" related = set() + for link in self.links(key): + key = (link[1], link[2]) + related.add(key) - for i in keys: - yield self.load_file(self._files[i]) - for link in self.links(i): - key = (link[1], link[2]) - related.add(key) + return [self._files[i] for i in related] - for i in related: - if i in keys: - continue - yield self.load_file(self._files[i]) + def find_network(self, ip: str) -> Sequence[NetList]: + """Find Network in index + + Args: + ip (str): ip address + + Returns: + Generator[NetList, None, None]: generator of netlists + """ + return self._nettree.find_tree(ip) def load_file(self, fn: str) -> FileDOM: "load file" @@ -95,6 +106,14 @@ class RPSL: return fo + def load_files(self, fns: Sequence[str]) -> Sequence[NetList]: + for fn in fns: + yield self.load_file(fn) + def links(self, key: Tuple[str, str]) -> List[Tuple[str, str]]: "get links" return self._links.get(key, []) + + def schema(self, name: str) -> SchemaDOM: + "get schema" + return self._schema.get(name) diff --git a/utils/registry/dn42/rpsl/schema.py b/utils/registry/dn42/rpsl/schema.py index 8b90a39d3..5f219b936 100644 --- a/utils/registry/dn42/rpsl/schema.py +++ b/utils/registry/dn42/rpsl/schema.py @@ -6,7 +6,7 @@ from typing import Optional, List, Tuple, Dict, Set, TypeVar import log -from .filedom import FileDOM, Row +from .file import FileDOM, Row DOM = TypeVar("DOM", bound="FileDOM") STATE = TypeVar("STATE", bound="State") diff --git a/utils/registry/dn42/rpsl/spec.py b/utils/registry/dn42/rpsl/spec.py new file mode 100644 index 000000000..1e3858dab --- /dev/null +++ b/utils/registry/dn42/rpsl/spec.py @@ -0,0 +1,21 @@ +"spec" +from dataclasses import dataclass +from typing import Dict, List, Enum + +class Rule: + pass + + +@dataclass +class LabelRule(Rule): + name: str + + def parse(self, fields: Sequence[str]) -> Optional[Tuple[str, str]]: + +@dataclass +class Spec: + keys: Dict[str, SpecRule] + + @classmethod + def from_dom(cls, dom: file.FileDOM): + for key in diff --git a/utils/registry/dn42/rpsl/test_file.py b/utils/registry/dn42/rpsl/test_file.py new file mode 100644 index 000000000..4fb888538 --- /dev/null +++ b/utils/registry/dn42/rpsl/test_file.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +"""Test FileDOM""" +import unittest +import inspect +from pprint import pprint + +from .filedom import FileDOM + + +class TestFileDOM(unittest.TestCase): + """Test FileDOM""" + + def test_parse(self): + """Test Parsing""" + s = """ + person: Xuu + remarks: test + + + Multi-Line + contact: xmpp:xuu@xmpp.dn42 + contact: mail:xuu@dn42.us + pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F + nic-hdl: XUU-DN42 + mnt-by: XUU-MNT + source: DN42 + """ + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + + self.assertTrue(dom.valid) + self.assertEqual(dom.schema, "person") + self.assertEqual(dom.get("person"), "Xuu") + self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42") + self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") + self.assertIsNone(dom.get("xxx")) + self.assertEqual(dom.get("xxx", default="default"), "default") + self.assertEqual(str(dom), s) + + def test_put_values(self): + """Test putting values""" + s = """ + person: Xuu + remarks: test + contact: xmpp:xuu@xmpp.dn42 + contact: mail:xuu@dn42.us + pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F + nic-hdl: XUU-DN42 + mnt-by: XUU-MNT + source: DN42 + """ + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + + dom.put("source", "SOURIS") + self.assertEqual(dom.get("source"), "SOURIS") + + dom.put("contact", "mail:me@sour.is", append=True) + self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42") + self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") + self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is") + + def test_parse_ip6address(self): + """Test network ip address parsing""" + s = """ + inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff + cidr: fd00::/8 + netname: ROOT-DN42-ULA + descr: DN42 ULA Address Space + status: ALLOCATED + policy: open + org: ORG-DN42 + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM(text=s.splitlines()) + + cidr = dom.get("cidr").as_net + self.assertEqual(cidr.compressed, "fd00::/8") + self.assertEqual( + cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8") + + end = cidr.broadcast_address.exploded + start = cidr.network_address.exploded + + self.assertEqual(dom.get("inet6num"), f"{start} - {end}") + + def test_parse_ip4address(self): + """Test network ip address parsing""" + s = """ + inetnum: 172.20.0.0 - 172.23.255.255 + cidr: 172.20.0.0/14 + netname: ROOT-DN42 + """ + + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM(text=s.splitlines()) + + cidr = dom.get("cidr").as_net + self.assertEqual(cidr.compressed, "172.20.0.0/14") + self.assertEqual( + cidr.exploded, "172.20.0.0/14") + + end = cidr.broadcast_address.exploded + start = cidr.network_address.exploded + + self.assertEqual(dom.get("inetnum"), f"{start} - {end}") + + @unittest.skip + def test_bad_parse(self): + """bad parse stuff""" + s = """ + person: Xuu + EXTRA + : + source: DN42 + """ + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + pprint(dom.dom) + self.assertEqual(str(dom), s) + + +if __name__ == '__main__': + unittest.main() diff --git a/utils/registry/dn42/rpsl/transact.py b/utils/registry/dn42/rpsl/transact.py index 94048d87c..66fff27ac 100644 --- a/utils/registry/dn42/rpsl/transact.py +++ b/utils/registry/dn42/rpsl/transact.py @@ -2,7 +2,7 @@ from typing import Sequence, List, Optional, Tuple, TypeVar -from .filedom import FileDOM +from .file import FileDOM from .schema import SchemaDOM DOM = TypeVar("DOM", bound="TransactDOM") diff --git a/utils/registry/rpsl_index/__init__.py b/utils/registry/rpsl_index/__init__.py index bd3eb352f..bafa94b1b 100644 --- a/utils/registry/rpsl_index/__init__.py +++ b/utils/registry/rpsl_index/__init__.py @@ -40,12 +40,13 @@ def run(args: List[str], env: Dict[str, str]) -> int: idx = index_files(path, namespace=config.namespace, primary_keys=config.primary_keys) - lookup, schemas, files, nets = build_index(idx, rspl=config) + lookup, schemas, files, nets, routes = build_index(idx, rspl=config) print( f"Reading Files: done! files: {len(files)}" + f" schemas: {len(schemas)}" + f" networks: {len(nets)}", + f" routes: {len(routes)}", file=sys.stderr) print("Writing .rpsl/index", file=sys.stderr) @@ -71,7 +72,7 @@ def run(args: List[str], env: Dict[str, str]) -> int: file=link_out) print("Generate .rpsl/nettree", file=sys.stderr) - tree = NetTree(nets) + tree = NetTree(nets, routes) print("Writing .rpsl/nettree", file=sys.stderr) tree.write_csv(".rpsl/nettree") @@ -105,17 +106,19 @@ def build_index( schemas = {} # type: Dict[str, SchemaDOM] files = [] # type: List[FileDOM] nets = [] # type: List[NetRecord] + routes = [] # type: List[NetRecord] print(r"Reading Files...", end="\r", flush=True, file=sys.stderr) net_types = rspl.network_parents + net_leafs = rspl.network_children for (i, dom) in enumerate(idx): if not dom.valid: print("E", end="", flush=True) continue - key, _ = dom.index + key = dom.index lookup.add(key) files.append(dom) @@ -130,14 +133,23 @@ def build_index( dom.get("status", default="ASSIGNED"), )) + if dom.schema in net_leafs: + routes.append(NetRecord( + dom.get(dom.primary_key).as_net6, + dom.get("policy", default="none"), + dom.get("status", default="none"), + True, + )) + if i % 120 == 0: print( f"Reading Files: files: {len(files)}" + f" schemas: {len(schemas)} " + f" networks: {len(nets)}", + f" routes: {len(routes)}", end="\r", flush=True, file=sys.stderr) - return (lookup, schemas, files, nets) + return (lookup, schemas, files, nets, routes) def generate_links( @@ -147,15 +159,12 @@ def generate_links( ) -> Generator[Tuple[str, str, str], None, None]: "print file links out to file" for (link, refs) in links.items(): - d = dom.get(link) - if d is None: - continue + for d in dom.get_all(link): + found = False + for ref in refs: + if (ref, d.value) in lookup: + found = True + yield (link, ref, d) - found = False - for ref in refs: - if (ref, d.value) in lookup: - found = True - yield (link, ref, d) - - if not found: - print(f"{dom.name} missing link {link} {d.value}") + if not found: + print(f"{dom.name} missing link {link} {d.value}") diff --git a/utils/registry/rpsl_init/__init__.py b/utils/registry/rpsl_init/__init__.py index 70e61f733..587ee917d 100644 --- a/utils/registry/rpsl_init/__init__.py +++ b/utils/registry/rpsl_init/__init__.py @@ -63,6 +63,7 @@ def run(args: List[str], env: Dict[str, str]) -> int: print(rpsl, file=f) print(f"Created: {rpsl.config_file}", file=sys.stderr) + env["RPSL_DIR"] = rpsl_dir rpsl_index.run(args, env) return 0 diff --git a/utils/registry/rpsl_whois/__init__.py b/utils/registry/rpsl_whois/__init__.py index c0cce9e1a..b67afcb22 100644 --- a/utils/registry/rpsl_whois/__init__.py +++ b/utils/registry/rpsl_whois/__init__.py @@ -6,10 +6,10 @@ Usage: rpsl whois [text] """ import sys -from ipaddress import ip_network -from typing import List, Dict, Optional +from itertools import chain +from typing import List, Dict, Optional, Set, Tuple -from dn42.rpsl import RPSL, Config +from dn42.rpsl import RPSL, Config, FileDOM, as_net6 from dn42.utils import shift, exists @@ -41,15 +41,46 @@ def run(args: List[str], env: Dict[str, str]) -> int: ip = None try: - ip = ip_network(text) + ip = as_net6(text) except ValueError: pass - if ip is not None: - print(f"Searching network {text}...") - return 0 + principle = [] # type: List[FileDOM] + related_nets = [] # type: List[FileDOM] + related_idx = set() # type: Set[Tuple[str, str]] - for dom in rpsl.find(text, schema): + if ip is not None: + print(f"# Searching network {text}...") + nets = list(rpsl.find_network(text)) + last_net = nets[-1] + dom = rpsl.load_file(str(last_net.net)) + principle.append(dom) + related_idx.add(dom.index) + ok, route = last_net.in_routes(ip) + if ok: + dom = rpsl.load_file(str(route)) + principle.append(dom) + related_idx.add(dom.index) + + for net in nets[:-1]: + dom = rpsl.load_file(str(net.net)) + related_nets.append(dom) + else: + for dom in rpsl.find(text, schema): + principle.append(dom) + related_idx.add(dom.index) + + print("# Found objects") + for dom in principle: print(dom) + if len(related_nets) > 0: + print("# Related Networks") + for dom in related_nets: + print(dom) + + print("# Related objects") + lis = set(chain.from_iterable(rpsl.related(i) for i in related_idx)) + for dom in rpsl.load_files(sorted(lis)): + print(dom) return 0 diff --git a/utils/registry/scan-registry.py b/utils/registry/scan-registry.py deleted file mode 100755 index 52c34d198..000000000 --- a/utils/registry/scan-registry.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python3 -"""Scans Registry at given path for issues""" - -import os -import sys -from typing import Dict - -from dom.filedom import FileDOM -from dom.schema import SchemaDOM - - -def index_files(path: str): - """generate list of dom files""" - for root, _, files in os.walk(path): - if root == path: - continue - - for f in files: - if f[0] == ".": - continue - - dom = FileDOM.from_file(os.path.join(root, f)) - - yield dom - - -def run(path: str = "."): - """run main script""" - idx = index_files(path) - - lookups = {} # type: Dict[str, FileDOM] - schemas = {} # type: Dict[str, SchemaDOM] - files = [] - - print(r"Reading Files...", end="\r", flush=True, file=sys.stderr) - - for (i, dom) in enumerate(idx): - if not dom.valid: - print("E", end="", flush=True) - continue - - key, value = dom.index - lookups[key] = value - files.append(dom) - - if dom.schema == "schema": - schema = SchemaDOM(dom) - schemas[schema.ref] = schema - - if i % 120 == 0: - print( - f"Reading Files: files: {len(files)} schemas: {len(schemas)}", - end="\r", flush=True, file=sys.stderr) - - print( - f"Reading Files: done! files: {len(files)}, schemas: {len(schemas)}", - file=sys.stderr) - - for dom in files: - s = schemas.get(dom.rel) - if s is None: - print(f"{dom.src} schema not found for {dom.rel}") - continue - - status = s.check_file(dom, lookups) - status.print() - - -if __name__ == "__main__": - run(sys.argv[1] if len(sys.argv) > 1 else os.getcwd())