move init to look to schema/SCHEMA-SCHEMA

This commit is contained in:
Jonathan Lundy 2020-06-26 12:11:04 -06:00
parent 78421585a5
commit af46a7c4ff
No known key found for this signature in database
GPG key ID: C63E6D61F3035024
9 changed files with 74 additions and 47 deletions

View file

@ -1,5 +1,6 @@
schema: DNS-SCHEMA
ref: dn42.domain
dir-name: dns
key: domain required single primary schema
key: nserver required multiple > [domain-name] [ip-addr]
key: descr optional single
@ -8,7 +9,7 @@ key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: org optional multiple lookup=dn42.organisation
key: country optional single
key: ds-rdata optional multiple
key: ds-rdata optional multiple
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT

View file

@ -1,16 +1,15 @@
schema: SCHEMA-SCHEMA
namespace: dn42
ref: dn42.schema
primary-key: inetnum cidr
primary-key: inet6num cidr
primary-key: role nic-hdl
primary-key: person nic-hdl
namespace-owners: mntner
ref: dn42.schema
owners: mntner
key: schema required single primary schema > [name]
key: ref required single > [schema]
key: namespace required single
key: primary-key optional multiple > [schema] [key]
key: namespace-owners required single > [schema]
key: dir-name optional single
key: owners optional single > [schema]
key: mnt-by required multiple lookup=dn42.mntner > [mntner]
key: remarks optional multiple > [text]...
key: source required single lookup=dn42.registry

View file

@ -26,12 +26,12 @@ class Config:
@property
def schema(self) -> str:
"Get schema type name"
return self._dom.get("namespace-schema", default="schema").value
return str(self._dom.get("schema", default="schema"))
@property
def owners(self) -> str:
"Get owner type name"
return self._dom.get("namespace-owner", default="mntner").value
return str(self._dom.get("owner", default="mntner"))
@property
def source(self) -> str:
@ -111,35 +111,33 @@ class Config:
def build(cls, # pylint: disable=too-many-arguments
path: str,
namespace: str = "dn42",
schema: str = "schema",
owners: str = "mntner",
default_owner: str = "DN42-MNT",
schema: str = "schema",
source: str = "DN42",
dir_name: Optional[Set[Tuple[str, str]]] = None,
primary_keys: Optional[Set[Tuple[str, str]]] = None,
network_owners: Optional[Set[Tuple[str, str]]] = None,
source: str = "DN42") -> FileDOM:
) -> FileDOM:
"Build config from parameters"
FileDOM.namespace = namespace
dom = FileDOM()
dom.src = os.path.join(path, ".rpsl/config")
dom.put("namespace", namespace)
dom.put("namespace-schema", schema)
dom.put("namespace-owners", owners)
dom.put("default-owner", default_owner)
dom.put("owners", owners)
dom.put("schema", schema)
dom.put("source", source)
for (k, v) in dir_name:
dom.put("dir-name", f"{k} {v}", append=True)
for (k, v) in primary_keys:
dom.put("primary-key", f"{k} {v}", append=True)
for (k, v) in network_owners:
dom.put("network-owner", f"{v} {k}", append=True)
dom.put("mnt-by", default_owner)
dom.put("source", source)
return cls(dom)
def __init__(self, dom: FileDOM):
self._dom = dom
self.path = os.path.dirname(os.path.dirname(dom.src))
# TODO: bit of a smelly side effect. Need something better.
FileDOM.namespace = self.namespace
FileDOM.primary_keys = self.primary_keys
def __str__(self):
return self._dom.__str__()

View file

@ -74,9 +74,6 @@ class Row(NamedTuple):
class FileDOM:
"""Parses a reg file"""
namespace: str = "dn42"
primary_keys: Dict[str, str] = {}
def __init__(self,
text: Optional[Sequence[str]] = None,
src: Optional[str] = None):
@ -85,6 +82,9 @@ class FileDOM:
self.keys = {} # type: Dict[str, int]
self.multi = {} # type: Dict[str, int]
self.mntner = [] # type: List[str]
self.namespace = ""
self.primary_key = ""
self.src = src
if text is not None:
@ -145,6 +145,8 @@ class FileDOM:
mntner.append(dom[-1][1])
self.dom = [Row(k, Value(v), n, self.src) for k, v, n in dom]
if len(self.dom) > 1:
self.primary_key = self.dom[0].key
self.keys = keys
self.multi = multi
self.mntner = mntner
@ -163,8 +165,8 @@ class FileDOM:
@property
def name(self) -> str:
"""return the friendly name for file"""
if self.schema in FileDOM.primary_keys:
return self.get(FileDOM.primary_keys[self.schema]).value
if self.primary_key != "":
return self.get(self.primary_key).value
if len(self.dom) < 1:
return "none"
@ -178,13 +180,13 @@ class FileDOM:
@property
def rel(self) -> str:
"generate rel for schema ref"
return f"{FileDOM.namespace}.{self.schema}"
return f"{self.namespace}.{self.schema}"
@property
def index(self) -> Tuple[Tuple[str, str], Tuple[str, str]]:
"""generate index key/value pair"""
name = self.src.split("/")[-1].replace("_", "/")
return ((f"{FileDOM.namespace}.{self.schema}", name),
return ((f"{self.namespace}.{self.schema}", name),
(self.src, ",".join(self.mntner)))
def __str__(self):
@ -195,7 +197,9 @@ class FileDOM:
s = ""
for i in self.dom:
sp = i.value.lines
if len(sp) == 0:
s += i.key + ":" + " " * (length - len(i.key)) + "\n"
continue
s += i.key + ":" + " " * (length - len(i.key)) + sp[0] + "\n"
for m in sp[1:]:
if m == "":
@ -245,16 +249,17 @@ class FileDOM:
return dom
def index_files(path: str) -> FileDOM:
def index_files(path: str,
namespace: str,
primary_keys: Dict[str, str]) -> FileDOM:
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
if root.endswith(".rpsl"):
dom = FileDOM.from_file(os.path.join(root, "config"))
yield dom
continue
for f in files:
dom = FileDOM.from_file(os.path.join(root, f))
dom.namespace = namespace
yield dom

View file

@ -78,7 +78,12 @@ class RPSL:
def load_file(self, fn: str) -> FileDOM:
"load file"
fn = os.path.join(self._config.path, fn)
return FileDOM.from_file(fn)
fo = FileDOM.from_file(fn)
fo.namespace = self._config.namespace
fo.primary_keys = self._config.primary_keys
return fo
def links(self, key: Tuple[str, str]) -> List[Tuple[str, str]]:
"get links"
return self._links.get(key, [])

View file

@ -80,6 +80,15 @@ class SchemaDOM:
"return schema links"
return self._links
@property
def namespace(self) -> str:
"get namespace"
ns = "default"
ref = self._dom.get("ref")
if ref is not None:
ns = ref.value.split(".")[0]
return ns
def parse(self, f: FileDOM):
"""Parse a FileDOM into a SchemaDOM"""
self.src = self.src if f.src is None else f.src

View file

@ -38,7 +38,9 @@ def run(args: List[str], env: Dict[str, str]) -> int:
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
idx = index_files(path)
idx = index_files(path,
namespace=config.namespace,
primary_keys=config.primary_keys)
lookup, schemas, files, nets = build_index(idx, rspl=config)
print(

View file

@ -25,10 +25,8 @@ Group = TypeVar("Group", set, tuple)
parser = argparse.ArgumentParser()
parser.add_argument("--namespace", type=str, default=None)
parser.add_argument("--schema", type=str, default="schema")
parser.add_argument("--owners", type=str, default="mntner")
parser.add_argument("--default-owner", type=str, default="DN42-MNT")
parser.add_argument("--source", type=str, default="DN42")
parser.add_argument("--schema", type=str, default="schema")
parser.add_argument("--force", action='store_true')
@ -44,18 +42,18 @@ def run(args: List[str], env: Dict[str, str]) -> int:
return 1
rpsl_dir = env.get("WORKING_DIR")
schema_dir = os.path.join(rpsl_dir, "schema")
schema_dir = os.path.join(rpsl_dir, opts.schema)
network_owners, primary_keys, dir_name = {}, {}, {}
network_owners, primary_keys = {}, {}
if os.path.exists(schema_dir):
network_owners, primary_keys = _parse_schema(schema_dir)
print(rpsl_dir)
ns, network_owners, primary_keys, dir_name = \
_parse_schema(schema_dir, opts.namespace)
rpsl = Config.build(path=rpsl_dir,
namespace=opts.namespace,
namespace=ns,
schema=opts.schema,
owners=opts.owners,
source=opts.source,
default_owner=opts.default_owner,
dir_name=dir_name,
network_owners=network_owners,
primary_keys=primary_keys)
@ -75,17 +73,27 @@ def _read_schemas(path: str) -> Generator[SchemaDOM, None, None]:
yield schema
def _parse_schema(path: str) -> Tuple[Group, Group]:
def _parse_schema(path: str, ns: str) -> Tuple[str, Group, Group, Group]:
schemas = _read_schemas(path)
namespace = ns
network_owner = set() # type: Set[str, str]
primary_key = set() # type: Set[str, str]
dir_name = set() # type: Set[str, str]
for s in schemas:
if s.type == "schema":
if s.namespace != namespace:
namespace = s.namespace
for i in s.dom.get_all("network-owner"):
network_owner.add((s.type, i.value))
d = s.dom.get("dir-name")
if d is not None:
dir_name.add((s.type, d.value))
if s.primary != s.type:
primary_key.add((s.type, s.primary))
return network_owner, primary_key
return namespace, network_owner, primary_key, dir_name

View file

@ -40,7 +40,7 @@ def run(args: List[str], env: Dict[str, str]) -> int:
return 1
rpsl = RPSL(config)
files = _file_gen(path, opts, wd=env.get("WORKING_DIR"))
files = _file_gen(path, opts, wd=env.get("WORKING_DIR"), config=config)
if opts.add_index:
files, g = [], files
@ -56,11 +56,11 @@ def run(args: List[str], env: Dict[str, str]) -> int:
return 0 if status else 1
def _file_gen(path, opts: argparse.Namespace, wd: str):
def _file_gen(path, opts: argparse.Namespace, wd: str, config: Config):
if opts.scan_dir is not None:
path = os.path.join(wd, opts.scan_dir)
elif opts.scan_file is not None:
path = os.path.join(wd, opts.scan_file)
return TransactDOM.from_file(path).files
return index_files(path)
return index_files(path, config.namespace, config.primary_keys)