update rpsl. add whois

This commit is contained in:
Jonathan Lundy 2020-07-13 13:47:07 -06:00
parent c9c82b3132
commit 84b133f124
No known key found for this signature in database
GPG key ID: C63E6D61F3035024
14 changed files with 392 additions and 139 deletions

View file

@ -2,18 +2,18 @@
__version__ = "0.3.0"
from .filedom import FileDOM, Row, Value, index_files
from .file import FileDOM, Row, Value, index_files
from .schema import SchemaDOM, Level, State
from .transact import TransactDOM
from .config import Config
from .nettree import NetTree, NetRecord, NetList
from .rspldom import RPSL
from .nettree import NetTree, NetRecord, NetList, as_net6
from .rspl import RPSL
__all__ = [
"FileDOM", "Row", "Value", "index_files",
"SchemaDOM", "Level", "State",
"TransactDOM",
"Config",
"NetTree", "NetRecord", "NetList",
"NetTree", "NetRecord", "NetList", "as_net6",
"RPSL",
]

View file

@ -6,7 +6,7 @@ import os.path
from dataclasses import dataclass
from typing import Dict, Set, Tuple, Optional, TypeVar
from .filedom import FileDOM
from .file import FileDOM
C = TypeVar('C', bound='Config')
@ -66,6 +66,11 @@ class Config:
"return network parents"
return set(self.network_owners.values())
@property
def network_children(self) -> Set[str]:
"return network children"
return set(self.network_owners.keys()) - self.network_parents
@property
def schema_dir(self) -> str:
"get schema directory"

View file

@ -55,6 +55,15 @@ class Value:
"""Format as key name"""
return self.value.replace("/", "_").replace(" ", "")
@property
def as_spec(self) -> List[str]:
"get the spec definition"
fields = self.fields
i = fields.index(">")
if i is None:
return []
return fields[i:]
class Row(NamedTuple):
"""DOM Row"""
@ -183,11 +192,10 @@ class FileDOM:
return f"{self.namespace}.{self.schema}"
@property
def index(self) -> Tuple[Tuple[str, str], Tuple[str, str]]:
def index(self) -> Tuple[str, str]:
"""generate index key/value pair"""
name = self.src.split("/")[-1].replace("_", "/")
return ((f"{self.namespace}.{self.schema}", name),
(self.src, ",".join(self.mntner)))
return f"{self.namespace}.{self.schema}", name
def __str__(self):
length = 19

View file

@ -0,0 +1,23 @@
"Metafile"
from dataclasses import dataclass
from typing import Sequence, Generator
from .rspl import RPSL
from .file import Value
@dataclass
class MetaFile:
"file"
obj_type: str
obj_name: str
class MetaDOM:
"metafile dom"
def __init__(self, lis: Sequence[MetaFile], rpsl: RPSL):
self.lis = lis
self.rpsl = rpsl
def get(self, name: str) -> Generator[Value, None, None]:
"get values"

View file

@ -1,7 +1,7 @@
"Net Tree"
from ipaddress import ip_network, IPv6Network
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional, Generator, TypeVar
NET = IPv6Network
@ -10,16 +10,33 @@ V4_NET = ip_network("::ffff:0.0.0.0/96")
NT = TypeVar("NT", bound="NetTree")
def as_net6(value: str) -> IPv6Network:
"""return value as an ip network"""
net = ip_network(value)
if isinstance(net, IPv6Network):
return net
n = net
return ip_network(
f"::FFFF:{n.network_address}/{n.prefixlen + 96}")
@dataclass
class NetRecord:
"Network Record"
network: NET
policy: str
status: str
is_leaf: bool = False
@property
def object_type(self) -> str:
"""object type"""
if self.is_leaf:
return "route" if V4_NET.supernet_of(self.network) \
else "route6"
return "inetnum" if V4_NET.supernet_of(self.network) \
else "inet6num"
@ -35,6 +52,9 @@ class NetRecord:
return self.network.with_prefixlen.replace("/", "_")
def __str__(self) -> str:
return f"{self.object_type}/{self.object_name}"
@dataclass
class NetList:
@ -44,6 +64,7 @@ class NetList:
level: int
net: Optional[NetRecord]
nets: List[NET]
routes: List[NetRecord] = field(default_factory=list)
def in_net(self, i: NET) -> Tuple[bool, NET]:
"find a network within a list of networks"
@ -57,39 +78,55 @@ class NetList:
return found, net
def in_routes(self, i: NET) -> Tuple[bool, NET]:
"find a network within a list of networks"
found = False
net = None
for n in self.routes:
if n.network.supernet_of(i):
found = True
net = n
break
return found, net
class NetTree:
"Network Tree"
def __init__(self, nets: Optional[List[NET]] = None):
def __init__(self,
nets: Optional[List[NetRecord]] = None,
routes: Optional[List[NetRecord]] = None):
self.tree = {} # type: Dict[NET, NetList]
if routes is None:
routes = []
if nets is not None:
self.make_tree(nets)
self.make_tree(nets, routes)
def __getitem__(self, key):
return self.tree[key]
def find_tree(self, ip: NET) -> Tuple[bool, int]:
def find_tree(self, ip: str) -> Generator[NetList, None, None]:
"""Find net in tree"""
net = V6_NET
current = self.tree[net]
needle = as_net6(ip)
yield current
while True:
found, net = current.in_net(ip)
if not found:
return True, current.level + 1
found, net = current.in_net(needle)
if found:
current = self.tree[net]
yield current
continue
break
if ip == net:
return True, current.level + 2
current = self.tree[net]
continue
return False, 0
def make_tree(self, nets: List[NetRecord]):
def make_tree(self,
nets: List[NetRecord],
routes: List[NetRecord]):
"""build a network tree index"""
root = V6_NET
self.tree = {root: NetList(0, None, -1, None, [])}
index = 0
for index, net in enumerate(sorted(
sorted(nets, key=lambda x: x.network),
key=lambda x: x.network.prefixlen)):
@ -110,6 +147,23 @@ class NetTree:
index, current.index, current.level + 1, net, [])
break
for index, net in enumerate(sorted(
sorted(routes, key=lambda x: x.network),
key=lambda x: x.network.prefixlen), index):
current = self.tree[root]
while True:
found, n = current.in_net(net.network)
if found:
current = self.tree[n]
continue
rec = NetRecord(net.network, "-", "-", True)
current.routes.append(rec)
break
def write_csv(self, fn: str = ".netindex"):
"write tree to csv"
with open(fn, "w") as f:
@ -135,6 +189,19 @@ class NetTree:
v.net.object_type,
v.net.object_name,
)]) + "\n")
for route in v.routes:
net_addr = route.network.network_address.exploded
net_pfx = route.network.prefixlen
yield (
"|".join([str(i) for i in (
f"{0:04d}|{v.index:04d}|{v.level+1:04d}",
net_addr,
net_pfx,
route.policy,
route.status,
route.object_type,
route.object_name,
)]) + "\n")
@classmethod
def read_csv(cls, fn) -> NT:
@ -146,15 +213,20 @@ class NetTree:
if len(sp) != 9:
continue
net = ip_network(f"{sp[3]}/{sp[4]}")
rec = NetRecord(net, sp[5], sp[6])
lis = NetList(sp[0], sp[1], sp[2], rec, [])
inttree[sp[0]] = lis
if sp[0] != sp[1]:
inttree[sp[1]].nets.append(net)
is_leaf = sp[7] in ("route", "route6")
rec = NetRecord(net, sp[5], sp[6], is_leaf)
if is_leaf:
inttree[sp[1]].routes.append(rec)
else:
lis = NetList(sp[0], sp[1], sp[2], rec, [])
inttree[sp[0]] = lis
if sp[0] != sp[1]:
inttree[sp[1]].nets.append(net)
nettree = {}
for v in inttree.values():
nettree[v.net.network] = v
c = cls()
c.tree = NetTree
c.tree = nettree
return c

View file

@ -1,10 +1,10 @@
"RPSL"
import os.path
from typing import Dict, List, Tuple, TypeVar, Optional, Generator
from typing import Dict, List, Tuple, TypeVar, Optional, Sequence
from .filedom import FileDOM
from .nettree import NetTree
from .file import FileDOM
from .nettree import NetTree, NetList
from .schema import SchemaDOM, State
from .transact import TransactDOM
from .config import Config
@ -49,7 +49,7 @@ class RPSL:
def append_index(self, dom: FileDOM):
"append files to index"
key, value = dom.index
key, value = dom.index, (dom.src, ",".join(dom.mntner))
self._lookup[key] = value
def scan_files(self, files: List[FileDOM]) -> State:
@ -67,24 +67,35 @@ class RPSL:
def find(self,
text: str,
schema: Optional[str] = None) -> Generator[FileDOM, None, None]:
schema: Optional[str] = None) -> Sequence[str]:
"Find files that match text and schema"
keys = [(schema, text)]
if schema is None:
keys = self._lookup.get(text, [])
return [self._files[i] for i in keys]
def related(
self,
key: Tuple[str, str]) -> Sequence[str]:
"Get files related to file"
related = set()
for link in self.links(key):
key = (link[1], link[2])
related.add(key)
for i in keys:
yield self.load_file(self._files[i])
for link in self.links(i):
key = (link[1], link[2])
related.add(key)
return [self._files[i] for i in related]
for i in related:
if i in keys:
continue
yield self.load_file(self._files[i])
def find_network(self, ip: str) -> Sequence[NetList]:
"""Find Network in index
Args:
ip (str): ip address
Returns:
Generator[NetList, None, None]: generator of netlists
"""
return self._nettree.find_tree(ip)
def load_file(self, fn: str) -> FileDOM:
"load file"
@ -95,6 +106,14 @@ class RPSL:
return fo
def load_files(self, fns: Sequence[str]) -> Sequence[NetList]:
for fn in fns:
yield self.load_file(fn)
def links(self, key: Tuple[str, str]) -> List[Tuple[str, str]]:
"get links"
return self._links.get(key, [])
def schema(self, name: str) -> SchemaDOM:
"get schema"
return self._schema.get(name)

View file

@ -6,7 +6,7 @@ from typing import Optional, List, Tuple, Dict, Set, TypeVar
import log
from .filedom import FileDOM, Row
from .file import FileDOM, Row
DOM = TypeVar("DOM", bound="FileDOM")
STATE = TypeVar("STATE", bound="State")

View file

@ -0,0 +1,21 @@
"spec"
from dataclasses import dataclass
from typing import Dict, List, Enum
class Rule:
pass
@dataclass
class LabelRule(Rule):
name: str
def parse(self, fields: Sequence[str]) -> Optional[Tuple[str, str]]:
@dataclass
class Spec:
keys: Dict[str, SpecRule]
@classmethod
def from_dom(cls, dom: file.FileDOM):
for key in

View file

@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""Test FileDOM"""
import unittest
import inspect
from pprint import pprint
from .filedom import FileDOM
class TestFileDOM(unittest.TestCase):
"""Test FileDOM"""
def test_parse(self):
"""Test Parsing"""
s = """
person: Xuu
remarks: test
+
Multi-Line
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
self.assertTrue(dom.valid)
self.assertEqual(dom.schema, "person")
self.assertEqual(dom.get("person"), "Xuu")
self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertIsNone(dom.get("xxx"))
self.assertEqual(dom.get("xxx", default="default"), "default")
self.assertEqual(str(dom), s)
def test_put_values(self):
"""Test putting values"""
s = """
person: Xuu
remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
dom.put("source", "SOURIS")
self.assertEqual(dom.get("source"), "SOURIS")
dom.put("contact", "mail:me@sour.is", append=True)
self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is")
def test_parse_ip6address(self):
"""Test network ip address parsing"""
s = """
inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff
cidr: fd00::/8
netname: ROOT-DN42-ULA
descr: DN42 ULA Address Space
status: ALLOCATED
policy: open
org: ORG-DN42
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
s = inspect.cleandoc(s)+"\n"
dom = FileDOM(text=s.splitlines())
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "fd00::/8")
self.assertEqual(
cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inet6num"), f"{start} - {end}")
def test_parse_ip4address(self):
"""Test network ip address parsing"""
s = """
inetnum: 172.20.0.0 - 172.23.255.255
cidr: 172.20.0.0/14
netname: ROOT-DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM(text=s.splitlines())
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "172.20.0.0/14")
self.assertEqual(
cidr.exploded, "172.20.0.0/14")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inetnum"), f"{start} - {end}")
@unittest.skip
def test_bad_parse(self):
"""bad parse stuff"""
s = """
person: Xuu
EXTRA
:
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
pprint(dom.dom)
self.assertEqual(str(dom), s)
if __name__ == '__main__':
unittest.main()

View file

@ -2,7 +2,7 @@
from typing import Sequence, List, Optional, Tuple, TypeVar
from .filedom import FileDOM
from .file import FileDOM
from .schema import SchemaDOM
DOM = TypeVar("DOM", bound="TransactDOM")

View file

@ -40,12 +40,13 @@ def run(args: List[str], env: Dict[str, str]) -> int:
idx = index_files(path,
namespace=config.namespace,
primary_keys=config.primary_keys)
lookup, schemas, files, nets = build_index(idx, rspl=config)
lookup, schemas, files, nets, routes = build_index(idx, rspl=config)
print(
f"Reading Files: done! files: {len(files)}" +
f" schemas: {len(schemas)}" +
f" networks: {len(nets)}",
f" routes: {len(routes)}",
file=sys.stderr)
print("Writing .rpsl/index", file=sys.stderr)
@ -71,7 +72,7 @@ def run(args: List[str], env: Dict[str, str]) -> int:
file=link_out)
print("Generate .rpsl/nettree", file=sys.stderr)
tree = NetTree(nets)
tree = NetTree(nets, routes)
print("Writing .rpsl/nettree", file=sys.stderr)
tree.write_csv(".rpsl/nettree")
@ -105,17 +106,19 @@ def build_index(
schemas = {} # type: Dict[str, SchemaDOM]
files = [] # type: List[FileDOM]
nets = [] # type: List[NetRecord]
routes = [] # type: List[NetRecord]
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
net_types = rspl.network_parents
net_leafs = rspl.network_children
for (i, dom) in enumerate(idx):
if not dom.valid:
print("E", end="", flush=True)
continue
key, _ = dom.index
key = dom.index
lookup.add(key)
files.append(dom)
@ -130,14 +133,23 @@ def build_index(
dom.get("status", default="ASSIGNED"),
))
if dom.schema in net_leafs:
routes.append(NetRecord(
dom.get(dom.primary_key).as_net6,
dom.get("policy", default="none"),
dom.get("status", default="none"),
True,
))
if i % 120 == 0:
print(
f"Reading Files: files: {len(files)}" +
f" schemas: {len(schemas)} " +
f" networks: {len(nets)}",
f" routes: {len(routes)}",
end="\r", flush=True, file=sys.stderr)
return (lookup, schemas, files, nets)
return (lookup, schemas, files, nets, routes)
def generate_links(
@ -147,15 +159,12 @@ def generate_links(
) -> Generator[Tuple[str, str, str], None, None]:
"print file links out to file"
for (link, refs) in links.items():
d = dom.get(link)
if d is None:
continue
for d in dom.get_all(link):
found = False
for ref in refs:
if (ref, d.value) in lookup:
found = True
yield (link, ref, d)
found = False
for ref in refs:
if (ref, d.value) in lookup:
found = True
yield (link, ref, d)
if not found:
print(f"{dom.name} missing link {link} {d.value}")
if not found:
print(f"{dom.name} missing link {link} {d.value}")

View file

@ -63,6 +63,7 @@ def run(args: List[str], env: Dict[str, str]) -> int:
print(rpsl, file=f)
print(f"Created: {rpsl.config_file}", file=sys.stderr)
env["RPSL_DIR"] = rpsl_dir
rpsl_index.run(args, env)
return 0

View file

@ -6,10 +6,10 @@ Usage: rpsl whois [text]
"""
import sys
from ipaddress import ip_network
from typing import List, Dict, Optional
from itertools import chain
from typing import List, Dict, Optional, Set, Tuple
from dn42.rpsl import RPSL, Config
from dn42.rpsl import RPSL, Config, FileDOM, as_net6
from dn42.utils import shift, exists
@ -41,15 +41,46 @@ def run(args: List[str], env: Dict[str, str]) -> int:
ip = None
try:
ip = ip_network(text)
ip = as_net6(text)
except ValueError:
pass
if ip is not None:
print(f"Searching network {text}...")
return 0
principle = [] # type: List[FileDOM]
related_nets = [] # type: List[FileDOM]
related_idx = set() # type: Set[Tuple[str, str]]
for dom in rpsl.find(text, schema):
if ip is not None:
print(f"# Searching network {text}...")
nets = list(rpsl.find_network(text))
last_net = nets[-1]
dom = rpsl.load_file(str(last_net.net))
principle.append(dom)
related_idx.add(dom.index)
ok, route = last_net.in_routes(ip)
if ok:
dom = rpsl.load_file(str(route))
principle.append(dom)
related_idx.add(dom.index)
for net in nets[:-1]:
dom = rpsl.load_file(str(net.net))
related_nets.append(dom)
else:
for dom in rpsl.find(text, schema):
principle.append(dom)
related_idx.add(dom.index)
print("# Found objects")
for dom in principle:
print(dom)
if len(related_nets) > 0:
print("# Related Networks")
for dom in related_nets:
print(dom)
print("# Related objects")
lis = set(chain.from_iterable(rpsl.related(i) for i in related_idx))
for dom in rpsl.load_files(sorted(lis)):
print(dom)
return 0

View file

@ -1,70 +0,0 @@
#!/usr/bin/env python3
"""Scans Registry at given path for issues"""
import os
import sys
from typing import Dict
from dom.filedom import FileDOM
from dom.schema import SchemaDOM
def index_files(path: str):
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
for f in files:
if f[0] == ".":
continue
dom = FileDOM.from_file(os.path.join(root, f))
yield dom
def run(path: str = "."):
"""run main script"""
idx = index_files(path)
lookups = {} # type: Dict[str, FileDOM]
schemas = {} # type: Dict[str, SchemaDOM]
files = []
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
for (i, dom) in enumerate(idx):
if not dom.valid:
print("E", end="", flush=True)
continue
key, value = dom.index
lookups[key] = value
files.append(dom)
if dom.schema == "schema":
schema = SchemaDOM(dom)
schemas[schema.ref] = schema
if i % 120 == 0:
print(
f"Reading Files: files: {len(files)} schemas: {len(schemas)}",
end="\r", flush=True, file=sys.stderr)
print(
f"Reading Files: done! files: {len(files)}, schemas: {len(schemas)}",
file=sys.stderr)
for dom in files:
s = schemas.get(dom.rel)
if s is None:
print(f"{dom.src} schema not found for {dom.rel}")
continue
status = s.check_file(dom, lookups)
status.print()
if __name__ == "__main__":
run(sys.argv[1] if len(sys.argv) > 1 else os.getcwd())