diff --git a/.gitignore b/.gitignore index 706f97b13..e5a5e2856 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ _MTN lib/ whoisd/ -utils/ -!utils/schema-check/*.py +/utils/ +!/utils/schema-check +!/utils/registry +__pycache__ \ No newline at end of file diff --git a/utils/registry/dom/__init__.py b/utils/registry/dom/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/utils/registry/dom/filedom.py b/utils/registry/dom/filedom.py new file mode 100644 index 000000000..2f3449de5 --- /dev/null +++ b/utils/registry/dom/filedom.py @@ -0,0 +1,176 @@ +"""FileDOM parse and formating""" + +import re +from dataclasses import dataclass +from typing import Sequence, NamedTuple, List, Dict, Optional, Union +import ipaddress + +import log + + +@dataclass +class Value: + """Dom Value""" + value: str + + def __eq__(self, other: str) -> bool: + return self.value == other + + def __str__(self) -> str: + return self.value + + def lines(self) -> List[str]: + """return value split into lines""" + return self.value.splitlines() + + def fields(self) -> List[str]: + """return value split into fields""" + return self.value.split() + + def as_ip(self) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]: + """return value as an ip address""" + return ipaddress.ip_address(self.value) + + def as_net(self) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network]: + """return value as an ip network""" + return ipaddress.ip_network(self.value) + + def as_key(self) -> str: + """Format as key name""" + return self.value.replace("/", "_").replace(" ", "") + + +class Row(NamedTuple): + """DOM Row""" + key: str + value: Value + lineno: int + src: str = None + + def loc(self) -> str: + """format as location""" + s = f"{self.src} Line {self.lineno} " + s += "" if self.key == "" else f"Key [{self.key}]:" + return s + + +class FileDOM: + """Parses a reg file""" + + def __init__(self, src: Optional[str] = None): + self.valid = False + self.dom = [] # type: List[Row] + self.keys = {} # type: Dict[str, int] + self.multi = {} # type: Dict[str, int] + self.mntner = [] # type: List[str] + self.schema = None # type: Optional[str] + self.src = src + + def parse(self, input_str: Sequence[str], src: Optional[str] = None): + """Parse an input string generator""" + dom = [] + keys = {} + multi = {} + mntner = [] + last_multi = None + self.valid = True + self.src = self.src if src is None else src + + for lineno, i in enumerate(input_str, 1): + # print(lineno, i) + if re.match(r'[ \t]', i): + if len(dom) == 0: + log.error(f"File {src} does not parse properly") + self.valid = False + return + + dom[-1][1] += "\n" + i.strip() + + if dom[-1][0] not in multi: + multi[dom[-1][0]] = [] + + if last_multi is None: + multi[dom[-1][0]].append(lineno) + last_multi = dom[-1][0] + + else: + if i[0] == '+': + dom[-1][1] += "\n" + + if dom[-1][0] not in multi: + multi[dom[-1][0]] = [] + + if last_multi is None: + multi[dom[-1][0]].append(lineno) + last_multi = dom[-1][0] + + i = i.split(":") + if len(i) < 2: + continue + + dom.append([i[0].strip(), ':'.join( + i[1:]).strip(), lineno - 1]) + + if i[0].strip() not in keys: + keys[i[0].strip()] = [] + + keys[i[0].strip()].append(len(dom) - 1) + + last_multi = None + + if dom[-1][0] == 'mnt-by': + mntner.append(dom[-1][1]) + + self.dom = [Row(k, Value(v), n) for k, v, n in dom] + self.keys = keys + self.multi = multi + self.mntner = mntner + self.schema = self.dom[0].key + + def __str__(self): + length = 19 + for i in self.dom: + if len(i.key) > length: + length = len(i.key) + 2 + s = "" + for i in self.dom: + sp = i.value.lines() + + s += i.key + ":" + " " * (length - len(i.key)) + sp[0] + "\n" + for m in sp[1:]: + if m == "": + s += "+\n" + continue + s += " " * (length + 1) + m + "\n" + + return s + + def get(self, key, index=0, default=None): + """Get a key value""" + if key not in self.keys: + return default + if index >= len(self.keys[key]) or index <= -len(self.keys[key]): + return default + + return self.dom[self.keys[key][index]].value + + def put(self, key, value, index=0, append=False): + """Put a value""" + if key not in self.keys: + self.keys[key] = [] + + i = (self.keys[key][index:index+1] or (None,))[0] + if i is None or append: + i = len(self.dom) + self.dom.append(Row(key, Value(value), i)) + elif i is not None: + self.dom[i] = Row(key, Value(value), i) + + if index not in self.keys[key]: + self.keys[key].append(i) + + +def read_file(fn: str) -> FileDOM: + """Parses FileDOM from file""" + with open(fn, mode='r', encoding='utf-8') as f: + return FileDOM().parse(f.readlines()) diff --git a/utils/registry/dom/schema.py b/utils/registry/dom/schema.py new file mode 100644 index 000000000..dd182584e --- /dev/null +++ b/utils/registry/dom/schema.py @@ -0,0 +1,244 @@ +"""Schema DOM""" +import re +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional, List, Tuple + +import log + +from .filedom import FileDOM, Row + + +SCHEMA_NAMESPACE = "dn42." + + +class Level(Enum): + """State error level""" + info = 1 + warning = 2 + error = 3 + + +@dataclass +class State: + """State of schema check + """ + state: bool = True + msgs: List[Tuple[Level, Row, str]] = field(default_factory=list) + + def __eq__(self, other: bool) -> bool: + return self.state == other + + def __bool__(self): + return self.state + + def __str__(self) -> str: + return "PASS" if self.state else "FAIL" + + def print(self): + """print out state info""" + for (level, row, msg) in self.msgs: + if level == Level.info: + log.info(f"{row.loc()} {msg}") + elif level == Level.warning: + log.warning(f"{row.loc()} {msg}") + elif level == Level.error: + log.error(f"{row.loc()} {msg}") + + def info(self, r: Row, s: str): + """Add warning""" + self.msgs.append((Level.info, r, s)) + + def warning(self, r: Row, s: str): + """Add warning""" + self.msgs.append((Level.warning, r, s)) + + def error(self, r: Row, s: str): + """Add error""" + self.state = False + self.msgs.append((Level.error, r, s)) + + +class SchemaDOM: + """Schema DOM""" + def __init__(self, src: Optional[str] = None): + self.valid = False + self.name = None + self.ref = None + self.primary = None + self.type = None + self.src = src + self.schema = {} + + def parse(self, f: FileDOM): + """Parse a FileDOM into a SchemaDOM""" + + self.src = self.src if f.src is None else f.src + + schema = {} + for row in f.dom: + if row.key == 'ref': + self.ref = str(row.value) + elif row.key == 'schema': + self.name = str(row.value) + + if row.key != 'key': + continue + + lines = row.value.fields() + key = lines.pop(0) + + schema[key] = set() + for i in lines: + if i == ">": + break + + schema[key].add(i) + + schema = self._process_schema(schema) + + self.valid = True + self.schema = schema + return schema + + def _process_schema(self, schema): + for k, v in schema.items(): + if 'schema' in v: + self.type = k + + if 'primary' in v: + self.primary = k + schema[k].add("oneline") + if "multiline" in v: + schema[k].remove("multiline") + schema[k].add("single") + if "multiple" in v: + schema[k].remove("multiple") + schema[k].add("required") + if "optional" in v: + schema[k].remove("optional") + if "recommend" in v: + schema[k].remove("recommend") + if "deprecate" in v: + schema[k].remove("deprecate") + + if 'oneline' not in v: + schema[k].add("multiline") + if 'single' not in v: + schema[k].add("multiple") + + return schema + + def check_file(self, f: FileDOM, lookups=None) -> State: + """Check a FileDOM for correctness(tm)""" + state = State() + + if not f.valid: + state.error(Row("", "", 0, f.src), "file does not parse") + + state = self._check_file_structure(state, f) + state = self._check_file_values(state, f, lookups) + state = inetnum_check(state, f) + + print("CHECK\t%-54s\t%s\tMNTNERS: %s" % + (f.src, state, ','.join(f.mntner))) + + return state + + def _check_file_structure(self, state: State, f: FileDOM) -> State: + for k, v in self.schema.items(): + row = Row(k, "", 0, f.src) + if 'required' in v and k not in f.keys: + state.error(row, "not found and is required") + elif 'recommend' in v and k not in f.keys: + state.info(row, "not found and is recommended") + + if 'schema' in v and SCHEMA_NAMESPACE + f.dom[0].key != self.ref: + state.error(row, "not found and is required as the first line") + + if 'single' in v and k in f.keys and len(f.keys[k]) > 1: + state.warning(row, "first defined here and has repeated keys") + for i in f.keys[k][1:]: + state.error(row, f"repeated on {i} can only appear once") + + if 'oneline' in v and k in f.multi: + for i in f.keys[k]: + state.error(row, "can not have multiple lines") + + return state + + def _check_file_values(self, + state: State, + f: FileDOM, + lookups: Optional[List[Tuple[str, str]]] = None + ) -> State: + for row in f.dom: + c = row.value.as_key() + + src = "None" if f.src is None else f.src + if row.key == self.primary and not src.endswith(c): + state.error(row, + f"primary [{row.value}]" + + f" does not match filename [{src}].") + + if row.key.startswith("x-"): + state.info(row, "is user defined") + + elif row.key not in self.schema: + state.error(row, "not in schema") + continue + else: + if 'deprecate' in self.schema[row.key]: + state.info(row, "was found and is deprecated") + + if lookups is not None: + state = self._check_file_lookups(state, row, lookups) + + return state + + def _check_file_lookups(self, + state: State, + row: Row, + lookups: List[Tuple[str, str]] = None + ) -> State: + for o in self.schema[row.key]: + if o.startswith("lookup="): + refs = o.split("=", 2)[1].split(",") + val = row.value.fields()[0] + found = False + for ref in refs: + if (ref, val) in lookups: + found = True + if not found: + state.error(row, + f"references object {val} " + + f"in {refs} but does not exist.") + return state + + +def read_file(src: str) -> SchemaDOM: + """Parses SchemaDOM from file""" + with open(src, mode='r', encoding='utf-8') as f: + dom = FileDOM(src=src) + dom.parse(f.readlines()) + + return SchemaDOM().parse(dom) + + +def inetnum_check(state: State, dom: FileDOM) -> State: + """Sanity Check for checking the inet[6]num value""" + if dom.schema == "inetnum" or dom.schema == "inet6num": + cidr = dom.get("cidr").as_net() + Lnet = cidr.network_address.exploded + Hnet = cidr.broadcast_address.exploded + + cidr_range = f"{Lnet}-{Hnet}" + file_range = dom.get(dom.schema) + file_range = re.sub(r"\s+", "", str(file_range), flags=re.UNICODE) + + if cidr_range != file_range: + state.error(Row("", "", 0, dom.src), + f"inetnum range [{file_range}] " + + f"does not match: [{cidr_range}]") + + return state diff --git a/utils/registry/dom/test_filedom.py b/utils/registry/dom/test_filedom.py new file mode 100644 index 000000000..f3ca42e5e --- /dev/null +++ b/utils/registry/dom/test_filedom.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +"""Test FileDOM""" +import unittest +import inspect +from pprint import pprint + +from .filedom import FileDOM + + +class TestFileDOM(unittest.TestCase): + """Test FileDOM""" + + def test_parse(self): + """Test Parsing""" + s = """ + person: Xuu + remarks: test + + + Multi-Line + contact: xmpp:xuu@xmpp.dn42 + contact: mail:xuu@dn42.us + pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F + nic-hdl: XUU-DN42 + mnt-by: XUU-MNT + source: DN42 + """ + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + + self.assertTrue(dom.valid) + self.assertEqual(dom.schema, "person") + self.assertEqual(dom.get("person"), "Xuu") + self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42") + self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") + self.assertIsNone(dom.get("xxx")) + self.assertEqual(dom.get("xxx", default="default"), "default") + self.assertEqual(str(dom), s) + + def test_put_values(self): + """Test putting values""" + s = """ + person: Xuu + remarks: test + contact: xmpp:xuu@xmpp.dn42 + contact: mail:xuu@dn42.us + pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F + nic-hdl: XUU-DN42 + mnt-by: XUU-MNT + source: DN42 + """ + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + + dom.put("source", "SOURIS") + self.assertEqual(dom.get("source"), "SOURIS") + + dom.put("contact", "mail:me@sour.is", append=True) + self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42") + self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us") + self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is") + + def test_parse_ip6address(self): + """Test network ip address parsing""" + s = """ + inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff + cidr: fd00::/8 + netname: ROOT-DN42-ULA + descr: DN42 ULA Address Space + status: ALLOCATED + policy: open + org: ORG-DN42 + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + + cidr = dom.get("cidr").as_net() + self.assertEqual(cidr.compressed, "fd00::/8") + self.assertEqual( + cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8") + + end = cidr.broadcast_address.exploded + start = cidr.network_address.exploded + + self.assertEqual(dom.get("inet6num"), f"{start} - {end}") + + def test_parse_ip4address(self): + """Test network ip address parsing""" + s = """ + inetnum: 172.20.0.0 - 172.23.255.255 + cidr: 172.20.0.0/14 + netname: ROOT-DN42 + """ + + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + + cidr = dom.get("cidr").as_net() + self.assertEqual(cidr.compressed, "172.20.0.0/14") + self.assertEqual( + cidr.exploded, "172.20.0.0/14") + + end = cidr.broadcast_address.exploded + start = cidr.network_address.exploded + + self.assertEqual(dom.get("inetnum"), f"{start} - {end}") + + @unittest.skip + def test_bad_parse(self): + """bad parse stuff""" + s = """ + person: Xuu + EXTRA + : + source: DN42 + """ + s = inspect.cleandoc(s)+"\n" + + dom = FileDOM() + dom.parse(s.splitlines()) + pprint(dom.dom) + self.assertEqual(str(dom), s) + + +if __name__ == '__main__': + unittest.main() diff --git a/utils/registry/dom/test_schema.py b/utils/registry/dom/test_schema.py new file mode 100644 index 000000000..b0ceb0f6c --- /dev/null +++ b/utils/registry/dom/test_schema.py @@ -0,0 +1,280 @@ +"""Test SchemaDOM""" + +import inspect +import unittest + +from .schema import SchemaDOM +from .filedom import FileDOM + + +def clean(s: str) -> str: + "Clean input for use" + return inspect.cleandoc(s) + "\n" + + +test_files = [ + ("SCHEMA-SCHEMA", clean( + r""" + schema: SCHEMA-SCHEMA + ref: dn42.schema + key: schema required single primary schema > [name] + key: ref required single > [schema] + key: key required multiple > [key-name] + {required|optional|recommend|deprecate} + {single|multiple} {primary|} {schema|} + lookup=str '>' [spec]... + key: mnt-by required multiple lookup=dn42.mntner > [mntner] + key: remarks optional multiple > [text]... + key: source required single lookup=dn42.registry + mnt-by: DN42-MNT + source: DN42 + remarks: # option descriptions + Attribute names must match /[a-zA-Z]([a-zA-Z0-9_\-]*[a-zA-Z0-9])?/. + + + required + : object required to have at least one + optional + : object not required to have at least one + + + single + : only one of this type allowed + multiple + : more than one of this type allowed + + + primary + : use field as lookup key for lookup + * only one allowed per schema + * does not allow newlines + + + schema + : use field name as the name of the schema + * only one allowed per schema + * does not allow newlines + + + lookup + : schema match to use for related record + """ # noqa: E501 + )), + ("INETNUM-SCHEMA", clean( + r""" + schema: INETNUM-SCHEMA + ref: dn42.inetnum + key: inetnum required single schema + key: cidr required single primary + key: netname required single + key: nserver optional multiple > [domain-name] + key: country optional multiple + key: descr optional single + key: status optional single > {ALLOCATED|ASSIGNED} {PI|PA|} + key: policy optional single > {open|closed|ask|reserved} + key: admin-c optional multiple lookup=dn42.person,dn42.role + key: tech-c optional multiple lookup=dn42.person,dn42.role + key: zone-c optional multiple lookup=dn42.person,dn42.role + key: mnt-by optional multiple lookup=dn42.mntner + key: remarks optional multiple + key: source required single lookup=dn42.registry + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + )), + ("ROLE-SCHEMA", clean( + r""" + schema: ROLE-SCHEMA + ref: dn42.role + key: role required single schema + key: nic-hdl required single primary + key: mnt-by required multiple lookup=dn42.mntner + key: org optional multiple lookup=dn42.organisation + key: admin-c optional multiple lookup=dn42.person + key: tech-c optional multiple lookup=dn42.person + key: abuse-c optional multiple lookup=dn42.person + key: abuse-mailbox optional multiple + key: descr optional single + key: remarks optional multiple + key: source required single lookup=dn42.registry + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + )), + ("PERSON-SCHEMA", clean( + r""" + schema: PERSON-SCHEMA + ref: dn42.person + key: person required single schema + key: nic-hdl required single primary + key: mnt-by required multiple lookup=dn42.mntner + key: org optional multiple lookup=dn42.organisation + key: nick optional multiple + key: pgp-fingerprint optional multiple + key: www optional multiple + key: e-mail optional multiple + key: contact optional multiple + key: abuse-mailbox optional multiple + key: phone optional multiple + key: fax-no optional multiple + key: address optional multiple + key: remarks optional multiple + key: source required single lookup=dn42.registry + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + )), + ("MNTNER-SCHEMA", clean( + r""" + schema: MNTNER-SCHEMA + ref: dn42.mntner + key: mntner required single primary schema + key: descr optional single + key: mnt-by required multiple lookup=dn42.mntner + key: admin-c optional multiple lookup=dn42.person,dn42.role + key: tech-c optional multiple lookup=dn42.person,dn42.role + key: auth optional multiple > [method] [value]... + key: org optional multiple lookup=dn42.organisation + key: abuse-mailbox optional single + key: remarks optional multiple + key: source required single lookup=dn42.registry + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + )), + ("REGISTRY-SCHEMA", clean( + r""" + schema: REGISTRY-SCHEMA + ref: dn42.registry + key: registry required single primary schema + key: url required multiple + key: descr optional multiple + key: mnt-by required multiple lookup=dn42.mntner + key: admin-c optional multiple lookup=dn42.person,dn42.role + key: tech-c optional multiple lookup=dn42.person,dn42.role + key: source required single lookup=dn42.registry + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + )), + ("172.21.64.0_29", clean( + r""" + inetnum: 172.21.64.0 - 172.21.64.7 + cidr: 172.21.64.0/29 + netname: XUU-TEST-NET + descr: Xuu TestNet + country: US + admin-c: SOURIS-DN42 + tech-c: SOURIS-DN42 + mnt-by: XUU-MNT + nserver: lavana.sjc.xuu.dn42 + nserver: kapha.mtr.xuu.dn42 + nserver: rishi.bre.xuu.dn42 + status: ALLOCATED + remarks: This is a transfernet. + source: DN42 + """ + )), + ("SOURIS-DN42", clean( + r""" + role: Souris Organization Role + abuse-mailbox: abuse@sour.is + admin-c: XUU-DN42 + tech-c: XUU-DN42 + nic-hdl: SOURIS-DN42 + mnt-by: XUU-MNT + source: DN42 + """ + )), + ("XUU-DN42", clean( + r""" + person: Xuu + remarks: test + contact: xmpp:xuu@xmpp.dn42 + contact: mail:xuu@dn42.us + pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F + nic-hdl: XUU-DN42 + mnt-by: XUU-MNT + source: DN42 + """ + )), + ("XUU-MNT", clean( + r""" + mntner: XUU-MNT + descr: Xuu Maintenance Object + admin-c: SOURIS-DN42 + tech-c: SOURIS-DN42 + mnt-by: XUU-MNT + source: DN42 + """ + )), + ("DN42-MNT", clean( + r""" + mntner: DN42-MNT + descr: mntner for owning objects in the name of whole dn42. + mnt-by: DN42-MNT + source: DN42 + """ # noqa: E501 + )), + ("DN42", clean( + r""" + registry: DN42 + url: https://git.dn42.us/dn42/registry + mnt-by: DN42-MNT + source: DN42 + """ + )), + ] + + +class TestSchema(unittest.TestCase): + """Test SchemaDOM + """ + + def test_schema_parse(self): + """Test schema parsing + """ + d = FileDOM(src="schema/SCHEMA-SCHEMA") + d.parse(test_files[0][1].splitlines()) + + self.assertEqual(str(d), test_files[0][1]) + self.assertTrue(d.valid) + + s = SchemaDOM() + s.parse(d) + + self.assertTrue(s.valid) + + state = s.check_file(d) + + self.assertTrue(state) + state.print() + + def test_schema_all(self): + """Test schema failures + """ + + schemas = {} + for (fname, text) in { + row for row in test_files if row[0].endswith("-SCHEMA")}: + dom = FileDOM(src=fname) + dom.parse(text.splitlines()) + + schema = SchemaDOM() + schema.parse(dom) + + schemas[schema.ref] = schema + + files = [] + idx = {} + + for (fname, text) in test_files: + dom = FileDOM(src=fname) + dom.parse(text.splitlines()) + + files.append(dom) + + name = dom.src.split("/")[-1].replace("_", "/") + idx[(f"dn42.{dom.schema}", name)] = (dom.src, ",".join(dom.mntner)) + + for dom in files: + s = schemas["dn42." + dom.schema] + state = s.check_file(dom, idx) + + self.assertTrue(state) + state.print() diff --git a/utils/registry/log.py b/utils/registry/log.py new file mode 100644 index 000000000..52dab50aa --- /dev/null +++ b/utils/registry/log.py @@ -0,0 +1,206 @@ +"""Simple Logger""" + +from __future__ import print_function + +import os +import sys +import inspect +import datetime +import traceback +from enum import IntEnum + +OUTPUT = sys.stderr + +LEVEL = ["CRIT", "ERR ", "WARN", "NOTE", "INFO", "DBUG", "...."] +CLEVEL = ["\x1B[41mCRIT\x1B[0m", + "\x1B[31mERR \x1B[0m", + "\x1B[33mWARN\x1B[0m", + "\x1B[32mNOTE\x1B[0m", + "\x1B[34mINFO\x1B[0m", + "\x1B[90mDBUG\x1B[0m", + "\x1B[90m....\x1B[0m"] + +MSG = "{0} {1} {2} {3} {4} {5} :: {6}" +CMSG = "[{1}]\x1B[90m {2} {3}:{5} [{4}]\x1B[0m {6}\x1B[0m" +CMULTI = "[{1}]\x1B[90m {2}\x1B[0m" + + +class Level(IntEnum): + """Log Level enumeration""" + VERB_CRITICAL = 0 + VERB_ERROR = 1 + VERB_WARN = 2 + VERB_NOTICE = 3 + VERB_INFO = 4 + VERB_DEBUG = 5 + VERB_NONE = -1 + + +class Log: + """Logger""" + log_dir = "" + log_pfx = "main" + + level_console = Level.VERB_ERROR + level_file = Level.VERB_NONE + level_full = False + + count = [0, 0, 0, 0, 0, 0] + + def __init__(self): + self.prog_name = sys.argv[0].rsplit("/", 1)[-1] + self.prog_name = self.prog_name.split(".", 1)[0] + self.log_pfx = self.prog_name + + def __del__(self): + if self.level_console >= 5: + crit, err, warn, note, inf, dbug = tuple(self.count) + os.write(1, "[\x1B[90m\x1B[90mDBUG\x1B[90m] Log Counters" + + f" crit:{crit}" + + f" err:{err}" + + f" warn: {warn}" + + f" note: {note}" + + f" info: {inf}" + + f" dbug: {dbug}\x1B[0m\n") + + def set_dir(self, name: str): + """Set output directory""" + if not os.path.isdir(name): + os.makedirs(name) + self.log_dir = name + + def output(self, level: Level, message: str, frame=1): + """Write a message to console or log, conditionally.""" + if level < 0 or level > 5: + level = 5 + + self.count[level] += 1 + + # function_name = inspect.stack()[1][3] + cur_date = datetime.datetime.now() + + (frame, file, ln, fn, _, _) = inspect.getouterframes( + inspect.currentframe())[frame] + + message = str(message).split("\n") + cmsg = CMSG if self.level_full else CMULTI + + if self.level_console >= level: + + if len(message) == 1: + if self.level_full: + arg = (str(cur_date), + CLEVEL[level], + self.prog_name, + file, fn, ln, message[0]) + else: + arg = str(cur_date), CLEVEL[level], message[0] + + print(cmsg.format(*arg), file=OUTPUT) + else: + if self.level_full: + arg = str(cur_date), CLEVEL[ + level], self.prog_name, file, fn, ln, "" + print(cmsg.format(*arg), file=OUTPUT) + + for line in message: + print(CMULTI.format(str(cur_date), + CLEVEL[Level.VERB_NONE], line), + file=OUTPUT) + + if self.level_file >= level: + self.set_dir("./logs") + log_file_name = os.path.join( + self.log_dir, + self.log_pfx + str(cur_date.strftime('%Y-%m-%d')) + ".txt") + + with open(log_file_name, "a") as logger: + logger.write(MSG.format(str(cur_date), + LEVEL[level], + self.prog_name, + file, fn, ln, message[0]) + "\n") + for line in message[1:]: + logger.write(MSG.format(str(cur_date), + LEVEL[Level.VERB_NONE], + self.prog_name, + file, fn, ln, line) + "\n") + + def fatal(self, message: str): + """Log a fatal error""" + self.output(Level.VERB_CRITICAL, message, 2) + sys.exit(1) + + def critical(self, message: str): + """Log a critical error""" + self.output(Level.VERB_CRITICAL, message, 2) + + def error(self, message: str): + """Log a normal error""" + self.output(Level.VERB_ERROR, message, 2) + + def warning(self, message: str): + """Log a warning""" + self.output(Level.VERB_WARN, message, 2) + + def notice(self, message: str): + """Log a notice""" + self.output(Level.VERB_NOTICE, message, 2) + + def info(self, message: str): + """Log an informational""" + self.output(Level.VERB_INFO, message, 2) + + def debug(self, message: str): + """Log a debug""" + self.output(Level.VERB_DEBUG, message, 2) + + +default = Log() + +fatal = default.fatal +critical = default.critical +error = default.error +warning = default.warning +notice = default.notice +info = default.info +debug = default.debug + + +class LogException: + """Catches an exception to log it""" + stop = None + + def __init__(self, stop: bool = True): + self.stop = stop + + def __enter__(self, stop: bool = True): + pass + + def __exit__(self, exc_type, value, trace) -> bool: + + if exc_type is None: + return True + + if exc_type is SystemExit and value.args == (0,): + return True + + log_string, _ = fmt_exception(exc_type, value, trace) + default.output(Level.VERB_CRITICAL, 'Failure\n\n' + log_string, 2) + + if self.stop is False: + return False + + fatal("ABORTING EXECUTION") + return False + + +def fmt_exception(exc_type, exc_value, exc_traceback): + """format exception to string""" + lines = traceback.format_exception(exc_type, exc_value, exc_traceback) + log_string = ''.join(line for line in lines) + email_string = ''.join('
' + line for line in lines) + + return log_string, email_string + + +exception = LogException diff --git a/utils/registry/run.py b/utils/registry/run.py new file mode 100644 index 000000000..e69de29bb