diff --git a/utils/registry/Makefile b/utils/registry/Makefile deleted file mode 100644 index 6e18e28fd..000000000 --- a/utils/registry/Makefile +++ /dev/null @@ -1,2 +0,0 @@ -test: - python3 -m unittest discover \ No newline at end of file diff --git a/utils/registry/dn42/__init__.py b/utils/registry/dn42/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/utils/registry/dn42/rpsl/__init__.py b/utils/registry/dn42/rpsl/__init__.py deleted file mode 100644 index 6a27b6f25..000000000 --- a/utils/registry/dn42/rpsl/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -"DN42 RSPL Library" - -__version__ = "0.3.0" - -from .file import FileDOM, Row, Value, index_files -from .schema import SchemaDOM, Level, State -from .transact import TransactDOM -from .config import Config -from .nettree import NetTree, NetRecord, NetList, as_net6 -from .rspl import RPSL - -__all__ = [ - "FileDOM", "Row", "Value", "index_files", - "SchemaDOM", "Level", "State", - "TransactDOM", - "Config", - "NetTree", "NetRecord", "NetList", "as_net6", - "RPSL", -] diff --git a/utils/registry/dn42/rpsl/config.py b/utils/registry/dn42/rpsl/config.py deleted file mode 100644 index fe2d49e05..000000000 --- a/utils/registry/dn42/rpsl/config.py +++ /dev/null @@ -1,148 +0,0 @@ - -"RSPL Config" - -import os -import os.path -from dataclasses import dataclass -from typing import Dict, Set, Tuple, Optional, TypeVar - -from .file import FileDOM - - -C = TypeVar('C', bound='Config') - - -@dataclass -class Config: - "RSPL Config" - path: str - _dom: FileDOM - - @property - def namespace(self) -> str: - "Get namespace" - return self._dom.get("namespace", default="dn42").value - - @property - def schema(self) -> str: - "Get schema type name" - return str(self._dom.get("schema", default="schema")) - - @property - def owners(self) -> str: - "Get owner type name" - return str(self._dom.get("owner", default="mntner")) - - @property - def source(self) -> str: - "Get source" - return self._dom.get("source", default="DN42").value - - @property - def default_owner(self) -> str: - "Get default onwer" - return self._dom.get("default-owner", default=self._dom.mntner).value - - @property - def network_owners(self) -> Dict[str, str]: - "Get network owners" - network_owner = {} # type: Dict[str, str] - for (parent, child) in [ - i.fields for i in self._dom.get_all("network-owner")]: - network_owner[child] = parent - return network_owner - - @property - def primary_keys(self) -> Dict[str, str]: - "Get primary keys" - primary_keys = {} # type: Dict[str, str] - for (parent, key) in [ - i.fields for i in self._dom.get_all("primary-key")]: - primary_keys[parent] = key - return primary_keys - - @property - def network_parents(self) -> Set[str]: - "return network parents" - return set(self.network_owners.values()) - - @property - def network_children(self) -> Set[str]: - "return network children" - return set(self.network_owners.keys()) - self.network_parents - - @property - def schema_dir(self) -> str: - "get schema directory" - return os.path.join(self.path, self.schema) - - @property - def owner_dir(self) -> str: - "get owner directory" - return os.path.join(self.path, self.owners) - - @property - def config_file(self) -> str: - "get config file" - return os.path.join(self.path, ".rpsl/config") - - @property - def index_file(self) -> str: - "get index file" - return os.path.join(self.path, ".rpsl/index") - - @property - def links_file(self) -> str: - "get links file" - return os.path.join(self.path, ".rpsl/links") - - @property - def schema_file(self) -> str: - "get schema file" - return os.path.join(self.path, ".rpsl/schema") - - @property - def nettree_file(self) -> str: - "get nettree file" - return os.path.join(self.path, ".rpsl/nettree") - - @classmethod - def from_path(cls, path: str) -> C: - "create from path" - src = os.path.join(path, ".rpsl/config") - return cls(FileDOM.from_file(src)) - - @classmethod - def build(cls, # pylint: disable=too-many-arguments - path: str, - namespace: str = "dn42", - owners: str = "mntner", - schema: str = "schema", - source: str = "DN42", - dir_name: Optional[Set[Tuple[str, str]]] = None, - primary_keys: Optional[Set[Tuple[str, str]]] = None, - network_owners: Optional[Set[Tuple[str, str]]] = None, - ) -> FileDOM: - "Build config from parameters" - FileDOM.namespace = namespace - dom = FileDOM() - dom.src = os.path.join(path, ".rpsl/config") - dom.put("namespace", namespace) - dom.put("owners", owners) - dom.put("schema", schema) - dom.put("source", source) - for (k, v) in dir_name: - dom.put("dir-name", f"{k} {v}", append=True) - for (k, v) in primary_keys: - dom.put("primary-key", f"{k} {v}", append=True) - for (k, v) in network_owners: - dom.put("network-owner", f"{v} {k}", append=True) - - return cls(dom) - - def __init__(self, dom: FileDOM): - self._dom = dom - self.path = os.path.dirname(os.path.dirname(dom.src)) - - def __str__(self): - return self._dom.__str__() diff --git a/utils/registry/dn42/rpsl/file.py b/utils/registry/dn42/rpsl/file.py deleted file mode 100644 index e5b39ae9a..000000000 --- a/utils/registry/dn42/rpsl/file.py +++ /dev/null @@ -1,275 +0,0 @@ -"""FileDOM parse and formating""" - -import re -import os -from dataclasses import dataclass -from typing import Sequence, NamedTuple, List, \ - Dict, Optional, Tuple, Union, Generator, TypeVar -from ipaddress import ip_network, IPv4Network, IPv6Network - -import log - -DOM = TypeVar("DOM", bound="FileDOM") - - -@dataclass(frozen=True) -class Value: - """Dom Value""" - value: str - - def __eq__(self, other: str) -> bool: - return self.value == other - - def __str__(self) -> str: - return self.value - - @property - def lines(self) -> List[str]: - """return value split into lines""" - return self.value.splitlines() - - @property - def fields(self) -> List[str]: - """return value split into fields""" - return self.value.split() - - @property - def as_net(self) -> Union[IPv4Network, IPv6Network]: - """return value as an ip network""" - return ip_network(self.value) - - @property - def as_net6(self) -> IPv6Network: - """return value as an ip network""" - net = ip_network(self.value) - - if isinstance(net, IPv6Network): - return net - - n = net - return ip_network( - f"::FFFF:{n.network_address}/{n.prefixlen + 96}") - - @property - def as_key(self) -> str: - """Format as key name""" - return self.value.replace("/", "_").replace(" ", "") - - @property - def as_spec(self) -> List[str]: - "get the spec definition" - fields = self.fields - i = fields.index(">") - if i is None: - return [] - return fields[i:] - - -class Row(NamedTuple): - """DOM Row""" - key: str - value: Value - lineno: int - src: str = None - - @property - def loc(self) -> str: - """format as location""" - s = f"{self.src} Line {self.lineno} " - s += "" if self.key == "" else f"Key [{self.key}]:" - return s - - -class FileDOM: - """Parses a reg file""" - - def __init__(self, - text: Optional[Sequence[str]] = None, - src: Optional[str] = None): - self.valid = False - self.dom = [] # type: List[Row] - self.keys = {} # type: Dict[str, int] - self.multi = {} # type: Dict[str, int] - self.mntner = [] # type: List[str] - self.namespace = "" - self.primary_key = "" - - self.src = src - - if text is not None: - self.parse(text, src=src) - - def parse(self, text: Sequence[str], src: Optional[str] = None): - """Parse an input string generator""" - dom = [] - keys = {} - multi = {} - mntner = [] - last_multi = None - self.valid = False - self.src = self.src if src is None else src - - for lineno, i in enumerate(text, 1): - # print(lineno, i) - if re.match(r'[ \t]', i): - if len(dom) == 0: - log.error(f"File {src} does not parse properly") - return - - dom[-1][1] += "\n" + i.strip() - - if dom[-1][0] not in multi: - multi[dom[-1][0]] = [] - - if last_multi is None: - multi[dom[-1][0]].append(lineno) - last_multi = dom[-1][0] - - else: - if i[0] == '+': - dom[-1][1] += "\n" - - if dom[-1][0] not in multi: - multi[dom[-1][0]] = [] - - if last_multi is None: - multi[dom[-1][0]].append(lineno) - last_multi = dom[-1][0] - - i = i.split(":") - if len(i) < 2: - continue - - dom.append([i[0].strip(), ':'.join( - i[1:]).strip(), lineno - 1]) - - if i[0].strip() not in keys: - keys[i[0].strip()] = [] - - keys[i[0].strip()].append(len(dom) - 1) - - last_multi = None - - if dom[-1][0] == 'mnt-by': - mntner.append(dom[-1][1]) - - self.dom = [Row(k, Value(v), n, self.src) for k, v, n in dom] - if len(self.dom) > 1: - self.primary_key = self.dom[0].key - self.keys = keys - self.multi = multi - self.mntner = mntner - if self.src is None: - self.src = f"{self.schema}/{self.name}" - self.valid = True - - @property - def schema(self) -> str: - """return the schema name for file""" - if len(self.dom) < 1: - return None - - return self.dom[0].key - - @property - def name(self) -> str: - """return the friendly name for file""" - if self.primary_key != "": - return self.get(self.primary_key).value - - if len(self.dom) < 1: - return "none" - - fields = self.dom[0].value.fields - if len(fields) < 1: - return "none" - - return fields[0] - - @property - def rel(self) -> str: - "generate rel for schema ref" - return f"{self.namespace}.{self.schema}" - - @property - def index(self) -> Tuple[str, str]: - """generate index key/value pair""" - name = self.src.split("/")[-1].replace("_", "/") - return f"{self.namespace}.{self.schema}", name - - def __str__(self): - length = 19 - for i in self.dom: - if len(i.key) > length: - length = len(i.key) + 2 - s = "" - for i in self.dom: - sp = i.value.lines - if len(sp) == 0: - s += i.key + ":" + " " * (length - len(i.key)) + "\n" - continue - s += i.key + ":" + " " * (length - len(i.key)) + sp[0] + "\n" - for m in sp[1:]: - if m == "": - s += "+\n" - continue - s += " " * (length + 1) + m + "\n" - - return s - - def get(self, key, index=0, default=None): - """Get a key value""" - if key not in self.keys: - return default - if index >= len(self.keys[key]) or index <= -len(self.keys[key]): - return default - - return self.dom[self.keys[key][index]].value - - def get_all(self, key) -> Generator[str, None, None]: - "Get all values for a key" - if key not in self.keys: - return - for i in self.keys[key]: - yield self.dom[i].value - - def put(self, key, value, index=0, append=False): - """Put a value""" - if key not in self.keys: - self.keys[key] = [] - - i = (self.keys[key][index:index+1] or (None,))[0] - if i is None or append: - i = len(self.dom) - self.dom.append(Row(key, Value(value), i)) - elif i is not None: - self.dom[i] = Row(key, Value(value), i) - - if index not in self.keys[key]: - self.keys[key].append(i) - - @classmethod - def from_file(cls, fn: str) -> DOM: - """Parses FileDOM from file""" - with open(fn, mode='r', encoding='utf-8') as f: - dom = cls(src=fn, text=f.readlines()) - - return dom - - -def index_files(path: str, - namespace: str, - primary_keys: Dict[str, str]) -> FileDOM: - """generate list of dom files""" - for root, _, files in os.walk(path): - if root == path: - continue - if root.endswith(".rpsl"): - continue - - for f in files: - dom = FileDOM.from_file(os.path.join(root, f)) - dom.namespace = namespace - if dom.schema in primary_keys: - dom.primary_key = primary_keys[dom.schema] - yield dom diff --git a/utils/registry/dn42/rpsl/metafile.py b/utils/registry/dn42/rpsl/metafile.py deleted file mode 100644 index ce8afe83e..000000000 --- a/utils/registry/dn42/rpsl/metafile.py +++ /dev/null @@ -1,23 +0,0 @@ -"Metafile" -from dataclasses import dataclass -from typing import Sequence, Generator - -from .rspl import RPSL -from .file import Value - - -@dataclass -class MetaFile: - "file" - obj_type: str - obj_name: str - - -class MetaDOM: - "metafile dom" - def __init__(self, lis: Sequence[MetaFile], rpsl: RPSL): - self.lis = lis - self.rpsl = rpsl - - def get(self, name: str) -> Generator[Value, None, None]: - "get values" diff --git a/utils/registry/dn42/rpsl/nettree.py b/utils/registry/dn42/rpsl/nettree.py deleted file mode 100644 index b85e28733..000000000 --- a/utils/registry/dn42/rpsl/nettree.py +++ /dev/null @@ -1,232 +0,0 @@ -"Net Tree" - -from ipaddress import ip_network, IPv6Network -from dataclasses import dataclass, field -from typing import Dict, List, Tuple, Optional, Generator, TypeVar - -NET = IPv6Network -V6_NET = ip_network("::/0") -V4_NET = ip_network("::ffff:0.0.0.0/96") -NT = TypeVar("NT", bound="NetTree") - - -def as_net6(value: str) -> IPv6Network: - """return value as an ip network""" - net = ip_network(value) - - if isinstance(net, IPv6Network): - return net - - n = net - return ip_network( - f"::FFFF:{n.network_address}/{n.prefixlen + 96}") - - -@dataclass -class NetRecord: - "Network Record" - network: NET - policy: str - status: str - is_leaf: bool = False - - @property - def object_type(self) -> str: - """object type""" - if self.is_leaf: - return "route" if V4_NET.supernet_of(self.network) \ - else "route6" - - return "inetnum" if V4_NET.supernet_of(self.network) \ - else "inet6num" - - @property - def object_name(self) -> str: - """object name""" - if V4_NET.supernet_of(self.network): - n = self.network.network_address.exploded.replace(":", "")[-8:] - return ip_network(( - bytes.fromhex(n), - self.network.prefixlen - 96, - )).with_prefixlen.replace("/", "_") - - return self.network.with_prefixlen.replace("/", "_") - - def __str__(self) -> str: - return f"{self.object_type}/{self.object_name}" - - -@dataclass -class NetList: - "Network List" - index: int - parent: Optional[int] - level: int - net: Optional[NetRecord] - nets: List[NET] - routes: List[NetRecord] = field(default_factory=list) - - def in_net(self, i: NET) -> Tuple[bool, NET]: - "find a network within a list of networks" - found = False - net = None - for n in self.nets: - if n.supernet_of(i): - found = True - net = n - break - - return found, net - - def in_routes(self, i: NET) -> Tuple[bool, NET]: - "find a network within a list of networks" - found = False - net = None - for n in self.routes: - if n.network.supernet_of(i): - found = True - net = n - break - - return found, net - - -class NetTree: - "Network Tree" - def __init__(self, - nets: Optional[List[NetRecord]] = None, - routes: Optional[List[NetRecord]] = None): - self.tree = {} # type: Dict[NET, NetList] - if routes is None: - routes = [] - if nets is not None: - self.make_tree(nets, routes) - - def __getitem__(self, key): - return self.tree[key] - - def find_tree(self, ip: str) -> Generator[NetList, None, None]: - """Find net in tree""" - net = V6_NET - current = self.tree[net] - needle = as_net6(ip) - - yield current - while True: - found, net = current.in_net(needle) - if found: - current = self.tree[net] - yield current - continue - break - - def make_tree(self, - nets: List[NetRecord], - routes: List[NetRecord]): - """build a network tree index""" - root = V6_NET - self.tree = {root: NetList(0, None, -1, None, [])} - index = 0 - for index, net in enumerate(sorted( - sorted(nets, key=lambda x: x.network), - key=lambda x: x.network.prefixlen)): - - current = self.tree[root] - - while True: - found, n = current.in_net(net.network) - - if found: - current = self.tree[n] - continue - - if current.level >= 0: - current.nets.append(net.network) - - self.tree[net.network] = NetList( - index, current.index, current.level + 1, net, []) - break - - for index, net in enumerate(sorted( - sorted(routes, key=lambda x: x.network), - key=lambda x: x.network.prefixlen), index): - - current = self.tree[root] - - while True: - found, n = current.in_net(net.network) - if found: - current = self.tree[n] - continue - - rec = NetRecord(net.network, "-", "-", True) - current.routes.append(rec) - - break - - def write_csv(self, fn: str = ".netindex"): - "write tree to csv" - with open(fn, "w") as f: - f.writelines(self._lines()) - - def __str__(self) -> str: - return "".join(self._lines()) - - def _lines(self) -> Generator[str, None, None]: - for v in sorted( - sorted(self.tree.values(), key=lambda x: x.index), - key=lambda x: x.level): - - net_addr = v.net.network.network_address.exploded - net_pfx = v.net.network.prefixlen - yield ( - "|".join([str(i) for i in ( - f"{v.index:04d}|{v.parent:04d}|{v.level:04d}", - net_addr, - net_pfx, - v.net.policy, - v.net.status, - v.net.object_type, - v.net.object_name, - )]) + "\n") - for route in v.routes: - net_addr = route.network.network_address.exploded - net_pfx = route.network.prefixlen - yield ( - "|".join([str(i) for i in ( - f"{0:04d}|{v.index:04d}|{v.level+1:04d}", - net_addr, - net_pfx, - route.policy, - route.status, - route.object_type, - route.object_name, - )]) + "\n") - - @classmethod - def read_csv(cls, fn) -> NT: - "read tree from csv" - inttree = {} # type: Dict[int, NetRecord] - with open(fn) as fd: - for line in fd.readlines(): - sp = line.split(sep="|") - if len(sp) != 9: - continue - net = ip_network(f"{sp[3]}/{sp[4]}") - is_leaf = sp[7] in ("route", "route6") - rec = NetRecord(net, sp[5], sp[6], is_leaf) - if is_leaf: - inttree[sp[1]].routes.append(rec) - else: - lis = NetList(sp[0], sp[1], sp[2], rec, []) - inttree[sp[0]] = lis - - if sp[0] != sp[1]: - inttree[sp[1]].nets.append(net) - nettree = {} - for v in inttree.values(): - nettree[v.net.network] = v - - c = cls() - c.tree = nettree - return c diff --git a/utils/registry/dn42/rpsl/rspl.py b/utils/registry/dn42/rpsl/rspl.py deleted file mode 100644 index 98ea4ef52..000000000 --- a/utils/registry/dn42/rpsl/rspl.py +++ /dev/null @@ -1,119 +0,0 @@ -"RPSL" - -import os.path -from typing import Dict, List, Tuple, TypeVar, Optional, Sequence - -from .file import FileDOM -from .nettree import NetTree, NetList -from .schema import SchemaDOM, State -from .transact import TransactDOM -from .config import Config - -R = TypeVar('R', bound="RPSL") - - -class RPSL: - "RSPL" - - def __init__(self, config: Config): - self._config = config - self._files = {} # type: Dict[Tuple[str, str], str] - self._lookup = {} # type: Dict[str, List[Tuple[str, str]]] - self._links = {} \ - # type: Dict[Tuple[str, str], List[Tuple[str, str, str]]] - self._nettree = None # type: NetTree - self._schema = {} # type: Dict[str, SchemaDOM] - self._load_index() - - def _load_index(self): - with open(self._config.index_file) as fd: - for line in fd.readlines(): - sp = line.strip().split(sep="|") - self._files[(sp[0], sp[1])] = sp[2] - self._lookup[sp[1]] = self._lookup.get(sp[1], []) - self._lookup[sp[1]].append((sp[0], sp[1])) - - with open(self._config.links_file) as fd: - for line in fd.readlines(): - sp = line.strip().split(sep="|") - key = (sp[0], sp[1]) - arr = self._links.get(key, []) - arr.append((sp[2], sp[3], sp[4])) - self._links[key] = arr - - self._nettree = NetTree.read_csv(self._config.nettree_file) - - files = TransactDOM.from_file(self._config.schema_file) - for schema in files.schemas: - self._schema[schema.ref] = schema - - def append_index(self, dom: FileDOM): - "append files to index" - key, value = dom.index, (dom.src, ",".join(dom.mntner)) - self._lookup[key] = value - - def scan_files(self, files: List[FileDOM]) -> State: - "scan files for schema errors" - state = State() - for dom in files: - s = self._schema.get(dom.rel) - if s is None: - state.warning(dom.dom[0], - f"{dom.src} schema not found for {dom.rel}") - continue - - state = s.check_file(dom, lookups=self._files, state=state) - return state - - def find(self, - text: str, - schema: Optional[str] = None) -> Sequence[str]: - "Find files that match text and schema" - keys = [(schema, text)] - if schema is None: - keys = self._lookup.get(text, []) - - return [self._files[i] for i in keys] - - def related( - self, - key: Tuple[str, str]) -> Sequence[str]: - "Get files related to file" - related = set() - for link in self.links(key): - key = (link[1], link[2]) - related.add(key) - - return [self._files[i] for i in related] - - def find_network(self, ip: str) -> Sequence[NetList]: - """Find Network in index - - Args: - ip (str): ip address - - Returns: - Generator[NetList, None, None]: generator of netlists - """ - return self._nettree.find_tree(ip) - - def load_file(self, fn: str) -> FileDOM: - "load file" - fn = os.path.join(self._config.path, fn) - fo = FileDOM.from_file(fn) - fo.namespace = self._config.namespace - fo.primary_keys = self._config.primary_keys - - return fo - - def load_files(self, fns: Sequence[str]) -> Sequence[NetList]: - for fn in fns: - yield self.load_file(fn) - - def links(self, key: Tuple[str, str]) -> List[Tuple[str, str]]: - "get links" - return self._links.get(key, []) - - def schema(self, name: str) -> SchemaDOM: - "get schema" - return self._schema.get(name) diff --git a/utils/registry/dn42/rpsl/schema.py b/utils/registry/dn42/rpsl/schema.py deleted file mode 100644 index 5f219b936..000000000 --- a/utils/registry/dn42/rpsl/schema.py +++ /dev/null @@ -1,279 +0,0 @@ -"""Schema DOM""" -import re -from dataclasses import dataclass, field -from enum import Enum, auto -from typing import Optional, List, Tuple, Dict, Set, TypeVar - -import log - -from .file import FileDOM, Row - -DOM = TypeVar("DOM", bound="FileDOM") -STATE = TypeVar("STATE", bound="State") - - -class Level(Enum): - """State error level""" - info = auto() - warning = auto() - error = auto() - - -@dataclass -class State: - """State of schema check - """ - state: bool = True - msgs: List[Tuple[Level, Row, str]] = field(default_factory=list) - - def __eq__(self, other: bool) -> bool: - return self.state == other - - def __bool__(self): - return self.state - - def __str__(self) -> str: - return "PASS" if self.state else "FAIL" - - def extend(self, state: STATE): - "apply state to state" - self.msgs.extend(state.msgs) - self.state = state.state - - def print_msgs(self): - """print out state info""" - for (level, row, msg) in self.msgs: - if level == Level.info: - log.info(f"{row.loc} {msg}") - elif level == Level.warning: - log.warning(f"{row.loc} {msg}") - elif level == Level.error: - log.error(f"{row.loc} {msg}") - - def info(self, r: Row, s: str): - """Add warning""" - self.msgs.append((Level.info, r, s)) - - def warning(self, r: Row, s: str): - """Add warning""" - self.msgs.append((Level.warning, r, s)) - - def error(self, r: Row, s: str): - """Add error""" - self.state = False - self.msgs.append((Level.error, r, s)) - - -class SchemaDOM: - """Schema DOM""" - def __init__(self, - dom: FileDOM, - src: Optional[str] = None): - self.valid = False - self.name = None - self.ref = None - self.primary = None - self.type = None - self.src = src - self._schema = {} # type: Dict[str, Set[str]] - self._spec = {} # type: Dict[str, str] - self._links = {} # type: Dict[str, List[str]] - self.dom = dom - self.parse(dom) - - @property - def links(self) -> Dict[str, List[str]]: - "return schema links" - return self._links - - @property - def namespace(self) -> str: - "get namespace" - ns = "default" - ref = self._dom.get("ref") - if ref is not None: - ns = ref.value.split(".")[0] - return ns - - def parse(self, f: FileDOM): - """Parse a FileDOM into a SchemaDOM""" - self.src = self.src if f.src is None else f.src - self._dom = f - - schema = {} - for row in f.dom: - if row.key == 'ref': - self.ref = str(row.value) - elif row.key == 'schema': - self.name = str(row.value) - - if row.key != 'key': - continue - - lines = row.value.fields - key = lines.pop(0) - - schema[key] = set() - for i in lines: - if i == ">": - break - schema[key].add(i) - if i.startswith("lookup="): - self._links[key] = i.split("=", 2)[1].split(",") - schema = self._process_schema(schema) - - self.valid = True - self._schema = schema - return schema - - def _process_schema(self, schema): - for k, v in schema.items(): - if 'schema' in v: - self.type = k - - if 'primary' in v: - self.primary = k - schema[k].add("oneline") - if "multiline" in v: - schema[k].remove("multiline") - schema[k].add("single") - if "multiple" in v: - schema[k].remove("multiple") - schema[k].add("required") - if "optional" in v: - schema[k].remove("optional") - if "recommend" in v: - schema[k].remove("recommend") - if "deprecate" in v: - schema[k].remove("deprecate") - - if 'oneline' not in v: - schema[k].add("multiline") - if 'single' not in v: - schema[k].add("multiple") - - return schema - - def check_file(self, - f: FileDOM, - lookups=None, - state: Optional[State] = None) -> State: - """Check a FileDOM for correctness(tm)""" - if state is None: - state = State() - - file_state = State() - if not f.valid: - file_state.error(Row("", "", 0, f.src), "file does not parse") - - file_state = self._check_file_structure(file_state, f) - file_state = self._check_file_values(file_state, f, lookups) - file_state = inetnum_check(file_state, f) - - print("CHECK\t%-10s\t%-44s\t%s\tMNTNERS: %s" % - (f.schema, f.src.split("/")[-1], file_state, ','.join(f.mntner))) - - state.extend(file_state) - return state - - def _check_file_structure(self, state: State, f: FileDOM) -> State: - for k, v in self._schema.items(): - row = Row(k, "", 0, f.src) - if 'required' in v and k not in f.keys: - state.error(row, "not found and is required") - elif 'recommend' in v and k not in f.keys: - state.info(row, "not found and is recommended") - - if 'schema' in v and f"{f.namespace}.{f.dom[0].key}" != self.ref: - state.error(row, "not found and is required as the first line") - - if 'single' in v and k in f.keys and len(f.keys[k]) > 1: - state.warning(row, "first defined here and has repeated keys") - for i in f.keys[k][1:]: - state.error(row, f"repeated on {i} can only appear once") - - if 'oneline' in v and k in f.multi: - for i in f.keys[k]: - state.error(row, "can not have multiple lines") - - return state - - def _check_file_values(self, - state: State, - f: FileDOM, - lookups: Optional[List[Tuple[str, str]]] = None - ) -> State: - for row in f.dom: - c = row.value.as_key - - src = "None" if f.src is None else f.src - if row.key == self.primary and not src.endswith(c): - state.error(row, - f"primary [{row.value}]" + - f" does not match filename [{src}].") - - if row.key.startswith("x-"): - state.info(row, "is user defined") - continue - - if row.key not in self._schema: - state.error(row, "not in schema") - continue - - if 'deprecate' in self._schema[row.key]: - state.info(row, "was found and is deprecated") - - if lookups is not None: - state = self._check_file_lookups(state, row, lookups) - - return state - - def _check_file_lookups(self, - state: State, - row: Row, - lookups: List[Tuple[str, str]] = None - ) -> State: - if row.key not in self._links: - return state - - refs = self._links[row.key] - val = row.value.fields[0] - found = False - for ref in refs: - if (ref, val) in lookups: - found = True - if not found: - state.error(row, - f"{row.key} references object {val} " + - f"in {refs} but does not exist.") - return state - - def __str__(self) -> str: - return self._dom.__str__() - - @staticmethod - def from_file(src: str) -> DOM: - """Parses SchemaDOM from file""" - with open(src, mode='r', encoding='utf-8') as f: - dom = FileDOM(src=src, text=f.readlines()) - - return SchemaDOM(dom=dom) - - -def inetnum_check(state: State, dom: FileDOM) -> State: - """Sanity Check for checking the inet[6]num value""" - if dom.schema == "inetnum" or dom.schema == "inet6num": - cidr = dom.get("cidr").as_net - Lnet = cidr.network_address.exploded - Hnet = cidr.broadcast_address.exploded - - cidr_range = f"{Lnet}-{Hnet}" - file_range = dom.get(dom.schema) - file_range = re.sub(r"\s+", "", str(file_range), flags=re.UNICODE) - - if cidr_range != file_range: - state.error(Row("", "", 0, dom.src), - f"inetnum range [{file_range}] " + - f"does not match: [{cidr_range}]") - - return state diff --git a/utils/registry/dn42/rpsl/spec.py b/utils/registry/dn42/rpsl/spec.py deleted file mode 100644 index 1e3858dab..000000000 --- a/utils/registry/dn42/rpsl/spec.py +++ /dev/null @@ -1,21 +0,0 @@ -"spec" -from dataclasses import dataclass -from typing import Dict, List, Enum - -class Rule: - pass - - -@dataclass -class LabelRule(Rule): - name: str - - def parse(self, fields: Sequence[str]) -> Optional[Tuple[str, str]]: - -@dataclass -class Spec: - keys: Dict[str, SpecRule] - - @classmethod - def from_dom(cls, dom: file.FileDOM): - for key in diff --git a/utils/registry/dn42/rpsl/test_file.py b/utils/registry/dn42/rpsl/test_file.py deleted file mode 100644 index 4fb888538..000000000 --- a/utils/registry/dn42/rpsl/test_file.py +++ /dev/null @@ -1,134 +0,0 @@ -#!/usr/bin/env python3 -"""Test FileDOM""" -import unittest -import inspect -from pprint import pprint - -from .filedom import FileDOM - - -class TestFileDOM(unittest.TestCase): - """Test FileDOM""" - - def test_parse(self): - """Test Parsing""" - s = """ - person: Xuu - remarks: test - + - Multi-Line - contact: xmpp:xuu@xmpp.dn42 - contact: mail:xuu@dn42.us - pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F - nic-hdl: XUU-DN42 - mnt-by: XUU-MNT - source: DN42 - """ - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM() - dom.parse(s.splitlines()) - - self.assertTrue(dom.valid) - self.assertEqual(dom.schema, "person") - self.assertEqual(dom.get("person"), "Xuu") - self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42") - self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") - self.assertIsNone(dom.get("xxx")) - self.assertEqual(dom.get("xxx", default="default"), "default") - self.assertEqual(str(dom), s) - - def test_put_values(self): - """Test putting values""" - s = """ - person: Xuu - remarks: test - contact: xmpp:xuu@xmpp.dn42 - contact: mail:xuu@dn42.us - pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F - nic-hdl: XUU-DN42 - mnt-by: XUU-MNT - source: DN42 - """ - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM() - dom.parse(s.splitlines()) - - dom.put("source", "SOURIS") - self.assertEqual(dom.get("source"), "SOURIS") - - dom.put("contact", "mail:me@sour.is", append=True) - self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42") - self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") - self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is") - - def test_parse_ip6address(self): - """Test network ip address parsing""" - s = """ - inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff - cidr: fd00::/8 - netname: ROOT-DN42-ULA - descr: DN42 ULA Address Space - status: ALLOCATED - policy: open - org: ORG-DN42 - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM(text=s.splitlines()) - - cidr = dom.get("cidr").as_net - self.assertEqual(cidr.compressed, "fd00::/8") - self.assertEqual( - cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8") - - end = cidr.broadcast_address.exploded - start = cidr.network_address.exploded - - self.assertEqual(dom.get("inet6num"), f"{start} - {end}") - - def test_parse_ip4address(self): - """Test network ip address parsing""" - s = """ - inetnum: 172.20.0.0 - 172.23.255.255 - cidr: 172.20.0.0/14 - netname: ROOT-DN42 - """ - - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM(text=s.splitlines()) - - cidr = dom.get("cidr").as_net - self.assertEqual(cidr.compressed, "172.20.0.0/14") - self.assertEqual( - cidr.exploded, "172.20.0.0/14") - - end = cidr.broadcast_address.exploded - start = cidr.network_address.exploded - - self.assertEqual(dom.get("inetnum"), f"{start} - {end}") - - @unittest.skip - def test_bad_parse(self): - """bad parse stuff""" - s = """ - person: Xuu - EXTRA - : - source: DN42 - """ - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM() - dom.parse(s.splitlines()) - pprint(dom.dom) - self.assertEqual(str(dom), s) - - -if __name__ == '__main__': - unittest.main() diff --git a/utils/registry/dn42/rpsl/test_filedom.py b/utils/registry/dn42/rpsl/test_filedom.py deleted file mode 100644 index 4fb888538..000000000 --- a/utils/registry/dn42/rpsl/test_filedom.py +++ /dev/null @@ -1,134 +0,0 @@ -#!/usr/bin/env python3 -"""Test FileDOM""" -import unittest -import inspect -from pprint import pprint - -from .filedom import FileDOM - - -class TestFileDOM(unittest.TestCase): - """Test FileDOM""" - - def test_parse(self): - """Test Parsing""" - s = """ - person: Xuu - remarks: test - + - Multi-Line - contact: xmpp:xuu@xmpp.dn42 - contact: mail:xuu@dn42.us - pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F - nic-hdl: XUU-DN42 - mnt-by: XUU-MNT - source: DN42 - """ - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM() - dom.parse(s.splitlines()) - - self.assertTrue(dom.valid) - self.assertEqual(dom.schema, "person") - self.assertEqual(dom.get("person"), "Xuu") - self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42") - self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") - self.assertIsNone(dom.get("xxx")) - self.assertEqual(dom.get("xxx", default="default"), "default") - self.assertEqual(str(dom), s) - - def test_put_values(self): - """Test putting values""" - s = """ - person: Xuu - remarks: test - contact: xmpp:xuu@xmpp.dn42 - contact: mail:xuu@dn42.us - pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F - nic-hdl: XUU-DN42 - mnt-by: XUU-MNT - source: DN42 - """ - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM() - dom.parse(s.splitlines()) - - dom.put("source", "SOURIS") - self.assertEqual(dom.get("source"), "SOURIS") - - dom.put("contact", "mail:me@sour.is", append=True) - self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42") - self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") - self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is") - - def test_parse_ip6address(self): - """Test network ip address parsing""" - s = """ - inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff - cidr: fd00::/8 - netname: ROOT-DN42-ULA - descr: DN42 ULA Address Space - status: ALLOCATED - policy: open - org: ORG-DN42 - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM(text=s.splitlines()) - - cidr = dom.get("cidr").as_net - self.assertEqual(cidr.compressed, "fd00::/8") - self.assertEqual( - cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8") - - end = cidr.broadcast_address.exploded - start = cidr.network_address.exploded - - self.assertEqual(dom.get("inet6num"), f"{start} - {end}") - - def test_parse_ip4address(self): - """Test network ip address parsing""" - s = """ - inetnum: 172.20.0.0 - 172.23.255.255 - cidr: 172.20.0.0/14 - netname: ROOT-DN42 - """ - - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM(text=s.splitlines()) - - cidr = dom.get("cidr").as_net - self.assertEqual(cidr.compressed, "172.20.0.0/14") - self.assertEqual( - cidr.exploded, "172.20.0.0/14") - - end = cidr.broadcast_address.exploded - start = cidr.network_address.exploded - - self.assertEqual(dom.get("inetnum"), f"{start} - {end}") - - @unittest.skip - def test_bad_parse(self): - """bad parse stuff""" - s = """ - person: Xuu - EXTRA - : - source: DN42 - """ - s = inspect.cleandoc(s)+"\n" - - dom = FileDOM() - dom.parse(s.splitlines()) - pprint(dom.dom) - self.assertEqual(str(dom), s) - - -if __name__ == '__main__': - unittest.main() diff --git a/utils/registry/dn42/rpsl/test_nettree.py b/utils/registry/dn42/rpsl/test_nettree.py deleted file mode 100644 index 39d95257e..000000000 --- a/utils/registry/dn42/rpsl/test_nettree.py +++ /dev/null @@ -1,62 +0,0 @@ -"Testing NetTree" -import unittest -from ipaddress import ip_network - -from .nettree import NetTree, NetRecord - -records = [ - NetRecord( - ip_network("::/0"), - ["DN42-MNT"], - "closed", - "ALLOCATED"), - NetRecord( - ip_network("::ffff:0.0.0.0/96"), - ["DN42-MNT"], - "closed", - "ALLOCATED"), - NetRecord( - ip_network("::ffff:172.21.64.0/125"), - ["XUU-MNT"], - "closed", - "ALLOCATED"), - NetRecord( - ip_network("fdea:a15a:77b9::/48"), - ["XUU-MNT"], - "closed", - "ALLOCATED"), -] - -text = [ - "0|0|0|0000:0000:0000:0000:0000:0000:0000:0000|0|inet6num|::_0|closed|ALLOCATED|DN42-MNT", # noqa: E501 - "1|0|1|fdea:a15a:77b9:0000:0000:0000:0000:0000|48|inet6num|fdea:a15a:77b9::_48|closed|ALLOCATED|XUU-MNT", # noqa: E501 - "2|0|1|0000:0000:0000:0000:0000:ffff:0000:0000|96|inetnum|0.0.0.0_0|closed|ALLOCATED|DN42-MNT", # noqa: E501 - "3|2|2|0000:0000:0000:0000:0000:ffff:ac15:4000|125|inetnum|172.21.64.0_29|closed|ALLOCATED|XUU-MNT" # noqa: E501 -] - - -class TestNetTree(unittest.TestCase): - "testing NetTree" - def test_nettree(self): - "test NetTree" - tree = NetTree(records) - for (left, right) in zip(str(tree).splitlines(), text): - self.assertEqual(left, right) - - def test_find(self): - "test NetTree" - tree = NetTree(records) - tt = [ - ("fdea:a15a:77b9:ffff::/64", (True, 2)), - ("fdea:a15a:77ba:ffff::/64", (True, 1)), - ("::ffff:172.21.64.0/126", (True, 3)), - ("::ffff:172.21.64.4/126", (True, 3)), - ("::ffff:172.21.64.8/126", (True, 2)), - - ] - - for (net, expect) in tt: - self.assertEqual( - tree.find_tree(ip_network(net)), - expect, - msg="network "+net) diff --git a/utils/registry/dn42/rpsl/test_schema.py b/utils/registry/dn42/rpsl/test_schema.py deleted file mode 100644 index 013797602..000000000 --- a/utils/registry/dn42/rpsl/test_schema.py +++ /dev/null @@ -1,285 +0,0 @@ -"""Test SchemaDOM""" - -import inspect -import unittest - -from .schema import SchemaDOM -from .filedom import FileDOM - - -def clean(s: str) -> str: - "Clean input for use" - return inspect.cleandoc(s) + "\n" - - -test_files = [ - ("SCHEMA-SCHEMA", clean( - r""" - schema: SCHEMA-SCHEMA - ref: dn42.schema - key: schema required single primary schema > [name] - key: ref required single > [schema] - key: key required multiple > [key-name] - {required|optional|recommend|deprecate} - {single|multiple} {primary|} {schema|} - lookup=str '>' [spec]... - key: mnt-by required multiple lookup=dn42.mntner > [mntner] - key: remarks optional multiple > [text]... - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - remarks: # option descriptions - Attribute names must match /[a-zA-Z]([a-zA-Z0-9_\-]*[a-zA-Z0-9])?/. - + - required - : object required to have at least one - optional - : object not required to have at least one - + - single - : only one of this type allowed - multiple - : more than one of this type allowed - + - primary - : use field as lookup key for lookup - * only one allowed per schema - * does not allow newlines - + - schema - : use field name as the name of the schema - * only one allowed per schema - * does not allow newlines - + - lookup - : schema match to use for related record - """ # noqa: E501 - )), - ("INETNUM-SCHEMA", clean( - r""" - schema: INETNUM-SCHEMA - ref: dn42.inetnum - key: inetnum required single schema - key: cidr required single primary - key: netname required single - key: nserver optional multiple > [domain-name] - key: country optional multiple - key: descr optional single - key: status optional single > {ALLOCATED|ASSIGNED} {PI|PA|} - key: policy optional single > {open|closed|ask|reserved} - key: admin-c optional multiple lookup=dn42.person,dn42.role - key: tech-c optional multiple lookup=dn42.person,dn42.role - key: zone-c optional multiple lookup=dn42.person,dn42.role - key: mnt-by optional multiple lookup=dn42.mntner - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - )), - ("ROLE-SCHEMA", clean( - r""" - schema: ROLE-SCHEMA - ref: dn42.role - key: role required single schema - key: nic-hdl required single primary - key: mnt-by required multiple lookup=dn42.mntner - key: org optional multiple lookup=dn42.organisation - key: admin-c optional multiple lookup=dn42.person - key: tech-c optional multiple lookup=dn42.person - key: abuse-c optional multiple lookup=dn42.person - key: abuse-mailbox optional multiple - key: descr optional single - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - )), - ("PERSON-SCHEMA", clean( - r""" - schema: PERSON-SCHEMA - ref: dn42.person - key: person required single schema - key: nic-hdl required single primary - key: mnt-by required multiple lookup=dn42.mntner - key: org optional multiple lookup=dn42.organisation - key: nick optional multiple - key: pgp-fingerprint optional multiple - key: www optional multiple - key: e-mail optional multiple - key: contact optional multiple - key: abuse-mailbox optional multiple - key: phone optional multiple - key: fax-no optional multiple - key: address optional multiple - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - )), - ("MNTNER-SCHEMA", clean( - r""" - schema: MNTNER-SCHEMA - ref: dn42.mntner - key: mntner required single primary schema - key: descr optional single - key: mnt-by required multiple lookup=dn42.mntner - key: admin-c optional multiple lookup=dn42.person,dn42.role - key: tech-c optional multiple lookup=dn42.person,dn42.role - key: auth optional multiple > [method] [value]... - key: org optional multiple lookup=dn42.organisation - key: abuse-mailbox optional single - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - )), - ("REGISTRY-SCHEMA", clean( - r""" - schema: REGISTRY-SCHEMA - ref: dn42.registry - key: registry required single primary schema - key: url required multiple - key: descr optional multiple - key: mnt-by required multiple lookup=dn42.mntner - key: admin-c optional multiple lookup=dn42.person,dn42.role - key: tech-c optional multiple lookup=dn42.person,dn42.role - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - )), - ("172.21.64.0_29", clean( - r""" - inetnum: 172.21.64.0 - 172.21.64.7 - cidr: 172.21.64.0/29 - netname: XUU-TEST-NET - descr: Xuu TestNet - country: US - admin-c: SOURIS-DN42 - tech-c: SOURIS-DN42 - mnt-by: XUU-MNT - nserver: lavana.sjc.xuu.dn42 - nserver: kapha.mtr.xuu.dn42 - nserver: rishi.bre.xuu.dn42 - status: ALLOCATED - remarks: This is a transfernet. - source: DN42 - """ - )), - ("SOURIS-DN42", clean( - r""" - role: Souris Organization Role - abuse-mailbox: abuse@sour.is - admin-c: XUU-DN42 - tech-c: XUU-DN42 - nic-hdl: SOURIS-DN42 - mnt-by: XUU-MNT - source: DN42 - """ - )), - ("XUU-DN42", clean( - r""" - person: Xuu - remarks: test - contact: xmpp:xuu@xmpp.dn42 - contact: mail:xuu@dn42.us - pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F - nic-hdl: XUU-DN42 - mnt-by: XUU-MNT - source: DN42 - """ - )), - ("XUU-MNT", clean( - r""" - mntner: XUU-MNT - descr: Xuu Maintenance Object - admin-c: SOURIS-DN42 - tech-c: SOURIS-DN42 - mnt-by: XUU-MNT - source: DN42 - """ - )), - ("DN42-MNT", clean( - r""" - mntner: DN42-MNT - descr: mntner for owning objects in the name of whole dn42. - mnt-by: DN42-MNT - source: DN42 - """ # noqa: E501 - )), - ("DN42", clean( - r""" - registry: DN42 - url: https://git.dn42.us/dn42/registry - mnt-by: DN42-MNT - source: DN42 - """ - )), - ] - - -class TestSchema(unittest.TestCase): - """Test SchemaDOM - """ - - def test_schema_parse(self): - """Test schema parsing - """ - d = FileDOM(src="schema/SCHEMA-SCHEMA") - d.parse(test_files[0][1].splitlines()) - - self.assertEqual(str(d), test_files[0][1]) - self.assertTrue(d.valid) - - s = SchemaDOM() - s.parse(d) - - self.assertTrue(s.valid) - - state = s.check_file(d) - - self.assertTrue(state) - state.print() - - def test_schema_all(self): - """Test schema failures - """ - - schemas = {} - for (fname, text) in { - row for row in test_files if row[0].endswith("-SCHEMA")}: - dom = FileDOM(src=fname) - dom.parse(text.splitlines()) - - schema = SchemaDOM() - schema.parse(dom) - - self.assertTrue(schema.valid) - - schemas[schema.ref] = schema - - files = [] - idx = {} - - for (fname, text) in test_files: - dom = FileDOM(src=fname) - dom.parse(text.splitlines()) - - self.assertTrue(dom.valid) - self.assertEqual(str(dom), text) - - files.append(dom) - - key, value = dom.index - idx[key] = value - - for dom in files: - s = schemas[f"{dom.ns}.{dom.schema}"] - state = s.check_file(dom, idx) - - self.assertTrue(state) - state.print() diff --git a/utils/registry/dn42/rpsl/test_transact.py b/utils/registry/dn42/rpsl/test_transact.py deleted file mode 100644 index 710f41a9b..000000000 --- a/utils/registry/dn42/rpsl/test_transact.py +++ /dev/null @@ -1,193 +0,0 @@ -"Test TransactDOM" - -import unittest -import inspect - -from .transact import TransactDOM - - -def clean(s: str) -> str: - "Clean input for use" - return inspect.cleandoc(s) + "\n" - - -dn42_mnt_file = clean(""" - .BEGIN DN42-MNT - schema: SCHEMA-SCHEMA - ref: dn42.schema - key: schema required single primary schema > [name] - key: ref required single > [schema] - key: key required multiple > [key-name] - {required|optional|recommend|deprecate} - {single|multiple} {primary|} {schema|} - lookup=str '>' [spec]... - key: mnt-by required multiple lookup=dn42.mntner > [mntner] - key: remarks optional multiple > [text]... - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - ... - schema: INETNUM-SCHEMA - ref: dn42.inetnum - key: inetnum required single schema - key: cidr required single primary - key: netname required single - key: nserver optional multiple > [domain-name] - key: country optional multiple - key: descr optional single - key: status optional single > {ALLOCATED|ASSIGNED} {PI|PA|} - key: policy optional single > {open|closed|ask|reserved} - key: admin-c optional multiple lookup=dn42.person,dn42.role - key: tech-c optional multiple lookup=dn42.person,dn42.role - key: zone-c optional multiple lookup=dn42.person,dn42.role - key: mnt-by optional multiple lookup=dn42.mntner - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - ... - schema: ROLE-SCHEMA - ref: dn42.role - key: role required single schema - key: nic-hdl required single primary - key: mnt-by required multiple lookup=dn42.mntner - key: org optional multiple lookup=dn42.organisation - key: admin-c optional multiple lookup=dn42.person - key: tech-c optional multiple lookup=dn42.person - key: abuse-c optional multiple lookup=dn42.person - key: abuse-mailbox optional multiple - key: descr optional single - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - ... - schema: PERSON-SCHEMA - ref: dn42.person - key: person required single schema - key: nic-hdl required single primary - key: mnt-by required multiple lookup=dn42.mntner - key: org optional multiple lookup=dn42.organisation - key: nick optional multiple - key: pgp-fingerprint optional multiple - key: www optional multiple - key: e-mail optional multiple - key: contact optional multiple - key: abuse-mailbox optional multiple - key: phone optional multiple - key: fax-no optional multiple - key: address optional multiple - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - ... - schema: MNTNER-SCHEMA - ref: dn42.mntner - key: mntner required single primary schema - key: descr optional single - key: mnt-by required multiple lookup=dn42.mntner - key: admin-c optional multiple lookup=dn42.person,dn42.role - key: tech-c optional multiple lookup=dn42.person,dn42.role - key: auth optional multiple > [method] [value]... - key: org optional multiple lookup=dn42.organisation - key: abuse-mailbox optional single - key: remarks optional multiple - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - ... - schema: REGISTRY-SCHEMA - ref: dn42.registry - key: registry required single primary schema - key: url required multiple - key: descr optional multiple - key: mnt-by required multiple lookup=dn42.mntner - key: admin-c optional multiple lookup=dn42.person,dn42.role - key: tech-c optional multiple lookup=dn42.person,dn42.role - key: source required single lookup=dn42.registry - mnt-by: DN42-MNT - source: DN42 - ... - mntner: DN42-MNT - descr: mntner for owning objects in the name of whole dn42. - mnt-by: DN42-MNT - source: DN42 - ... - inetnum: 0.0.0.0 - 255.255.255.255 - cidr: 0.0.0.0/0 - netname: NET-BLK0-DN42 - policy: open - descr: * The entire IPv4 address space - mnt-by: DN42-MNT - status: ALLOCATED - source: DN42 - ... - registry: DN42 - url: https://git.dn42.us/dn42/registry - mnt-by: DN42-MNT - source: DN42 - .END - """) # noqa E501 - -xuu_mnt_file = clean(""" - .BEGIN XUU-MNT - .DELETE person XUU-DN42 - inetnum: 172.21.64.0 - 172.21.64.7 - cidr: 172.21.64.0/29 - netname: XUU-TEST-NET - descr: Xuu TestNet - country: US - admin-c: SOURIS-DN42 - tech-c: SOURIS-DN42 - mnt-by: XUU-MNT - nserver: lavana.sjc.xuu.dn42 - nserver: kapha.mtr.xuu.dn42 - nserver: rishi.bre.xuu.dn42 - status: ALLOCATED - remarks: This is a transfernet. - source: DN42 - ... - role: Souris Organization Role - abuse-mailbox: abuse@sour.is - admin-c: XUU-DN42 - tech-c: XUU-DN42 - nic-hdl: SOURIS-DN42 - mnt-by: XUU-MNT - source: DN42 - ... - person: Xuu - remarks: test - contact: xmpp:xuu@xmpp.dn42 - contact: mail:xuu@dn42.us - pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F - nic-hdl: XUU-DN42 - mnt-by: XUU-MNT - source: DN42 - . - mntner: XUU-MNT - descr: Xuu Maintenance Object - admin-c: SOURIS-DN42 - tech-c: SOURIS-DN42 - mnt-by: XUU-MNT - source: DN42 - .END - """) # noqa E501 - - -class TestTransactDOM(unittest.TestCase): - "test TransactDOM" - - def test_transact_parse(self): - "test tranact parsing" - ts = TransactDOM(text=dn42_mnt_file.splitlines()) - - assert ts.mntner == "DN42-MNT" - assert len(ts.files) == 9 - - ts = TransactDOM(text=xuu_mnt_file.splitlines()) - - assert ts.mntner == "XUU-MNT" - assert len(ts.files) == 4 - assert len(ts.delete) == 1 - assert ts.delete[0] == ("person", "XUU-DN42") diff --git a/utils/registry/dn42/rpsl/transact.py b/utils/registry/dn42/rpsl/transact.py deleted file mode 100644 index 66fff27ac..000000000 --- a/utils/registry/dn42/rpsl/transact.py +++ /dev/null @@ -1,74 +0,0 @@ -"TransactDOM" - -from typing import Sequence, List, Optional, Tuple, TypeVar - -from .file import FileDOM -from .schema import SchemaDOM - -DOM = TypeVar("DOM", bound="TransactDOM") - - -class TransactDOM(): - """Parses a transaction file""" - - def __init__(self, - text: Optional[Sequence[str]] = None): - self.valid = False - self.files = [] # type: List[FileDOM] - self.schemas = [] - self.delete = [] # type: List[Tuple[str, str]] - self.mntner = None # type: Optional[str] - - if text is not None: - self.parse(text) - - def parse(self, text: Sequence[str]): - "parse text" - - buffer = [] # type: List[str] - for (i, line) in enumerate(text, 1): - _ = i - - if self.mntner is None: - if not line.startswith(".BEGIN"): - continue - - fields = line.split() - - if len(fields) < 2: - continue - - self.mntner = fields[1] - continue - - if line.startswith("."): - if len(buffer) > 0: - dom = FileDOM(text=buffer) - buffer = [] - if dom.valid: - self.files.append(dom) - - if dom.schema == 'schema': - self.schemas.append(SchemaDOM(dom)) - - if line.startswith(".DELETE"): - sp = line.split() - if len(sp) > 2: - self.delete.append((sp[1], sp[2])) - - continue - - buffer.append(line) - - def __str__(self) -> str: - s = f".BEGIN {self.mntner}\n" - s += "\n".join({f"DELETE {i}" for i in self.delete}) - s += "...\n".join({str(record) for record in self.files}) - s += ".END" - return s - - @staticmethod - def from_file(src: str) -> DOM: - "Read transact from files" - with open(src) as f: - return TransactDOM(f.readlines()) diff --git a/utils/registry/dn42/utils.py b/utils/registry/dn42/utils.py deleted file mode 100644 index a0de4c537..000000000 --- a/utils/registry/dn42/utils.py +++ /dev/null @@ -1,45 +0,0 @@ -"DN42 Utils" -import os.path -from typing import List, Tuple - - -def remove_prefix(text, prefix): - "remove the prefix" - if text.startswith(prefix): - return text[len(prefix):] - return text - - -def shift(args: List[str]) -> Tuple[str, List[str]]: - "shift off first arg + rest" - if len(args) == 0: - return None, [] - - if len(args) == 1: - return args[0], [] - - return args[0], args[1:] - - -def find_rpsl(path: str) -> str: - "Find the root directory for RPSL" - path = os.path.abspath(path) - rpsl = os.path.join(path, ".rpsl") - while not os.path.exists(rpsl): - if path == "/": - break - path = os.path.dirname(path) - rpsl = os.path.join(path, ".rpsl") - - if not os.path.exists(rpsl): - return None - - return path - - -def exists(*args: str) -> bool: - "check if files exist" - for i in args: - if not os.path.exists(i): - return False - return True diff --git a/utils/registry/log.py b/utils/registry/log.py deleted file mode 100644 index 52dab50aa..000000000 --- a/utils/registry/log.py +++ /dev/null @@ -1,206 +0,0 @@ -"""Simple Logger""" - -from __future__ import print_function - -import os -import sys -import inspect -import datetime -import traceback -from enum import IntEnum - -OUTPUT = sys.stderr - -LEVEL = ["CRIT", "ERR ", "WARN", "NOTE", "INFO", "DBUG", "...."] -CLEVEL = ["\x1B[41mCRIT\x1B[0m", - "\x1B[31mERR \x1B[0m", - "\x1B[33mWARN\x1B[0m", - "\x1B[32mNOTE\x1B[0m", - "\x1B[34mINFO\x1B[0m", - "\x1B[90mDBUG\x1B[0m", - "\x1B[90m....\x1B[0m"] - -MSG = "{0} {1} {2} {3} {4} {5} :: {6}" -CMSG = "[{1}]\x1B[90m {2} {3}:{5} [{4}]\x1B[0m {6}\x1B[0m" -CMULTI = "[{1}]\x1B[90m {2}\x1B[0m" - - -class Level(IntEnum): - """Log Level enumeration""" - VERB_CRITICAL = 0 - VERB_ERROR = 1 - VERB_WARN = 2 - VERB_NOTICE = 3 - VERB_INFO = 4 - VERB_DEBUG = 5 - VERB_NONE = -1 - - -class Log: - """Logger""" - log_dir = "" - log_pfx = "main" - - level_console = Level.VERB_ERROR - level_file = Level.VERB_NONE - level_full = False - - count = [0, 0, 0, 0, 0, 0] - - def __init__(self): - self.prog_name = sys.argv[0].rsplit("/", 1)[-1] - self.prog_name = self.prog_name.split(".", 1)[0] - self.log_pfx = self.prog_name - - def __del__(self): - if self.level_console >= 5: - crit, err, warn, note, inf, dbug = tuple(self.count) - os.write(1, "[\x1B[90m\x1B[90mDBUG\x1B[90m] Log Counters" + - f" crit:{crit}" + - f" err:{err}" + - f" warn: {warn}" + - f" note: {note}" + - f" info: {inf}" + - f" dbug: {dbug}\x1B[0m\n") - - def set_dir(self, name: str): - """Set output directory""" - if not os.path.isdir(name): - os.makedirs(name) - self.log_dir = name - - def output(self, level: Level, message: str, frame=1): - """Write a message to console or log, conditionally.""" - if level < 0 or level > 5: - level = 5 - - self.count[level] += 1 - - # function_name = inspect.stack()[1][3] - cur_date = datetime.datetime.now() - - (frame, file, ln, fn, _, _) = inspect.getouterframes( - inspect.currentframe())[frame] - - message = str(message).split("\n") - cmsg = CMSG if self.level_full else CMULTI - - if self.level_console >= level: - - if len(message) == 1: - if self.level_full: - arg = (str(cur_date), - CLEVEL[level], - self.prog_name, - file, fn, ln, message[0]) - else: - arg = str(cur_date), CLEVEL[level], message[0] - - print(cmsg.format(*arg), file=OUTPUT) - else: - if self.level_full: - arg = str(cur_date), CLEVEL[ - level], self.prog_name, file, fn, ln, "" - print(cmsg.format(*arg), file=OUTPUT) - - for line in message: - print(CMULTI.format(str(cur_date), - CLEVEL[Level.VERB_NONE], line), - file=OUTPUT) - - if self.level_file >= level: - self.set_dir("./logs") - log_file_name = os.path.join( - self.log_dir, - self.log_pfx + str(cur_date.strftime('%Y-%m-%d')) + ".txt") - - with open(log_file_name, "a") as logger: - logger.write(MSG.format(str(cur_date), - LEVEL[level], - self.prog_name, - file, fn, ln, message[0]) + "\n") - for line in message[1:]: - logger.write(MSG.format(str(cur_date), - LEVEL[Level.VERB_NONE], - self.prog_name, - file, fn, ln, line) + "\n") - - def fatal(self, message: str): - """Log a fatal error""" - self.output(Level.VERB_CRITICAL, message, 2) - sys.exit(1) - - def critical(self, message: str): - """Log a critical error""" - self.output(Level.VERB_CRITICAL, message, 2) - - def error(self, message: str): - """Log a normal error""" - self.output(Level.VERB_ERROR, message, 2) - - def warning(self, message: str): - """Log a warning""" - self.output(Level.VERB_WARN, message, 2) - - def notice(self, message: str): - """Log a notice""" - self.output(Level.VERB_NOTICE, message, 2) - - def info(self, message: str): - """Log an informational""" - self.output(Level.VERB_INFO, message, 2) - - def debug(self, message: str): - """Log a debug""" - self.output(Level.VERB_DEBUG, message, 2) - - -default = Log() - -fatal = default.fatal -critical = default.critical -error = default.error -warning = default.warning -notice = default.notice -info = default.info -debug = default.debug - - -class LogException: - """Catches an exception to log it""" - stop = None - - def __init__(self, stop: bool = True): - self.stop = stop - - def __enter__(self, stop: bool = True): - pass - - def __exit__(self, exc_type, value, trace) -> bool: - - if exc_type is None: - return True - - if exc_type is SystemExit and value.args == (0,): - return True - - log_string, _ = fmt_exception(exc_type, value, trace) - default.output(Level.VERB_CRITICAL, 'Failure\n\n' + log_string, 2) - - if self.stop is False: - return False - - fatal("ABORTING EXECUTION") - return False - - -def fmt_exception(exc_type, exc_value, exc_traceback): - """format exception to string""" - lines = traceback.format_exception(exc_type, exc_value, exc_traceback) - log_string = ''.join(line for line in lines) - email_string = ''.join('
' + line for line in lines) - - return log_string, email_string - - -exception = LogException diff --git a/utils/registry/main.py b/utils/registry/main.py deleted file mode 100644 index 497fd2f14..000000000 --- a/utils/registry/main.py +++ /dev/null @@ -1,82 +0,0 @@ -"""rpsl a tool for managing RPSL databases -========================================== - -Usage: rpsl [command] [options] - rpsl help [command] - -""" - - -import os -import sys -from typing import Optional - -import importlib -import pkgutil - -from dn42.utils import find_rpsl, remove_prefix, shift - -discovered_plugins = { - remove_prefix(name, "rpsl_"): importlib.import_module(name) - for finder, name, ispkg - in pkgutil.iter_modules() - if name.startswith("rpsl_") -} - - -def do_help(cmd: Optional[str] = None): - "Print Help and exit" - - print(__doc__, file=sys.stderr) - - if cmd is None: - print("Available commands:", file=sys.stderr) - for pkg in discovered_plugins.keys(): - print(f" - {pkg}", file=sys.stderr) - return 0 - - if cmd not in discovered_plugins: - print(f"Command not found: {cmd}", file=sys.stderr) - return 1 - - print(discovered_plugins[cmd].__doc__, file=sys.stderr) - return 0 - - -def run() -> int: - "run application command" - _, args = shift(sys.argv) # drop exec name - cmd, args = shift(args) - - working_dir = os.getcwd() - working_dir = os.environ.get("WORKING_DIR", working_dir) - - prog_dir = os.path.dirname(os.path.realpath(__file__)) - - rpsl_dir = os.environ.get("RPSL_DIR", working_dir) - rpsl_dir = find_rpsl(rpsl_dir) - - if cmd is None or cmd == 'help': - cmd, _ = shift(args) - return do_help(cmd) - - if cmd not in discovered_plugins: - print(f"Unsupported Command: {cmd}") - return 1 - - pkg = discovered_plugins[cmd] - - if 'run' not in dir(pkg): - print(f"Command {cmd} is not compatible with rspl.", file=sys.stderr) - return 1 - - return pkg.run(args, { - "WORKING_DIR": working_dir, - "BIN_DIR": prog_dir, - "RPSL_DIR": rpsl_dir, - }) - - -if __name__ == '__main__': - code = run() - sys.exit(code) diff --git a/utils/registry/rpsl b/utils/registry/rpsl deleted file mode 100755 index 393f905f1..000000000 --- a/utils/registry/rpsl +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env python3 -"run main code" - -import sys -from main import run - -if __name__ == '__main__': - code = run() - sys.exit(code) diff --git a/utils/registry/rpsl_index/__init__.py b/utils/registry/rpsl_index/__init__.py deleted file mode 100644 index bafa94b1b..000000000 --- a/utils/registry/rpsl_index/__init__.py +++ /dev/null @@ -1,170 +0,0 @@ -"""RSPL Build Indexes -===================== - -Usage: rspl index - -""" - - -import os -import sys - -from typing import Dict, Generator, List, Set, Tuple, Sequence - -from dn42.rpsl import FileDOM, SchemaDOM, TransactDOM, NetTree, \ - NetRecord, Config, index_files -from dn42.utils import remove_prefix - - -def run(args: List[str], env: Dict[str, str]) -> int: - "rspl index" - _ = args - - path = env.get("RPSL_DIR") - if path is None: - print("RPSL directory not found. do `rpsl init` or set RPSL_DIR", - file=sys.stderr) - return 1 - - config = Config.from_path(path) - if not os.path.exists(config.config_file): - print("RPSL config files not found. do `rpsl init`?", file=sys.stderr) - return 1 - - if not os.path.isdir(config.schema_dir): - print("schema directory not found in path", file=sys.stderr) - sys.exit(1) - - print(r"Reading Files...", end="\r", flush=True, file=sys.stderr) - - idx = index_files(path, - namespace=config.namespace, - primary_keys=config.primary_keys) - lookup, schemas, files, nets, routes = build_index(idx, rspl=config) - - print( - f"Reading Files: done! files: {len(files)}" + - f" schemas: {len(schemas)}" + - f" networks: {len(nets)}", - f" routes: {len(routes)}", - file=sys.stderr) - - print("Writing .rpsl/index", file=sys.stderr) - with open(".rpsl/index", 'w') as out: - print("Writing .rpsl/links", file=sys.stderr) - with open(".rpsl/links", 'w') as link_out: - for dom in files: - s = schemas.get(dom.rel) - if s is None: - print( - f"{dom.src} schema not found for {dom.rel}", - file=sys.stderr) - continue - - primary, mntner = dom.get(s.primary), ",".join(dom.mntner) - _ = mntner - src = remove_prefix(dom.src, path+os.sep) - print(dom.rel, primary, src, # mntner, - sep="|", file=out) - - for (link, rel, d) in generate_links(dom, s.links, lookup): - print(f"{dom.rel}|{dom.name}|{link}|{rel}|{d}", - file=link_out) - - print("Generate .rpsl/nettree", file=sys.stderr) - tree = NetTree(nets, routes) - - print("Writing .rpsl/nettree", file=sys.stderr) - tree.write_csv(".rpsl/nettree") - - print("Writing .rpsl/schema", file=sys.stderr) - s = TransactDOM() - s.mntner = "DN42-MNT" - s.files = schemas.values() - with open(".rpsl/schema", "w") as out: - print(s, file=out) - - print("done.", file=sys.stderr) - - return 0 - - -class NotRPSLPath(Exception): - "error raised if unable to determine RPSL root" - - -def build_index( - idx: Sequence[FileDOM], - rspl: Config, - ) -> Tuple[ - Set[Tuple[str, str]], - Dict[str, SchemaDOM], - List[FileDOM], - List[NetRecord]]: - "build index for files" - lookup = set() # type: Set[Tuple[str, str]] - schemas = {} # type: Dict[str, SchemaDOM] - files = [] # type: List[FileDOM] - nets = [] # type: List[NetRecord] - routes = [] # type: List[NetRecord] - - print(r"Reading Files...", end="\r", flush=True, file=sys.stderr) - - net_types = rspl.network_parents - net_leafs = rspl.network_children - - for (i, dom) in enumerate(idx): - if not dom.valid: - print("E", end="", flush=True) - continue - - key = dom.index - lookup.add(key) - files.append(dom) - - if dom.schema == rspl.schema: - schema = SchemaDOM(dom) - schemas[schema.ref] = schema - - if dom.schema in net_types: - nets.append(NetRecord( - dom.get("cidr").as_net6, - dom.get("policy", default="closed"), - dom.get("status", default="ASSIGNED"), - )) - - if dom.schema in net_leafs: - routes.append(NetRecord( - dom.get(dom.primary_key).as_net6, - dom.get("policy", default="none"), - dom.get("status", default="none"), - True, - )) - - if i % 120 == 0: - print( - f"Reading Files: files: {len(files)}" + - f" schemas: {len(schemas)} " + - f" networks: {len(nets)}", - f" routes: {len(routes)}", - end="\r", flush=True, file=sys.stderr) - - return (lookup, schemas, files, nets, routes) - - -def generate_links( - dom: FileDOM, - links: Dict[str, List[str]], - lookup: Set[Tuple[str, str]] - ) -> Generator[Tuple[str, str, str], None, None]: - "print file links out to file" - for (link, refs) in links.items(): - for d in dom.get_all(link): - found = False - for ref in refs: - if (ref, d.value) in lookup: - found = True - yield (link, ref, d) - - if not found: - print(f"{dom.name} missing link {link} {d.value}") diff --git a/utils/registry/rpsl_init/__init__.py b/utils/registry/rpsl_init/__init__.py deleted file mode 100644 index 587ee917d..000000000 --- a/utils/registry/rpsl_init/__init__.py +++ /dev/null @@ -1,103 +0,0 @@ -"""RSPL Initialize data store -============================= - -Usage: rspl init [options] - -Options: ---namespace= Namespace (default: current working dir name) ---schema= Schema (default: schema) ---owners= Owner (default: mntner) ---default-owner= Default Owner (default: DN42-MNT) ---source= Source (default: DN42) ---force Force creation of config -""" - - -import sys -import os.path -import argparse -from typing import List, Dict, Generator, Tuple, Set, TypeVar - -from dn42.rpsl import Config, FileDOM, SchemaDOM -import rpsl_index - -Group = TypeVar("Group", set, tuple) - - -parser = argparse.ArgumentParser() -parser.add_argument("--namespace", type=str, default=None) -parser.add_argument("--owners", type=str, default="mntner") -parser.add_argument("--schema", type=str, default="schema") -parser.add_argument("--force", action='store_true') - - -def run(args: List[str], env: Dict[str, str]) -> int: - "rspl init" - opts = parser.parse_args(args) - if opts.namespace is None: - opts.namespace = os.path.basename(env.get("WORKING_DIR")) - - rpsl_dir = env.get("RPSL_DIR") - if rpsl_dir is not None and not opts.force: - print(f"RPSL database already initialized! Found in: {rpsl_dir}") - return 1 - - rpsl_dir = env.get("WORKING_DIR") - schema_dir = os.path.join(rpsl_dir, opts.schema) - network_owners, primary_keys, dir_name = {}, {}, {} - - if os.path.exists(schema_dir): - ns, network_owners, primary_keys, dir_name = \ - _parse_schema(schema_dir, opts.namespace) - - rpsl = Config.build(path=rpsl_dir, - namespace=ns, - schema=opts.schema, - owners=opts.owners, - dir_name=dir_name, - network_owners=network_owners, - primary_keys=primary_keys) - - os.makedirs(os.path.dirname(rpsl.config_file), exist_ok=True) - with open(rpsl.config_file, "w") as f: - print(rpsl, file=f) - - print(f"Created: {rpsl.config_file}", file=sys.stderr) - env["RPSL_DIR"] = rpsl_dir - rpsl_index.run(args, env) - - return 0 - - -def _read_schemas(path: str) -> Generator[SchemaDOM, None, None]: - for root, _, files in os.walk(path): - for f in files: - dom = FileDOM.from_file(os.path.join(root, f)) - schema = SchemaDOM(dom) - yield schema - - -def _parse_schema(path: str, ns: str) -> Tuple[str, Group, Group, Group]: - schemas = _read_schemas(path) - - namespace = ns - network_owner = set() # type: Set[str, str] - primary_key = set() # type: Set[str, str] - dir_name = set() # type: Set[str, str] - - for s in schemas: - if s.type == "schema": - if s.namespace != namespace: - namespace = s.namespace - - for i in s.dom.get_all("network-owner"): - network_owner.add((s.type, i.value)) - - d = s.dom.get("dir-name") - if d is not None: - dir_name.add((s.type, d.value)) - - if s.primary != s.type: - primary_key.add((s.type, s.primary)) - - return namespace, network_owner, primary_key, dir_name diff --git a/utils/registry/rpsl_scan/__init__.py b/utils/registry/rpsl_scan/__init__.py deleted file mode 100644 index 30430e72d..000000000 --- a/utils/registry/rpsl_scan/__init__.py +++ /dev/null @@ -1,66 +0,0 @@ -"""RSPL Scan -============ - -Usage: rspl scan [options] - -Options: ---scan-dir= Scan given directory ---scan-file= Scan given file ---add-index Add scanned items to lookup table - -""" - -import os -import sys -import argparse -from typing import List, Dict - -from dn42.rpsl import RPSL, Config, TransactDOM, index_files - -parser = argparse.ArgumentParser() -parser.add_argument("--add-index", action='store_true') -parser.add_argument("--scan-dir", type=str, default=None) -parser.add_argument("--scan-file", type=str, default=None) - - -def run(args: List[str], env: Dict[str, str]) -> int: - """run scan script""" - opts = parser.parse_args(args) - - path = env.get("RPSL_DIR") - if path is None: - print("RPSL directory not found. do `rpsl init` or set RPSL_DIR", - file=sys.stderr) - return 1 - - config = Config.from_path(path) - if not os.path.exists(config.index_file) or \ - not os.path.exists(config.schema_file): - print("RPSL index files not found. do `rpsl index`?", file=sys.stderr) - return 1 - - rpsl = RPSL(config) - files = _file_gen(path, opts, wd=env.get("WORKING_DIR"), config=config) - - if opts.add_index: - files, g = [], files - print("Add scanned items to lookup index...", file=sys.stderr) - for dom in g: - files.append(dom) - rpsl.append_index(dom) - - print("Scanning files...", file=sys.stderr) - status = rpsl.scan_files(files) - status.print_msgs() - print(status) - return 0 if status else 1 - - -def _file_gen(path, opts: argparse.Namespace, wd: str, config: Config): - if opts.scan_dir is not None: - path = os.path.join(wd, opts.scan_dir) - elif opts.scan_file is not None: - path = os.path.join(wd, opts.scan_file) - return TransactDOM.from_file(path).files - - return index_files(path, config.namespace, config.primary_keys) diff --git a/utils/registry/rpsl_status/__init__.py b/utils/registry/rpsl_status/__init__.py deleted file mode 100644 index 8feeade1e..000000000 --- a/utils/registry/rpsl_status/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -"""RSPL Status -============== -""" - - -from typing import List, Dict - - -def run(args: List[str], env: Dict[str, str]) -> int: - "do run" - print("RUN STATUS", args, env) - return 0 diff --git a/utils/registry/rpsl_whois/__init__.py b/utils/registry/rpsl_whois/__init__.py deleted file mode 100644 index b67afcb22..000000000 --- a/utils/registry/rpsl_whois/__init__.py +++ /dev/null @@ -1,86 +0,0 @@ -"""RSPL Whois Search -==================== - -Usage: rpsl whois [text] - -""" - -import sys -from itertools import chain -from typing import List, Dict, Optional, Set, Tuple - -from dn42.rpsl import RPSL, Config, FileDOM, as_net6 -from dn42.utils import shift, exists - - -def run(args: List[str], env: Dict[str, str]) -> int: - "do whois search" - if len(args) == 0: - print("Usage: rpsl whois [text]") - - rpsl_dir = env.get("RPSL_DIR") - if rpsl_dir is None: - print("RPSL index files not found. do `rpsl index`?", file=sys.stderr) - return 1 - - config = Config.from_path(rpsl_dir) - if not exists(config.index_file, - config.schema_file, - config.links_file): - print("RPSL index files not found. do `rpsl index`?", file=sys.stderr) - return 1 - - rpsl = RPSL(config) - - schema = None # type: Optional[str] - text, args = shift(args) - - if len(args) > 0: - schema = text - text, args = shift(args) - - ip = None - try: - ip = as_net6(text) - except ValueError: - pass - - principle = [] # type: List[FileDOM] - related_nets = [] # type: List[FileDOM] - related_idx = set() # type: Set[Tuple[str, str]] - - if ip is not None: - print(f"# Searching network {text}...") - nets = list(rpsl.find_network(text)) - last_net = nets[-1] - dom = rpsl.load_file(str(last_net.net)) - principle.append(dom) - related_idx.add(dom.index) - ok, route = last_net.in_routes(ip) - if ok: - dom = rpsl.load_file(str(route)) - principle.append(dom) - related_idx.add(dom.index) - - for net in nets[:-1]: - dom = rpsl.load_file(str(net.net)) - related_nets.append(dom) - else: - for dom in rpsl.find(text, schema): - principle.append(dom) - related_idx.add(dom.index) - - print("# Found objects") - for dom in principle: - print(dom) - - if len(related_nets) > 0: - print("# Related Networks") - for dom in related_nets: - print(dom) - - print("# Related objects") - lis = set(chain.from_iterable(rpsl.related(i) for i in related_idx)) - for dom in rpsl.load_files(sorted(lis)): - print(dom) - return 0