mirror of
https://git.dn42.dev/dn42/registry.git
synced 2025-05-08 13:45:22 +08:00
Merge pull request 'xuu-20200713/rpsl-changes' (#43) from xuu-20200713/rpsl-changes into master
Reviewed-on: https://git.dn42.dev/dn42/registry/pulls/43
This commit is contained in:
commit
dc9d7a372a
15 changed files with 404 additions and 151 deletions
24
README.md
24
README.md
|
@ -1,4 +1,4 @@
|
|||
*This repo is https://git.dn42.dev/dn42/registry
|
||||
*This repo is https://git.dn42.dev/dn42/registry
|
||||
If you are using a different url, please update as soon as possible*
|
||||
|
||||
# Guide for creating a Pull Request
|
||||
|
@ -12,9 +12,9 @@ git clone git@git.dn42.dev:dn42/registry.git
|
|||
2. **Create a branch for your changes**
|
||||
|
||||
The name of the branch ***must*** follow a specific format:
|
||||
`<username>-YYYYMMDD/<name>`
|
||||
- `<username>` is your gitea username.
|
||||
- `YYYYMMDD` is the current date.
|
||||
`<username>-YYYYMMDD/<name>`
|
||||
- `<username>` is your gitea username.
|
||||
- `YYYYMMDD` is the current date.
|
||||
- `<name>` is a descriptive name for your change.
|
||||
|
||||
The branch must be created in the registry on the date described in the branch name, so create the branch and push it to the registry immediately.
|
||||
|
@ -50,7 +50,7 @@ git commit
|
|||
|
||||
4. **Push your changes back to the registry**
|
||||
|
||||
Remember to squash your commits and sign them using your MNTNER [authentication method](https://dn42.dev/howto/Registry-Authentication).
|
||||
Remember to squash your commits and sign them using your MNTNER [authentication method](https://dn42.dev/howto/Registry-Authentication).
|
||||
It is also good practice to rebase your work on top of any other changes that may have happened on the master branch.
|
||||
|
||||
```sh
|
||||
|
@ -60,7 +60,7 @@ git fetch origin master
|
|||
|
||||
# ensure you are using your new branch
|
||||
|
||||
git checkout burble-20200704/mychange
|
||||
git checkout burble-20200704/mychange
|
||||
|
||||
# rebase your branch on top of the master
|
||||
#
|
||||
|
@ -84,17 +84,17 @@ In the gitea GUI, select your branch, check your changes again for a final time
|
|||
|
||||
If you are using SSH authentication, please post the full commit hash that you signed and SSH signature in to the PR comments.
|
||||
|
||||
Your changes will now go through automatic checking and then manual review by the registry maintainers.
|
||||
Your changes will now go through automatic checking and then manual review by the registry maintainers.
|
||||
|
||||
6. **Making updates**
|
||||
|
||||
If you need to change your PR to fix review issues simply make the updates to your branch and follow the process in (4) to rebase, squash and sign your changes again.
|
||||
Please remember to do this for every update.
|
||||
If you need to change your PR to fix review issues simply make the updates to your branch and follow the process in (4) to rebase, squash and sign your changes again.
|
||||
Please remember to do this for every update.
|
||||
|
||||
|
||||
# Gitea Usage
|
||||
|
||||
The DN42 registry is a community resource for *your* benefit.
|
||||
The DN42 registry is a community resource for *your* benefit.
|
||||
Whilst registered users are free to create and use their own repositories, please be considerate in your usage.
|
||||
|
||||
- Repositories should be related to DN42
|
||||
|
@ -103,9 +103,9 @@ Whilst registered users are free to create and use their own repositories, pleas
|
|||
|
||||
# Data Privacy
|
||||
|
||||
Gitea and the DN42 registry contains personal information for users who are registered in DN42; this information is stored in Canada and viewable by any registered member. In addition, anyone with access to the repository is able to make their own copies of the registry, which they may then process or transfer in arbitrary ways. You must assume that all data entered in to the registry cannot be kept private and will be made publically available.
|
||||
Gitea and the DN42 registry contains personal information for users who are registered in DN42; this information is stored in Canada and viewable by any registered member. In addition, anyone with access to the repository is able to make their own copies of the registry, which they may then process or transfer in arbitrary ways. You must assume that all data entered in to the registry cannot be kept private and will be made publically available.
|
||||
|
||||
Any personal information stored in the registry is optional and voluntarily provided by you. Whilst the registry maintainers will make best efforts to update or delete personal data, you must accept that the technical restrictions of git may make this impossible and that your information will likely have been distributed beyond the control of the registry maintainers.
|
||||
Any personal information stored in the registry is optional and voluntarily provided by you. Whilst the registry maintainers will make best efforts to update or delete personal data, you must accept that the technical restrictions of git may make this impossible and that your information will likely have been distributed beyond the control of the registry maintainers.
|
||||
|
||||
If this is not acceptable for you, you must not upload your personal details to the registry.
|
||||
|
||||
|
|
|
@ -2,18 +2,18 @@
|
|||
|
||||
__version__ = "0.3.0"
|
||||
|
||||
from .filedom import FileDOM, Row, Value, index_files
|
||||
from .file import FileDOM, Row, Value, index_files
|
||||
from .schema import SchemaDOM, Level, State
|
||||
from .transact import TransactDOM
|
||||
from .config import Config
|
||||
from .nettree import NetTree, NetRecord, NetList
|
||||
from .rspldom import RPSL
|
||||
from .nettree import NetTree, NetRecord, NetList, as_net6
|
||||
from .rspl import RPSL
|
||||
|
||||
__all__ = [
|
||||
"FileDOM", "Row", "Value", "index_files",
|
||||
"SchemaDOM", "Level", "State",
|
||||
"TransactDOM",
|
||||
"Config",
|
||||
"NetTree", "NetRecord", "NetList",
|
||||
"NetTree", "NetRecord", "NetList", "as_net6",
|
||||
"RPSL",
|
||||
]
|
||||
|
|
|
@ -6,7 +6,7 @@ import os.path
|
|||
from dataclasses import dataclass
|
||||
from typing import Dict, Set, Tuple, Optional, TypeVar
|
||||
|
||||
from .filedom import FileDOM
|
||||
from .file import FileDOM
|
||||
|
||||
|
||||
C = TypeVar('C', bound='Config')
|
||||
|
@ -66,6 +66,11 @@ class Config:
|
|||
"return network parents"
|
||||
return set(self.network_owners.values())
|
||||
|
||||
@property
|
||||
def network_children(self) -> Set[str]:
|
||||
"return network children"
|
||||
return set(self.network_owners.keys()) - self.network_parents
|
||||
|
||||
@property
|
||||
def schema_dir(self) -> str:
|
||||
"get schema directory"
|
||||
|
|
|
@ -55,6 +55,15 @@ class Value:
|
|||
"""Format as key name"""
|
||||
return self.value.replace("/", "_").replace(" ", "")
|
||||
|
||||
@property
|
||||
def as_spec(self) -> List[str]:
|
||||
"get the spec definition"
|
||||
fields = self.fields
|
||||
i = fields.index(">")
|
||||
if i is None:
|
||||
return []
|
||||
return fields[i:]
|
||||
|
||||
|
||||
class Row(NamedTuple):
|
||||
"""DOM Row"""
|
||||
|
@ -183,11 +192,10 @@ class FileDOM:
|
|||
return f"{self.namespace}.{self.schema}"
|
||||
|
||||
@property
|
||||
def index(self) -> Tuple[Tuple[str, str], Tuple[str, str]]:
|
||||
def index(self) -> Tuple[str, str]:
|
||||
"""generate index key/value pair"""
|
||||
name = self.src.split("/")[-1].replace("_", "/")
|
||||
return ((f"{self.namespace}.{self.schema}", name),
|
||||
(self.src, ",".join(self.mntner)))
|
||||
return f"{self.namespace}.{self.schema}", name
|
||||
|
||||
def __str__(self):
|
||||
length = 19
|
23
utils/registry/dn42/rpsl/metafile.py
Normal file
23
utils/registry/dn42/rpsl/metafile.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
"Metafile"
|
||||
from dataclasses import dataclass
|
||||
from typing import Sequence, Generator
|
||||
|
||||
from .rspl import RPSL
|
||||
from .file import Value
|
||||
|
||||
|
||||
@dataclass
|
||||
class MetaFile:
|
||||
"file"
|
||||
obj_type: str
|
||||
obj_name: str
|
||||
|
||||
|
||||
class MetaDOM:
|
||||
"metafile dom"
|
||||
def __init__(self, lis: Sequence[MetaFile], rpsl: RPSL):
|
||||
self.lis = lis
|
||||
self.rpsl = rpsl
|
||||
|
||||
def get(self, name: str) -> Generator[Value, None, None]:
|
||||
"get values"
|
|
@ -1,7 +1,7 @@
|
|||
"Net Tree"
|
||||
|
||||
from ipaddress import ip_network, IPv6Network
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Dict, List, Tuple, Optional, Generator, TypeVar
|
||||
|
||||
NET = IPv6Network
|
||||
|
@ -10,16 +10,33 @@ V4_NET = ip_network("::ffff:0.0.0.0/96")
|
|||
NT = TypeVar("NT", bound="NetTree")
|
||||
|
||||
|
||||
def as_net6(value: str) -> IPv6Network:
|
||||
"""return value as an ip network"""
|
||||
net = ip_network(value)
|
||||
|
||||
if isinstance(net, IPv6Network):
|
||||
return net
|
||||
|
||||
n = net
|
||||
return ip_network(
|
||||
f"::FFFF:{n.network_address}/{n.prefixlen + 96}")
|
||||
|
||||
|
||||
@dataclass
|
||||
class NetRecord:
|
||||
"Network Record"
|
||||
network: NET
|
||||
policy: str
|
||||
status: str
|
||||
is_leaf: bool = False
|
||||
|
||||
@property
|
||||
def object_type(self) -> str:
|
||||
"""object type"""
|
||||
if self.is_leaf:
|
||||
return "route" if V4_NET.supernet_of(self.network) \
|
||||
else "route6"
|
||||
|
||||
return "inetnum" if V4_NET.supernet_of(self.network) \
|
||||
else "inet6num"
|
||||
|
||||
|
@ -35,6 +52,9 @@ class NetRecord:
|
|||
|
||||
return self.network.with_prefixlen.replace("/", "_")
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.object_type}/{self.object_name}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class NetList:
|
||||
|
@ -44,6 +64,7 @@ class NetList:
|
|||
level: int
|
||||
net: Optional[NetRecord]
|
||||
nets: List[NET]
|
||||
routes: List[NetRecord] = field(default_factory=list)
|
||||
|
||||
def in_net(self, i: NET) -> Tuple[bool, NET]:
|
||||
"find a network within a list of networks"
|
||||
|
@ -57,39 +78,55 @@ class NetList:
|
|||
|
||||
return found, net
|
||||
|
||||
def in_routes(self, i: NET) -> Tuple[bool, NET]:
|
||||
"find a network within a list of networks"
|
||||
found = False
|
||||
net = None
|
||||
for n in self.routes:
|
||||
if n.network.supernet_of(i):
|
||||
found = True
|
||||
net = n
|
||||
break
|
||||
|
||||
return found, net
|
||||
|
||||
|
||||
class NetTree:
|
||||
"Network Tree"
|
||||
def __init__(self, nets: Optional[List[NET]] = None):
|
||||
def __init__(self,
|
||||
nets: Optional[List[NetRecord]] = None,
|
||||
routes: Optional[List[NetRecord]] = None):
|
||||
self.tree = {} # type: Dict[NET, NetList]
|
||||
if routes is None:
|
||||
routes = []
|
||||
if nets is not None:
|
||||
self.make_tree(nets)
|
||||
self.make_tree(nets, routes)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.tree[key]
|
||||
|
||||
def find_tree(self, ip: NET) -> Tuple[bool, int]:
|
||||
def find_tree(self, ip: str) -> Generator[NetList, None, None]:
|
||||
"""Find net in tree"""
|
||||
net = V6_NET
|
||||
current = self.tree[net]
|
||||
needle = as_net6(ip)
|
||||
|
||||
yield current
|
||||
while True:
|
||||
found, net = current.in_net(ip)
|
||||
if not found:
|
||||
return True, current.level + 1
|
||||
found, net = current.in_net(needle)
|
||||
if found:
|
||||
current = self.tree[net]
|
||||
yield current
|
||||
continue
|
||||
break
|
||||
|
||||
if ip == net:
|
||||
return True, current.level + 2
|
||||
|
||||
current = self.tree[net]
|
||||
continue
|
||||
|
||||
return False, 0
|
||||
|
||||
def make_tree(self, nets: List[NetRecord]):
|
||||
def make_tree(self,
|
||||
nets: List[NetRecord],
|
||||
routes: List[NetRecord]):
|
||||
"""build a network tree index"""
|
||||
root = V6_NET
|
||||
self.tree = {root: NetList(0, None, -1, None, [])}
|
||||
index = 0
|
||||
for index, net in enumerate(sorted(
|
||||
sorted(nets, key=lambda x: x.network),
|
||||
key=lambda x: x.network.prefixlen)):
|
||||
|
@ -110,6 +147,23 @@ class NetTree:
|
|||
index, current.index, current.level + 1, net, [])
|
||||
break
|
||||
|
||||
for index, net in enumerate(sorted(
|
||||
sorted(routes, key=lambda x: x.network),
|
||||
key=lambda x: x.network.prefixlen), index):
|
||||
|
||||
current = self.tree[root]
|
||||
|
||||
while True:
|
||||
found, n = current.in_net(net.network)
|
||||
if found:
|
||||
current = self.tree[n]
|
||||
continue
|
||||
|
||||
rec = NetRecord(net.network, "-", "-", True)
|
||||
current.routes.append(rec)
|
||||
|
||||
break
|
||||
|
||||
def write_csv(self, fn: str = ".netindex"):
|
||||
"write tree to csv"
|
||||
with open(fn, "w") as f:
|
||||
|
@ -135,6 +189,19 @@ class NetTree:
|
|||
v.net.object_type,
|
||||
v.net.object_name,
|
||||
)]) + "\n")
|
||||
for route in v.routes:
|
||||
net_addr = route.network.network_address.exploded
|
||||
net_pfx = route.network.prefixlen
|
||||
yield (
|
||||
"|".join([str(i) for i in (
|
||||
f"{0:04d}|{v.index:04d}|{v.level+1:04d}",
|
||||
net_addr,
|
||||
net_pfx,
|
||||
route.policy,
|
||||
route.status,
|
||||
route.object_type,
|
||||
route.object_name,
|
||||
)]) + "\n")
|
||||
|
||||
@classmethod
|
||||
def read_csv(cls, fn) -> NT:
|
||||
|
@ -146,15 +213,20 @@ class NetTree:
|
|||
if len(sp) != 9:
|
||||
continue
|
||||
net = ip_network(f"{sp[3]}/{sp[4]}")
|
||||
rec = NetRecord(net, sp[5], sp[6])
|
||||
lis = NetList(sp[0], sp[1], sp[2], rec, [])
|
||||
inttree[sp[0]] = lis
|
||||
if sp[0] != sp[1]:
|
||||
inttree[sp[1]].nets.append(net)
|
||||
is_leaf = sp[7] in ("route", "route6")
|
||||
rec = NetRecord(net, sp[5], sp[6], is_leaf)
|
||||
if is_leaf:
|
||||
inttree[sp[1]].routes.append(rec)
|
||||
else:
|
||||
lis = NetList(sp[0], sp[1], sp[2], rec, [])
|
||||
inttree[sp[0]] = lis
|
||||
|
||||
if sp[0] != sp[1]:
|
||||
inttree[sp[1]].nets.append(net)
|
||||
nettree = {}
|
||||
for v in inttree.values():
|
||||
nettree[v.net.network] = v
|
||||
|
||||
c = cls()
|
||||
c.tree = NetTree
|
||||
c.tree = nettree
|
||||
return c
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
"RPSL"
|
||||
|
||||
import os.path
|
||||
from typing import Dict, List, Tuple, TypeVar, Optional, Generator
|
||||
from typing import Dict, List, Tuple, TypeVar, Optional, Sequence
|
||||
|
||||
from .filedom import FileDOM
|
||||
from .nettree import NetTree
|
||||
from .file import FileDOM
|
||||
from .nettree import NetTree, NetList
|
||||
from .schema import SchemaDOM, State
|
||||
from .transact import TransactDOM
|
||||
from .config import Config
|
||||
|
@ -49,7 +49,7 @@ class RPSL:
|
|||
|
||||
def append_index(self, dom: FileDOM):
|
||||
"append files to index"
|
||||
key, value = dom.index
|
||||
key, value = dom.index, (dom.src, ",".join(dom.mntner))
|
||||
self._lookup[key] = value
|
||||
|
||||
def scan_files(self, files: List[FileDOM]) -> State:
|
||||
|
@ -67,24 +67,35 @@ class RPSL:
|
|||
|
||||
def find(self,
|
||||
text: str,
|
||||
schema: Optional[str] = None) -> Generator[FileDOM, None, None]:
|
||||
schema: Optional[str] = None) -> Sequence[str]:
|
||||
"Find files that match text and schema"
|
||||
keys = [(schema, text)]
|
||||
if schema is None:
|
||||
keys = self._lookup.get(text, [])
|
||||
|
||||
return [self._files[i] for i in keys]
|
||||
|
||||
def related(
|
||||
self,
|
||||
key: Tuple[str, str]) -> Sequence[str]:
|
||||
"Get files related to file"
|
||||
related = set()
|
||||
for link in self.links(key):
|
||||
key = (link[1], link[2])
|
||||
related.add(key)
|
||||
|
||||
for i in keys:
|
||||
yield self.load_file(self._files[i])
|
||||
for link in self.links(i):
|
||||
key = (link[1], link[2])
|
||||
related.add(key)
|
||||
return [self._files[i] for i in related]
|
||||
|
||||
for i in related:
|
||||
if i in keys:
|
||||
continue
|
||||
yield self.load_file(self._files[i])
|
||||
def find_network(self, ip: str) -> Sequence[NetList]:
|
||||
"""Find Network in index
|
||||
|
||||
Args:
|
||||
ip (str): ip address
|
||||
|
||||
Returns:
|
||||
Generator[NetList, None, None]: generator of netlists
|
||||
"""
|
||||
return self._nettree.find_tree(ip)
|
||||
|
||||
def load_file(self, fn: str) -> FileDOM:
|
||||
"load file"
|
||||
|
@ -95,6 +106,14 @@ class RPSL:
|
|||
|
||||
return fo
|
||||
|
||||
def load_files(self, fns: Sequence[str]) -> Sequence[NetList]:
|
||||
for fn in fns:
|
||||
yield self.load_file(fn)
|
||||
|
||||
def links(self, key: Tuple[str, str]) -> List[Tuple[str, str]]:
|
||||
"get links"
|
||||
return self._links.get(key, [])
|
||||
|
||||
def schema(self, name: str) -> SchemaDOM:
|
||||
"get schema"
|
||||
return self._schema.get(name)
|
|
@ -6,7 +6,7 @@ from typing import Optional, List, Tuple, Dict, Set, TypeVar
|
|||
|
||||
import log
|
||||
|
||||
from .filedom import FileDOM, Row
|
||||
from .file import FileDOM, Row
|
||||
|
||||
DOM = TypeVar("DOM", bound="FileDOM")
|
||||
STATE = TypeVar("STATE", bound="State")
|
||||
|
|
21
utils/registry/dn42/rpsl/spec.py
Normal file
21
utils/registry/dn42/rpsl/spec.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
"spec"
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Enum
|
||||
|
||||
class Rule:
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class LabelRule(Rule):
|
||||
name: str
|
||||
|
||||
def parse(self, fields: Sequence[str]) -> Optional[Tuple[str, str]]:
|
||||
|
||||
@dataclass
|
||||
class Spec:
|
||||
keys: Dict[str, SpecRule]
|
||||
|
||||
@classmethod
|
||||
def from_dom(cls, dom: file.FileDOM):
|
||||
for key in
|
134
utils/registry/dn42/rpsl/test_file.py
Normal file
134
utils/registry/dn42/rpsl/test_file.py
Normal file
|
@ -0,0 +1,134 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Test FileDOM"""
|
||||
import unittest
|
||||
import inspect
|
||||
from pprint import pprint
|
||||
|
||||
from .filedom import FileDOM
|
||||
|
||||
|
||||
class TestFileDOM(unittest.TestCase):
|
||||
"""Test FileDOM"""
|
||||
|
||||
def test_parse(self):
|
||||
"""Test Parsing"""
|
||||
s = """
|
||||
person: Xuu
|
||||
remarks: test
|
||||
+
|
||||
Multi-Line
|
||||
contact: xmpp:xuu@xmpp.dn42
|
||||
contact: mail:xuu@dn42.us
|
||||
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
|
||||
nic-hdl: XUU-DN42
|
||||
mnt-by: XUU-MNT
|
||||
source: DN42
|
||||
"""
|
||||
s = inspect.cleandoc(s)+"\n"
|
||||
|
||||
dom = FileDOM()
|
||||
dom.parse(s.splitlines())
|
||||
|
||||
self.assertTrue(dom.valid)
|
||||
self.assertEqual(dom.schema, "person")
|
||||
self.assertEqual(dom.get("person"), "Xuu")
|
||||
self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42")
|
||||
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
|
||||
self.assertIsNone(dom.get("xxx"))
|
||||
self.assertEqual(dom.get("xxx", default="default"), "default")
|
||||
self.assertEqual(str(dom), s)
|
||||
|
||||
def test_put_values(self):
|
||||
"""Test putting values"""
|
||||
s = """
|
||||
person: Xuu
|
||||
remarks: test
|
||||
contact: xmpp:xuu@xmpp.dn42
|
||||
contact: mail:xuu@dn42.us
|
||||
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
|
||||
nic-hdl: XUU-DN42
|
||||
mnt-by: XUU-MNT
|
||||
source: DN42
|
||||
"""
|
||||
s = inspect.cleandoc(s)+"\n"
|
||||
|
||||
dom = FileDOM()
|
||||
dom.parse(s.splitlines())
|
||||
|
||||
dom.put("source", "SOURIS")
|
||||
self.assertEqual(dom.get("source"), "SOURIS")
|
||||
|
||||
dom.put("contact", "mail:me@sour.is", append=True)
|
||||
self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42")
|
||||
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
|
||||
self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is")
|
||||
|
||||
def test_parse_ip6address(self):
|
||||
"""Test network ip address parsing"""
|
||||
s = """
|
||||
inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff
|
||||
cidr: fd00::/8
|
||||
netname: ROOT-DN42-ULA
|
||||
descr: DN42 ULA Address Space
|
||||
status: ALLOCATED
|
||||
policy: open
|
||||
org: ORG-DN42
|
||||
mnt-by: DN42-MNT
|
||||
source: DN42
|
||||
""" # noqa: E501
|
||||
|
||||
s = inspect.cleandoc(s)+"\n"
|
||||
|
||||
dom = FileDOM(text=s.splitlines())
|
||||
|
||||
cidr = dom.get("cidr").as_net
|
||||
self.assertEqual(cidr.compressed, "fd00::/8")
|
||||
self.assertEqual(
|
||||
cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8")
|
||||
|
||||
end = cidr.broadcast_address.exploded
|
||||
start = cidr.network_address.exploded
|
||||
|
||||
self.assertEqual(dom.get("inet6num"), f"{start} - {end}")
|
||||
|
||||
def test_parse_ip4address(self):
|
||||
"""Test network ip address parsing"""
|
||||
s = """
|
||||
inetnum: 172.20.0.0 - 172.23.255.255
|
||||
cidr: 172.20.0.0/14
|
||||
netname: ROOT-DN42
|
||||
"""
|
||||
|
||||
s = inspect.cleandoc(s)+"\n"
|
||||
|
||||
dom = FileDOM(text=s.splitlines())
|
||||
|
||||
cidr = dom.get("cidr").as_net
|
||||
self.assertEqual(cidr.compressed, "172.20.0.0/14")
|
||||
self.assertEqual(
|
||||
cidr.exploded, "172.20.0.0/14")
|
||||
|
||||
end = cidr.broadcast_address.exploded
|
||||
start = cidr.network_address.exploded
|
||||
|
||||
self.assertEqual(dom.get("inetnum"), f"{start} - {end}")
|
||||
|
||||
@unittest.skip
|
||||
def test_bad_parse(self):
|
||||
"""bad parse stuff"""
|
||||
s = """
|
||||
person: Xuu
|
||||
EXTRA
|
||||
:
|
||||
source: DN42
|
||||
"""
|
||||
s = inspect.cleandoc(s)+"\n"
|
||||
|
||||
dom = FileDOM()
|
||||
dom.parse(s.splitlines())
|
||||
pprint(dom.dom)
|
||||
self.assertEqual(str(dom), s)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
from typing import Sequence, List, Optional, Tuple, TypeVar
|
||||
|
||||
from .filedom import FileDOM
|
||||
from .file import FileDOM
|
||||
from .schema import SchemaDOM
|
||||
|
||||
DOM = TypeVar("DOM", bound="TransactDOM")
|
||||
|
|
|
@ -40,12 +40,13 @@ def run(args: List[str], env: Dict[str, str]) -> int:
|
|||
idx = index_files(path,
|
||||
namespace=config.namespace,
|
||||
primary_keys=config.primary_keys)
|
||||
lookup, schemas, files, nets = build_index(idx, rspl=config)
|
||||
lookup, schemas, files, nets, routes = build_index(idx, rspl=config)
|
||||
|
||||
print(
|
||||
f"Reading Files: done! files: {len(files)}" +
|
||||
f" schemas: {len(schemas)}" +
|
||||
f" networks: {len(nets)}",
|
||||
f" routes: {len(routes)}",
|
||||
file=sys.stderr)
|
||||
|
||||
print("Writing .rpsl/index", file=sys.stderr)
|
||||
|
@ -71,7 +72,7 @@ def run(args: List[str], env: Dict[str, str]) -> int:
|
|||
file=link_out)
|
||||
|
||||
print("Generate .rpsl/nettree", file=sys.stderr)
|
||||
tree = NetTree(nets)
|
||||
tree = NetTree(nets, routes)
|
||||
|
||||
print("Writing .rpsl/nettree", file=sys.stderr)
|
||||
tree.write_csv(".rpsl/nettree")
|
||||
|
@ -105,17 +106,19 @@ def build_index(
|
|||
schemas = {} # type: Dict[str, SchemaDOM]
|
||||
files = [] # type: List[FileDOM]
|
||||
nets = [] # type: List[NetRecord]
|
||||
routes = [] # type: List[NetRecord]
|
||||
|
||||
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
|
||||
|
||||
net_types = rspl.network_parents
|
||||
net_leafs = rspl.network_children
|
||||
|
||||
for (i, dom) in enumerate(idx):
|
||||
if not dom.valid:
|
||||
print("E", end="", flush=True)
|
||||
continue
|
||||
|
||||
key, _ = dom.index
|
||||
key = dom.index
|
||||
lookup.add(key)
|
||||
files.append(dom)
|
||||
|
||||
|
@ -130,14 +133,23 @@ def build_index(
|
|||
dom.get("status", default="ASSIGNED"),
|
||||
))
|
||||
|
||||
if dom.schema in net_leafs:
|
||||
routes.append(NetRecord(
|
||||
dom.get(dom.primary_key).as_net6,
|
||||
dom.get("policy", default="none"),
|
||||
dom.get("status", default="none"),
|
||||
True,
|
||||
))
|
||||
|
||||
if i % 120 == 0:
|
||||
print(
|
||||
f"Reading Files: files: {len(files)}" +
|
||||
f" schemas: {len(schemas)} " +
|
||||
f" networks: {len(nets)}",
|
||||
f" routes: {len(routes)}",
|
||||
end="\r", flush=True, file=sys.stderr)
|
||||
|
||||
return (lookup, schemas, files, nets)
|
||||
return (lookup, schemas, files, nets, routes)
|
||||
|
||||
|
||||
def generate_links(
|
||||
|
@ -147,15 +159,12 @@ def generate_links(
|
|||
) -> Generator[Tuple[str, str, str], None, None]:
|
||||
"print file links out to file"
|
||||
for (link, refs) in links.items():
|
||||
d = dom.get(link)
|
||||
if d is None:
|
||||
continue
|
||||
for d in dom.get_all(link):
|
||||
found = False
|
||||
for ref in refs:
|
||||
if (ref, d.value) in lookup:
|
||||
found = True
|
||||
yield (link, ref, d)
|
||||
|
||||
found = False
|
||||
for ref in refs:
|
||||
if (ref, d.value) in lookup:
|
||||
found = True
|
||||
yield (link, ref, d)
|
||||
|
||||
if not found:
|
||||
print(f"{dom.name} missing link {link} {d.value}")
|
||||
if not found:
|
||||
print(f"{dom.name} missing link {link} {d.value}")
|
||||
|
|
|
@ -63,6 +63,7 @@ def run(args: List[str], env: Dict[str, str]) -> int:
|
|||
print(rpsl, file=f)
|
||||
|
||||
print(f"Created: {rpsl.config_file}", file=sys.stderr)
|
||||
env["RPSL_DIR"] = rpsl_dir
|
||||
rpsl_index.run(args, env)
|
||||
|
||||
return 0
|
||||
|
|
|
@ -6,10 +6,10 @@ Usage: rpsl whois [text]
|
|||
"""
|
||||
|
||||
import sys
|
||||
from ipaddress import ip_network
|
||||
from typing import List, Dict, Optional
|
||||
from itertools import chain
|
||||
from typing import List, Dict, Optional, Set, Tuple
|
||||
|
||||
from dn42.rpsl import RPSL, Config
|
||||
from dn42.rpsl import RPSL, Config, FileDOM, as_net6
|
||||
from dn42.utils import shift, exists
|
||||
|
||||
|
||||
|
@ -41,15 +41,46 @@ def run(args: List[str], env: Dict[str, str]) -> int:
|
|||
|
||||
ip = None
|
||||
try:
|
||||
ip = ip_network(text)
|
||||
ip = as_net6(text)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if ip is not None:
|
||||
print(f"Searching network {text}...")
|
||||
return 0
|
||||
principle = [] # type: List[FileDOM]
|
||||
related_nets = [] # type: List[FileDOM]
|
||||
related_idx = set() # type: Set[Tuple[str, str]]
|
||||
|
||||
for dom in rpsl.find(text, schema):
|
||||
if ip is not None:
|
||||
print(f"# Searching network {text}...")
|
||||
nets = list(rpsl.find_network(text))
|
||||
last_net = nets[-1]
|
||||
dom = rpsl.load_file(str(last_net.net))
|
||||
principle.append(dom)
|
||||
related_idx.add(dom.index)
|
||||
ok, route = last_net.in_routes(ip)
|
||||
if ok:
|
||||
dom = rpsl.load_file(str(route))
|
||||
principle.append(dom)
|
||||
related_idx.add(dom.index)
|
||||
|
||||
for net in nets[:-1]:
|
||||
dom = rpsl.load_file(str(net.net))
|
||||
related_nets.append(dom)
|
||||
else:
|
||||
for dom in rpsl.find(text, schema):
|
||||
principle.append(dom)
|
||||
related_idx.add(dom.index)
|
||||
|
||||
print("# Found objects")
|
||||
for dom in principle:
|
||||
print(dom)
|
||||
|
||||
if len(related_nets) > 0:
|
||||
print("# Related Networks")
|
||||
for dom in related_nets:
|
||||
print(dom)
|
||||
|
||||
print("# Related objects")
|
||||
lis = set(chain.from_iterable(rpsl.related(i) for i in related_idx))
|
||||
for dom in rpsl.load_files(sorted(lis)):
|
||||
print(dom)
|
||||
return 0
|
||||
|
|
|
@ -1,70 +0,0 @@
|
|||
#!/usr/bin/env python3
|
||||
"""Scans Registry at given path for issues"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from typing import Dict
|
||||
|
||||
from dom.filedom import FileDOM
|
||||
from dom.schema import SchemaDOM
|
||||
|
||||
|
||||
def index_files(path: str):
|
||||
"""generate list of dom files"""
|
||||
for root, _, files in os.walk(path):
|
||||
if root == path:
|
||||
continue
|
||||
|
||||
for f in files:
|
||||
if f[0] == ".":
|
||||
continue
|
||||
|
||||
dom = FileDOM.from_file(os.path.join(root, f))
|
||||
|
||||
yield dom
|
||||
|
||||
|
||||
def run(path: str = "."):
|
||||
"""run main script"""
|
||||
idx = index_files(path)
|
||||
|
||||
lookups = {} # type: Dict[str, FileDOM]
|
||||
schemas = {} # type: Dict[str, SchemaDOM]
|
||||
files = []
|
||||
|
||||
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
|
||||
|
||||
for (i, dom) in enumerate(idx):
|
||||
if not dom.valid:
|
||||
print("E", end="", flush=True)
|
||||
continue
|
||||
|
||||
key, value = dom.index
|
||||
lookups[key] = value
|
||||
files.append(dom)
|
||||
|
||||
if dom.schema == "schema":
|
||||
schema = SchemaDOM(dom)
|
||||
schemas[schema.ref] = schema
|
||||
|
||||
if i % 120 == 0:
|
||||
print(
|
||||
f"Reading Files: files: {len(files)} schemas: {len(schemas)}",
|
||||
end="\r", flush=True, file=sys.stderr)
|
||||
|
||||
print(
|
||||
f"Reading Files: done! files: {len(files)}, schemas: {len(schemas)}",
|
||||
file=sys.stderr)
|
||||
|
||||
for dom in files:
|
||||
s = schemas.get(dom.rel)
|
||||
if s is None:
|
||||
print(f"{dom.src} schema not found for {dom.rel}")
|
||||
continue
|
||||
|
||||
status = s.check_file(dom, lookups)
|
||||
status.print()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
run(sys.argv[1] if len(sys.argv) > 1 else os.getcwd())
|
Loading…
Add table
Reference in a new issue