update rpsl tooling

This commit is contained in:
Jonathan Lundy 2020-06-26 09:07:42 -06:00
parent d160e8bb7c
commit 763de1cc2f
No known key found for this signature in database
GPG key ID: C63E6D61F3035024
20 changed files with 444 additions and 341 deletions

View file

@ -0,0 +1,19 @@
"DN42 RSPL Library"
__version__ = "0.3.0"
from .filedom import FileDOM, Row, Value, index_files
from .schema import SchemaDOM, Level, State
from .transact import TransactDOM
from .config import Config
from .nettree import NetTree, NetRecord, NetList
from .rspldom import RPSL
__all__ = [
"FileDOM", "Row", "Value", "index_files",
"SchemaDOM", "Level", "State",
"TransactDOM",
"Config",
"NetTree", "NetRecord", "NetList",
"RPSL",
]

View file

@ -0,0 +1,145 @@
"RSPL Config"
import os
import os.path
from dataclasses import dataclass
from typing import Dict, Set, Tuple, Optional, TypeVar
from .filedom import FileDOM
C = TypeVar('C', bound='Config')
@dataclass
class Config:
"RSPL Config"
path: str
_dom: FileDOM
@property
def namespace(self) -> str:
"Get namespace"
return self._dom.get("namespace", default="dn42").value
@property
def schema(self) -> str:
"Get schema type name"
return self._dom.get("schema", default="schema").value
@property
def owners(self) -> str:
"Get owner type name"
return self._dom.get("owner", default="mntner").value
@property
def source(self) -> str:
"Get source"
return self._dom.get("source", default="DN42").value
@property
def default_owner(self) -> str:
"Get default onwer"
return self._dom.get("default-owner", default=self._dom.mntner).value
@property
def network_owners(self) -> Dict[str, str]:
"Get network owners"
network_owner = {} # type: Dict[str, str]
for (parent, child) in [
i.fields for i in self._dom.get_all("network-owner")]:
network_owner[child] = parent
return network_owner
@property
def primary_keys(self) -> Dict[str, str]:
"Get primary keys"
primary_keys = {} # type: Dict[str, str]
for (parent, key) in [
i.fields for i in self._dom.get_all("primary-key")]:
primary_keys[parent] = key
return primary_keys
@property
def network_parents(self) -> Set[str]:
"return network parents"
return set(self.network_owners.values())
@property
def schema_dir(self) -> str:
"get schema directory"
return os.path.join(self.path, self.schema)
@property
def owner_dir(self) -> str:
"get owner directory"
return os.path.join(self.path, self.owners)
@property
def config_file(self) -> str:
"get config file"
return os.path.join(self.path, ".rpsl/config")
@property
def index_file(self) -> str:
"get index file"
return os.path.join(self.path, ".rpsl/index")
@property
def links_file(self) -> str:
"get links file"
return os.path.join(self.path, ".rpsl/links")
@property
def schema_file(self) -> str:
"get schema file"
return os.path.join(self.path, ".rpsl/schema")
@property
def nettree_file(self) -> str:
"get nettree file"
return os.path.join(self.path, ".rpsl/nettree")
@classmethod
def from_path(cls, path: str) -> C:
"create from path"
src = os.path.join(path, ".rpsl/config")
return cls(FileDOM.from_file(src))
@classmethod
def build(cls, # pylint: disable=too-many-arguments
path: str,
namespace: str = "dn42",
schema: str = "schema",
owners: str = "mntner",
default_owner: str = "DN42-MNT",
primary_keys: Optional[Set[Tuple[str, str]]] = None,
network_owners: Optional[Set[Tuple[str, str]]] = None,
source: str = "DN42") -> FileDOM:
"Build config from parameters"
FileDOM.namespace = namespace
dom = FileDOM()
dom.src = os.path.join(path, "config")
dom.put("namespace", namespace)
dom.put("schema", schema)
dom.put("owners", owners)
dom.put("default-owner", default_owner)
for (k, v) in primary_keys:
dom.put("primary-key", f"{k} {v}", append=True)
for (k, v) in network_owners:
dom.put("network-owner", f"{v} {k}", append=True)
dom.put("mnt-by", default_owner)
dom.put("source", source)
return cls(dom)
def __init__(self, dom: FileDOM):
self._dom = dom
self.path = os.path.dirname(os.path.dirname(dom.src))
# TODO: bit of a smelly side effect. Need something better.
FileDOM.namespace = self.namespace
FileDOM.primary_keys = self.primary_keys
def __str__(self):
return self._dom.__str__()

View file

@ -1,6 +1,7 @@
"""FileDOM parse and formating"""
import re
import os
from dataclasses import dataclass
from typing import Sequence, NamedTuple, List, \
Dict, Optional, Tuple, Union, Generator, TypeVar
@ -8,7 +9,7 @@ from ipaddress import ip_network, IPv4Network, IPv6Network
import log
F = TypeVar("F", bound="FileDOM")
DOM = TypeVar("DOM", bound="FileDOM")
@dataclass(frozen=True)
@ -73,17 +74,18 @@ class Row(NamedTuple):
class FileDOM:
"""Parses a reg file"""
namespace: str = "dn42"
primary_keys: Dict[str, str] = {}
def __init__(self,
text: Optional[Sequence[str]] = None,
src: Optional[str] = None,
ns: Optional[str] = "dn42"):
src: Optional[str] = None):
self.valid = False
self.dom = [] # type: List[Row]
self.keys = {} # type: Dict[str, int]
self.multi = {} # type: Dict[str, int]
self.mntner = [] # type: List[str]
self.src = src
self.ns = ns
if text is not None:
self.parse(text, src=src)
@ -161,11 +163,8 @@ class FileDOM:
@property
def name(self) -> str:
"""return the friendly name for file"""
if self.schema in ("inetnum", "inet6num"):
return self.get("cidr").value
if self.schema in ("person", "role"):
return self.get("nic-hdl").value
if self.schema in FileDOM.primary_keys:
return self.get(FileDOM.primary_keys[self.schema]).value
if len(self.dom) < 1:
return "none"
@ -179,13 +178,13 @@ class FileDOM:
@property
def rel(self) -> str:
"generate rel for schema ref"
return f"{self.ns}.{self.schema}"
return f"{FileDOM.namespace}.{self.schema}"
@property
def index(self) -> Tuple[Tuple[str, str], Tuple[str, str]]:
"""generate index key/value pair"""
name = self.src.split("/")[-1].replace("_", "/")
return ((f"{self.ns}.{self.schema}", name),
return ((f"{FileDOM.namespace}.{self.schema}", name),
(self.src, ",".join(self.mntner)))
def __str__(self):
@ -237,10 +236,23 @@ class FileDOM:
if index not in self.keys[key]:
self.keys[key].append(i)
@staticmethod
def from_file(fn: str) -> F:
@classmethod
def from_file(cls, fn: str) -> DOM:
"""Parses FileDOM from file"""
with open(fn, mode='r', encoding='utf-8') as f:
dom = FileDOM(src=fn, text=f.readlines())
dom = cls(src=fn, text=f.readlines())
return dom
def index_files(path: str) -> FileDOM:
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
if root.endswith(".rpsl"):
continue
for f in files:
dom = FileDOM.from_file(os.path.join(root, f))
yield dom

View file

@ -2,18 +2,18 @@
from ipaddress import ip_network, IPv6Network
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional, Generator
from typing import Dict, List, Tuple, Optional, Generator, TypeVar
NET = IPv6Network
V6_NET = ip_network("::/0")
V4_NET = ip_network("::ffff:0.0.0.0/96")
NT = TypeVar("NT", bound="NetTree")
@dataclass
class NetRecord:
"Network Record"
network: NET
mnters: List[str]
policy: str
status: str
@ -135,3 +135,26 @@ class NetTree:
v.net.object_type,
v.net.object_name,
)]) + "\n")
@classmethod
def read_csv(cls, fn) -> NT:
"read tree from csv"
inttree = {} # type: Dict[int, NetRecord]
with open(fn) as fd:
for line in fd.readlines():
sp = line.split(sep="|")
if len(sp) != 9:
continue
net = ip_network(f"{sp[3]}/{sp[4]}")
rec = NetRecord(net, sp[5], sp[6])
lis = NetList(sp[0], sp[1], sp[2], rec, [])
inttree[sp[0]] = lis
if sp[0] != sp[1]:
inttree[sp[1]].nets.append(net)
nettree = {}
for v in inttree.values():
nettree[v.net.network] = v
c = cls()
c.tree = NetTree
return c

View file

@ -0,0 +1,84 @@
"RPSL"
import os.path
from typing import Dict, List, Tuple, TypeVar, Optional, Generator
from .filedom import FileDOM
from .nettree import NetTree
from .schema import SchemaDOM, State
from .transact import TransactDOM
from .config import Config
R = TypeVar('R', bound="RPSL")
class RPSL:
"RSPL"
def __init__(self, config: Config):
self._config = config
self._files = {} # type: Dict[Tuple[str, str], str]
self._lookup = {} # type: Dict[str, List[Tuple[str, str]]]
self._links = {} # type: Dict[Tuple[str, str], List[Tuple[str, str]]]
self._nettree = None # type: NetTree
self._schema = {} # type: Dict[str, SchemaDOM]
self._load_index()
def _load_index(self):
with open(self._config.index_file) as fd:
for line in fd.readlines():
sp = line.strip().split(sep="|")
self._files[(sp[0], sp[1])] = sp[2]
self._lookup[sp[1]] = self._lookup.get(sp[1], [])
self._lookup[sp[1]].append((sp[0], sp[1]))
with open(self._config.links_file) as fd:
for line in fd.readlines():
sp = line.strip().split(sep="|")
key = (sp[0], sp[1])
self._links[key] = self._lookup.get(key, [])
self._links[key].append((sp[2], sp[3]))
self._nettree = NetTree.read_csv(self._config.nettree_file)
files = TransactDOM.from_file(self._config.schema_file)
for schema in files.schemas:
self._schema[schema.ref] = schema
def append_index(self, dom: FileDOM):
"append files to index"
key, value = dom.index
self._lookup[key] = value
def scan_files(self, files: List[FileDOM]) -> State:
"scan files for schema errors"
state = State()
for dom in files:
s = self._schema.get(dom.rel)
if s is None:
state.warning(dom.dom[0],
f"{dom.src} schema not found for {dom.rel}")
continue
state = s.check_file(dom, lookups=self._files, state=state)
return state
def find(self,
text: str,
schema: Optional[str] = None) -> Generator[FileDOM, None, None]:
"Find files that match text and schema"
keys = [(schema, text)]
if schema is None:
keys = self._lookup.get(text, [])
for i in keys:
yield self.load_file(self._files[i])
print(self.links(i))
def load_file(self, fn: str) -> FileDOM:
"load file"
fn = os.path.join(self._config.path, fn)
return FileDOM.from_file(fn)
def links(self, key: Tuple[str, str]) -> List[Tuple[str, str]]:
return self._links.get(key, [])

View file

@ -34,7 +34,7 @@ class State:
def __str__(self) -> str:
return "PASS" if self.state else "FAIL"
def print(self):
def print_msgs(self):
"""print out state info"""
for (level, row, msg) in self.msgs:
if level == Level.info:
@ -139,9 +139,13 @@ class SchemaDOM:
return schema
def check_file(self, f: FileDOM, lookups=None) -> State:
def check_file(self,
f: FileDOM,
lookups=None,
state: Optional[State] = None) -> State:
"""Check a FileDOM for correctness(tm)"""
state = State()
if state is None:
state = State()
if not f.valid:
state.error(Row("", "", 0, f.src), "file does not parse")
@ -163,7 +167,7 @@ class SchemaDOM:
elif 'recommend' in v and k not in f.keys:
state.info(row, "not found and is recommended")
if 'schema' in v and f"{f.ns}.{f.dom[0].key}" != self.ref:
if 'schema' in v and f"{f.namespace}.{f.dom[0].key}" != self.ref:
state.error(row, "not found and is required as the first line")
if 'single' in v and k in f.keys and len(f.keys[k]) > 1:

View file

@ -0,0 +1,45 @@
"DN42 Utils"
import os.path
from typing import List, Tuple
def remove_prefix(text, prefix):
"remove the prefix"
if text.startswith(prefix):
return text[len(prefix):]
return text
def shift(args: List[str]) -> Tuple[str, List[str]]:
"shift off first arg + rest"
if len(args) == 0:
return None, []
if len(args) == 1:
return args[0], []
return args[0], args[1:]
def find_rpsl(path: str) -> str:
"Find the root directory for RPSL"
path = os.path.abspath(path)
rpsl = os.path.join(path, ".rpsl")
while not os.path.exists(rpsl):
if path == "/":
break
path = os.path.dirname(path)
rpsl = os.path.join(path, ".rpsl")
if not os.path.exists(rpsl):
return None
return path
def exists(*args: str) -> bool:
"check if files exist"
for i in args:
if not os.path.exists(i):
return False
return True

View file

@ -1,119 +0,0 @@
"RPSL"
import os
import os.path
from dataclasses import dataclass, field
from typing import Dict, List, Set, Tuple, TypeVar
from .filedom import FileDOM
from .nettree import NetTree
from .schema import SchemaDOM
C = TypeVar('C', bound='RPSLConfig')
@dataclass
class RPSLConfig:
"RSPLConfig"
namespace: str
root: str
schema: str
owners: str
default_owner: str
source: str
network_owner: Set[Tuple[str, str]] = field(default_factory=set)
primary_key: Set[Tuple[str, str]] = field(default_factory=set)
@property
def network_parents(self) -> Set[str]:
"return network parents"
return set(self.network_owner.values())
@property
def schema_dir(self) -> str:
"get schema directory"
return os.path.join(self.root, self.schema)
@property
def owner_dir(self) -> str:
"get owner directory"
return os.path.join(self.root, self.owners)
@property
def config_file(self) -> str:
"get config file"
return os.path.join(self.root, ".rpsl/config")
@staticmethod
def default() -> C:
"create default"
root = os.getcwd()
return RPSLConfig("dn42", root, "schema", "mntner", "DN42-MNT", "DN42",
{}, {})
@staticmethod
def from_dom(dom: FileDOM) -> C:
"create from dom"
ns = dom.get("namespace", default="dn42").value
schema = dom.get("schema", default="schema").value
owners = dom.get("owners", default="mntner").value
source = dom.get("source", default="DN42").value
default_owner = dom.get("default-owner", default=dom.mntner).value
root = os.path.dirname(dom.src)
network_owner = {} # type: Dict[str, str]
for (parent, child) in [
i.fields for i in dom.get_all("network-owner")]:
network_owner[child] = parent
primary_key = {} # type: Dict[str, str]
for (parent, child) in [
i.fields for i in dom.get_all("primary-key")]:
primary_key[child] = parent
return RPSLConfig(namespace=ns,
root=root,
schema=schema,
owners=owners,
source=source,
default_owner=default_owner,
network_owner=network_owner,
primary_key=primary_key,
)
def __str__(self):
dom = FileDOM(ns=self.namespace)
dom.put("namespace", self.namespace)
dom.put("schema", self.schema)
dom.put("owners", self.owners)
dom.put("default-owner", self.default_owner)
for (k, v) in self.primary_key:
dom.put("primary-key", f"{k} {v}", append=True)
for (k, v) in self.network_owner:
dom.put("network-owner", f"{v} {k}", append=True)
dom.put("mnt-by", self.default_owner)
dom.put("source", self.source)
return dom.__str__()
R = TypeVar('R', bound="RPSL")
@dataclass
class RSPL:
"RSPL"
config: RPSLConfig
files: List[FileDOM]
nettree: NetTree
schema: Dict[str, SchemaDOM]
@staticmethod
def from_index(path: str) -> R:
"Create RSPL from indexs"
@staticmethod
def from_files(path: str, schema_only: bool = False) -> R:
"Create RSPL from files"

View file

@ -9,18 +9,12 @@ Usage: rpsl [command] [options]
import os
import sys
from typing import Tuple, List, Optional
from typing import Optional
import importlib
import pkgutil
def remove_prefix(text, prefix):
"remove the prefix"
if text.startswith(prefix):
return text[len(prefix):]
return text
from dn42.utils import find_rpsl, remove_prefix, shift
discovered_plugins = {
remove_prefix(name, "rpsl_"): importlib.import_module(name)
@ -49,32 +43,19 @@ def do_help(cmd: Optional[str] = None):
return 0
def find_rpsl(path: str) -> str:
"Find the root directory for RPSL"
path = os.path.abspath(path)
rpsl = os.path.join(path, ".rpsl")
while not os.path.exists(rpsl):
if path == "/":
break
path = os.path.dirname(path)
rpsl = os.path.join(path, ".rpsl")
if not os.path.exists(rpsl):
return None
return path
def run() -> int:
"run application"
"run application command"
_, args = shift(sys.argv) # drop exec name
cmd, args = shift(args)
working_dir = os.getcwd()
working_dir = os.environ.get("WORKING_DIR", working_dir)
prog_dir = os.path.dirname(os.path.realpath(__file__))
rpsl_dir = os.environ.get("RPSL_DIR", working_dir)
rpsl_dir = find_rpsl(rpsl_dir)
cmd, args = shift(shift(sys.argv)[1])
if cmd is None or cmd == 'help':
cmd, _ = shift(args)
return do_help(cmd)
@ -96,17 +77,6 @@ def run() -> int:
})
def shift(args: List[str]) -> Tuple[str, List[str]]:
"shift off first arg + rest"
if len(args) == 0:
return None, []
if len(args) == 1:
return args[0], []
return args[0], args[1:]
if __name__ == '__main__':
code = run()
sys.exit(code)

View file

@ -1,8 +1,9 @@
#!/usr/bin/env python3
"run main code"
import sys
from main import run
if __name__ == '__main__':
code = run()
sys.exit(code)
sys.exit(code)

View file

@ -9,29 +9,37 @@ Usage: rspl index
import os
import sys
from typing import Dict, Generator, List, Set, Tuple
from typing import Dict, Generator, List, Set, Tuple, Sequence
from dom.filedom import FileDOM
from dom.schema import SchemaDOM
from dom.nettree import NetTree, NetRecord
from dom.transact import TransactDOM
from dom.rspl import RPSLConfig
from dn42.rpsl import FileDOM, SchemaDOM, TransactDOM, NetTree, \
NetRecord, Config, index_files
from dn42.utils import remove_prefix
def run(args: List[str], env: Dict[str, str]) -> int:
"rspl index"
_ = args
path = env.get("WORKING_DIR")
path = env.get("RPSL_DIR")
if path is None:
print("RPSL directory not found. do `rpsl init` or set RPSL_DIR",
file=sys.stderr)
return 1
if not os.path.isdir(os.path.join(path, "schema")):
config = Config.from_path(path)
if not os.path.exists(config.index_file) or \
not os.path.exists(config.schema_file):
print("RPSL index files not found. do `rpsl index`?", file=sys.stderr)
return 1
if not os.path.isdir(config.schema_dir):
print("schema directory not found in path", file=sys.stderr)
sys.exit(1)
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
idx = index_files(path)
lookup, schemas, files, nets = build_index(idx)
lookup, schemas, files, nets = build_index(idx, rspl=config)
print(
f"Reading Files: done! files: {len(files)}" +
@ -83,41 +91,15 @@ class NotRPSLPath(Exception):
"error raised if unable to determine RPSL root"
def index_files(path: str) -> Generator[FileDOM, None, None]:
"""generate list of dom files"""
path = os.path.abspath(path)
rpsl = os.path.join(path, ".rpsl/config")
while not os.path.exists(rpsl):
if path == "/":
break
path = os.path.dirname(path)
rpsl = os.path.join(path, ".rpsl/config")
if not os.path.exists(rpsl):
raise NotRPSLPath()
yield FileDOM.from_file(rpsl)
for root, _, files in os.walk(path):
if root == path:
continue
if root.endswith(".rpsl"):
continue
for f in files:
dom = FileDOM.from_file(os.path.join(root, f))
yield dom
def build_index(
idx: Generator[FileDOM, None, None]
idx: Sequence[FileDOM],
rspl: Config,
) -> Tuple[
Set[Tuple[str, str]],
Dict[str, SchemaDOM],
List[FileDOM],
List[NetRecord]]:
"build index for files"
rspl = RPSLConfig.default()
lookup = set() # type: Set[Tuple[str, str]]
schemas = {} # type: Dict[str, SchemaDOM]
files = [] # type: List[FileDOM]
@ -136,10 +118,6 @@ def build_index(
lookup.add(key)
files.append(dom)
if dom.schema == "namespace":
rspl = RPSLConfig.from_dom(dom)
net_types = rspl.network_parents
if dom.schema == rspl.schema:
schema = SchemaDOM(dom)
schemas[schema.ref] = schema
@ -147,7 +125,6 @@ def build_index(
if dom.schema in net_types:
nets.append(NetRecord(
dom.get("cidr").as_net6,
dom.mntner,
dom.get("policy", default="closed"),
dom.get("status", default="ASSIGNED"),
))
@ -181,10 +158,3 @@ def generate_links(
if not found:
print(f"{dom.name} missing link {link} {d.value}")
def remove_prefix(text, prefix):
"remove the prefix"
if text.startswith(prefix):
return text[len(prefix):]
return text

View file

@ -18,9 +18,10 @@ import os.path
import argparse
from typing import List, Dict, Generator, Tuple, Set, TypeVar
from dom.rspl import RPSLConfig
from dom.filedom import FileDOM
from dom.schema import SchemaDOM
from dn42.rpsl import Config, FileDOM, SchemaDOM
Group = TypeVar("Group", set, tuple)
parser = argparse.ArgumentParser()
parser.add_argument("--namespace", type=str, default=None)
@ -43,15 +44,20 @@ def run(args: List[str], env: Dict[str, str]) -> int:
return 1
rpsl_dir = env.get("WORKING_DIR")
rpsl = RPSLConfig(root=rpsl_dir,
namespace=opts.namespace,
schema=opts.schema,
owners=opts.owners,
source=opts.source,
default_owner=opts.default_owner)
schema_dir = os.path(rpsl_dir, "schema")
if os.path.exists(rpsl.schema_dir):
rpsl.network_owner, rpsl.primary_key = _parse_schema(rpsl.schema_dir)
network_owners, primary_keys = {}, {}
if os.path.exists(schema_dir):
network_owners, primary_keys = _parse_schema(schema_dir)
rpsl = Config.build(path=rpsl_dir,
namespace=opts.namespace,
schema=opts.schema,
owners=opts.owners,
source=opts.source,
default_owner=opts.default_owner,
network_owners=network_owners,
primary_keys=primary_keys)
os.makedirs(os.path.dirname(rpsl.config_file), exist_ok=True)
with open(rpsl.config_file, "w") as f:
@ -69,9 +75,6 @@ def _read_schemas(path: str) -> Generator[SchemaDOM, None, None]:
yield schema
Group = TypeVar("Group", set, tuple)
def _parse_schema(path: str) -> Tuple[Group, Group]:
schemas = _read_schemas(path)
@ -85,5 +88,4 @@ def _parse_schema(path: str) -> Tuple[Group, Group]:
if s.primary != s.type:
primary_key.add((s.type, s.primary))
print(network_owner)
return network_owner, primary_key

View file

@ -15,9 +15,7 @@ import sys
import argparse
from typing import List, Dict
from dom.filedom import FileDOM
from dom.schema import SchemaDOM
from dom.transact import TransactDOM
from dn42.rpsl import RPSL, Config, TransactDOM, index_files
parser = argparse.ArgumentParser()
parser.add_argument("--add-index", action='store_true')
@ -25,19 +23,6 @@ parser.add_argument("--scan-dir", type=str, default=None)
parser.add_argument("--scan-file", type=str, default=None)
def index_files(path: str):
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
if root.endswith(".rpsl"):
continue
for f in files:
dom = FileDOM.from_file(os.path.join(root, f))
yield dom
def run(args: List[str], env: Dict[str, str]) -> int:
"""run scan script"""
opts = parser.parse_args(args)
@ -48,49 +33,34 @@ def run(args: List[str], env: Dict[str, str]) -> int:
file=sys.stderr)
return 1
index_file = os.path.join(path, ".rpsl/index")
schema_file = os.path.join(path, ".rpsl/schema")
if not os.path.exists(index_file) or not os.path.exists(schema_file):
print("RPSL index files not found. do `rpsl index`?")
config = Config.from_path(path)
if not os.path.exists(config.index_file) or \
not os.path.exists(config.schema_file):
print("RPSL index files not found. do `rpsl index`?", file=sys.stderr)
return 1
lookups = {} # type: Dict[str, FileDOM]
schemas = {} # type: Dict[str, SchemaDOM]
with open(index_file) as fd:
print("Reading index... ", end="", file=sys.stderr, flush=True)
for line in fd.readlines():
sp = line.strip().split(sep="|")
lookups[(sp[0], sp[1])] = (sp[2], "")
print("done.", file=sys.stderr, flush=True)
schema_set = TransactDOM.from_file(schema_file)
for schema in schema_set.schemas:
schemas[schema.ref] = schema
def file_gen(path):
if opts.scan_dir is not None:
path = os.path.join(env.get("WORKING_DIR"), opts.scan_dir)
elif opts.scan_file is not None:
path = os.path.join(env.get("WORKING_DIR"), opts.scan_file)
return TransactDOM.from_file(path).files
return index_files(path)
rpsl = RPSL(config)
files = _file_gen(path, opts, wd=env.get("WORKING_DIR"))
if opts.add_index:
files, g = [], files
print("Add scanned items to lookup index...", file=sys.stderr)
for dom in file_gen(path):
key, value = dom.index
lookups[key] = value
for dom in g:
files.append(dom)
rpsl.append_index(dom)
for dom in file_gen(path):
s = schemas.get(dom.rel)
if s is None:
print(f"{dom.src} schema not found for {dom.rel}")
status = s.check_file(dom, lookups=lookups)
status.print()
print("Scanning files...", file=sys.stderr)
status = rpsl.scan_files(files)
status.print_msgs()
print(status)
return 0 if status else 1
def _file_gen(path, opts: argparse.Namespace, wd: str):
if opts.scan_dir is not None:
path = os.path.join(wd, opts.scan_dir)
elif opts.scan_file is not None:
path = os.path.join(wd, opts.scan_file)
return TransactDOM.from_file(path).files
return index_files(path)

View file

@ -5,21 +5,34 @@ Usage: rpsl whois [text]
"""
import os.path
import sys
from ipaddress import ip_network
from typing import List, Dict, Tuple
from typing import List, Dict, Optional
from dom.filedom import FileDOM
from dn42.rpsl import RPSL, Config
from dn42.utils import shift, exists
def run(args: List[str], env: Dict[str, str]) -> int:
"do whois search"
path = env.get("RPSL_DIR")
if len(args) == 0:
print("Usage: rpsl whois [text]")
schema = None
rpsl_dir = env.get("RPSL_DIR")
if rpsl_dir is None:
print("RPSL index files not found. do `rpsl index`?", file=sys.stderr)
return 1
config = Config.from_path(rpsl_dir)
if not exists(config.index_file,
config.schema_file,
config.links_file):
print("RPSL index files not found. do `rpsl index`?", file=sys.stderr)
return 1
rpsl = RPSL(config)
schema = None # type: Optional[str]
text, args = shift(args)
if len(args) > 0:
@ -32,47 +45,11 @@ def run(args: List[str], env: Dict[str, str]) -> int:
except ValueError:
pass
lookups, find = load_lookup(path)
if ip is not None:
print(f"Searching network {text}...")
return 0
keys = [(schema, text)]
if schema is None:
keys = find.get(text, [])
for i in keys:
fn = os.path.join(path, lookups[i][2])
print(FileDOM.from_file(fn))
for dom in rpsl.find(text, schema):
print(dom)
return 0
def load_lookup(path: str) -> Tuple[Dict[Tuple[str, str], FileDOM],
Dict[str, List[Tuple[str, str]]]]:
"Load lookup data"
index_file = os.path.join(path, ".rpsl/index")
lookups = {} # type: Dict[Tuple[str, str], FileDOM]
find = {} # type: Dict[str, List[Tuple[str, str]]]
with open(index_file) as fd:
for line in fd.readlines():
sp = line.strip().split(sep="|")
lookups[(sp[0], sp[1])] = (sp[0], sp[1], sp[2])
find[sp[1]] = find.get(sp[1], [])
find[sp[1]].append((sp[0], sp[1]))
return lookups, find
def shift(args: List[str]) -> Tuple[str, List[str]]:
"shift off first arg + rest"
if len(args) == 0:
return None, []
if len(args) == 1:
return args[0], []
return args[0], args[1:]