add initial dom scripts

This commit is contained in:
Jon Lundy 2020-06-08 13:12:59 -06:00
parent 0f9d455bb6
commit 855bb22022
No known key found for this signature in database
GPG key ID: C63E6D61F3035024
8 changed files with 1046 additions and 2 deletions

6
.gitignore vendored
View file

@ -1,5 +1,7 @@
_MTN
lib/
whoisd/
utils/
!utils/schema-check/*.py
/utils/
!/utils/schema-check
!/utils/registry
__pycache__

View file

View file

@ -0,0 +1,176 @@
"""FileDOM parse and formating"""
import re
from dataclasses import dataclass
from typing import Sequence, NamedTuple, List, Dict, Optional, Union
import ipaddress
import log
@dataclass
class Value:
"""Dom Value"""
value: str
def __eq__(self, other: str) -> bool:
return self.value == other
def __str__(self) -> str:
return self.value
def lines(self) -> List[str]:
"""return value split into lines"""
return self.value.splitlines()
def fields(self) -> List[str]:
"""return value split into fields"""
return self.value.split()
def as_ip(self) -> Union[ipaddress.IPv4Address, ipaddress.IPv6Address]:
"""return value as an ip address"""
return ipaddress.ip_address(self.value)
def as_net(self) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network]:
"""return value as an ip network"""
return ipaddress.ip_network(self.value)
def as_key(self) -> str:
"""Format as key name"""
return self.value.replace("/", "_").replace(" ", "")
class Row(NamedTuple):
"""DOM Row"""
key: str
value: Value
lineno: int
src: str = None
def loc(self) -> str:
"""format as location"""
s = f"{self.src} Line {self.lineno} "
s += "" if self.key == "" else f"Key [{self.key}]:"
return s
class FileDOM:
"""Parses a reg file"""
def __init__(self, src: Optional[str] = None):
self.valid = False
self.dom = [] # type: List[Row]
self.keys = {} # type: Dict[str, int]
self.multi = {} # type: Dict[str, int]
self.mntner = [] # type: List[str]
self.schema = None # type: Optional[str]
self.src = src
def parse(self, input_str: Sequence[str], src: Optional[str] = None):
"""Parse an input string generator"""
dom = []
keys = {}
multi = {}
mntner = []
last_multi = None
self.valid = True
self.src = self.src if src is None else src
for lineno, i in enumerate(input_str, 1):
# print(lineno, i)
if re.match(r'[ \t]', i):
if len(dom) == 0:
log.error(f"File {src} does not parse properly")
self.valid = False
return
dom[-1][1] += "\n" + i.strip()
if dom[-1][0] not in multi:
multi[dom[-1][0]] = []
if last_multi is None:
multi[dom[-1][0]].append(lineno)
last_multi = dom[-1][0]
else:
if i[0] == '+':
dom[-1][1] += "\n"
if dom[-1][0] not in multi:
multi[dom[-1][0]] = []
if last_multi is None:
multi[dom[-1][0]].append(lineno)
last_multi = dom[-1][0]
i = i.split(":")
if len(i) < 2:
continue
dom.append([i[0].strip(), ':'.join(
i[1:]).strip(), lineno - 1])
if i[0].strip() not in keys:
keys[i[0].strip()] = []
keys[i[0].strip()].append(len(dom) - 1)
last_multi = None
if dom[-1][0] == 'mnt-by':
mntner.append(dom[-1][1])
self.dom = [Row(k, Value(v), n) for k, v, n in dom]
self.keys = keys
self.multi = multi
self.mntner = mntner
self.schema = self.dom[0].key
def __str__(self):
length = 19
for i in self.dom:
if len(i.key) > length:
length = len(i.key) + 2
s = ""
for i in self.dom:
sp = i.value.lines()
s += i.key + ":" + " " * (length - len(i.key)) + sp[0] + "\n"
for m in sp[1:]:
if m == "":
s += "+\n"
continue
s += " " * (length + 1) + m + "\n"
return s
def get(self, key, index=0, default=None):
"""Get a key value"""
if key not in self.keys:
return default
if index >= len(self.keys[key]) or index <= -len(self.keys[key]):
return default
return self.dom[self.keys[key][index]].value
def put(self, key, value, index=0, append=False):
"""Put a value"""
if key not in self.keys:
self.keys[key] = []
i = (self.keys[key][index:index+1] or (None,))[0]
if i is None or append:
i = len(self.dom)
self.dom.append(Row(key, Value(value), i))
elif i is not None:
self.dom[i] = Row(key, Value(value), i)
if index not in self.keys[key]:
self.keys[key].append(i)
def read_file(fn: str) -> FileDOM:
"""Parses FileDOM from file"""
with open(fn, mode='r', encoding='utf-8') as f:
return FileDOM().parse(f.readlines())

View file

@ -0,0 +1,244 @@
"""Schema DOM"""
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List, Tuple
import log
from .filedom import FileDOM, Row
SCHEMA_NAMESPACE = "dn42."
class Level(Enum):
"""State error level"""
info = 1
warning = 2
error = 3
@dataclass
class State:
"""State of schema check
"""
state: bool = True
msgs: List[Tuple[Level, Row, str]] = field(default_factory=list)
def __eq__(self, other: bool) -> bool:
return self.state == other
def __bool__(self):
return self.state
def __str__(self) -> str:
return "PASS" if self.state else "FAIL"
def print(self):
"""print out state info"""
for (level, row, msg) in self.msgs:
if level == Level.info:
log.info(f"{row.loc()} {msg}")
elif level == Level.warning:
log.warning(f"{row.loc()} {msg}")
elif level == Level.error:
log.error(f"{row.loc()} {msg}")
def info(self, r: Row, s: str):
"""Add warning"""
self.msgs.append((Level.info, r, s))
def warning(self, r: Row, s: str):
"""Add warning"""
self.msgs.append((Level.warning, r, s))
def error(self, r: Row, s: str):
"""Add error"""
self.state = False
self.msgs.append((Level.error, r, s))
class SchemaDOM:
"""Schema DOM"""
def __init__(self, src: Optional[str] = None):
self.valid = False
self.name = None
self.ref = None
self.primary = None
self.type = None
self.src = src
self.schema = {}
def parse(self, f: FileDOM):
"""Parse a FileDOM into a SchemaDOM"""
self.src = self.src if f.src is None else f.src
schema = {}
for row in f.dom:
if row.key == 'ref':
self.ref = str(row.value)
elif row.key == 'schema':
self.name = str(row.value)
if row.key != 'key':
continue
lines = row.value.fields()
key = lines.pop(0)
schema[key] = set()
for i in lines:
if i == ">":
break
schema[key].add(i)
schema = self._process_schema(schema)
self.valid = True
self.schema = schema
return schema
def _process_schema(self, schema):
for k, v in schema.items():
if 'schema' in v:
self.type = k
if 'primary' in v:
self.primary = k
schema[k].add("oneline")
if "multiline" in v:
schema[k].remove("multiline")
schema[k].add("single")
if "multiple" in v:
schema[k].remove("multiple")
schema[k].add("required")
if "optional" in v:
schema[k].remove("optional")
if "recommend" in v:
schema[k].remove("recommend")
if "deprecate" in v:
schema[k].remove("deprecate")
if 'oneline' not in v:
schema[k].add("multiline")
if 'single' not in v:
schema[k].add("multiple")
return schema
def check_file(self, f: FileDOM, lookups=None) -> State:
"""Check a FileDOM for correctness(tm)"""
state = State()
if not f.valid:
state.error(Row("", "", 0, f.src), "file does not parse")
state = self._check_file_structure(state, f)
state = self._check_file_values(state, f, lookups)
state = inetnum_check(state, f)
print("CHECK\t%-54s\t%s\tMNTNERS: %s" %
(f.src, state, ','.join(f.mntner)))
return state
def _check_file_structure(self, state: State, f: FileDOM) -> State:
for k, v in self.schema.items():
row = Row(k, "", 0, f.src)
if 'required' in v and k not in f.keys:
state.error(row, "not found and is required")
elif 'recommend' in v and k not in f.keys:
state.info(row, "not found and is recommended")
if 'schema' in v and SCHEMA_NAMESPACE + f.dom[0].key != self.ref:
state.error(row, "not found and is required as the first line")
if 'single' in v and k in f.keys and len(f.keys[k]) > 1:
state.warning(row, "first defined here and has repeated keys")
for i in f.keys[k][1:]:
state.error(row, f"repeated on {i} can only appear once")
if 'oneline' in v and k in f.multi:
for i in f.keys[k]:
state.error(row, "can not have multiple lines")
return state
def _check_file_values(self,
state: State,
f: FileDOM,
lookups: Optional[List[Tuple[str, str]]] = None
) -> State:
for row in f.dom:
c = row.value.as_key()
src = "None" if f.src is None else f.src
if row.key == self.primary and not src.endswith(c):
state.error(row,
f"primary [{row.value}]" +
f" does not match filename [{src}].")
if row.key.startswith("x-"):
state.info(row, "is user defined")
elif row.key not in self.schema:
state.error(row, "not in schema")
continue
else:
if 'deprecate' in self.schema[row.key]:
state.info(row, "was found and is deprecated")
if lookups is not None:
state = self._check_file_lookups(state, row, lookups)
return state
def _check_file_lookups(self,
state: State,
row: Row,
lookups: List[Tuple[str, str]] = None
) -> State:
for o in self.schema[row.key]:
if o.startswith("lookup="):
refs = o.split("=", 2)[1].split(",")
val = row.value.fields()[0]
found = False
for ref in refs:
if (ref, val) in lookups:
found = True
if not found:
state.error(row,
f"references object {val} " +
f"in {refs} but does not exist.")
return state
def read_file(src: str) -> SchemaDOM:
"""Parses SchemaDOM from file"""
with open(src, mode='r', encoding='utf-8') as f:
dom = FileDOM(src=src)
dom.parse(f.readlines())
return SchemaDOM().parse(dom)
def inetnum_check(state: State, dom: FileDOM) -> State:
"""Sanity Check for checking the inet[6]num value"""
if dom.schema == "inetnum" or dom.schema == "inet6num":
cidr = dom.get("cidr").as_net()
Lnet = cidr.network_address.exploded
Hnet = cidr.broadcast_address.exploded
cidr_range = f"{Lnet}-{Hnet}"
file_range = dom.get(dom.schema)
file_range = re.sub(r"\s+", "", str(file_range), flags=re.UNICODE)
if cidr_range != file_range:
state.error(Row("", "", 0, dom.src),
f"inetnum range [{file_range}] " +
f"does not match: [{cidr_range}]")
return state

View file

@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""Test FileDOM"""
import unittest
import inspect
from pprint import pprint
from .filedom import FileDOM
class TestFileDOM(unittest.TestCase):
"""Test FileDOM"""
def test_parse(self):
"""Test Parsing"""
s = """
person: Xuu
remarks: test
+
Multi-Line
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
self.assertTrue(dom.valid)
self.assertEqual(dom.schema, "person")
self.assertEqual(dom.get("person"), "Xuu")
self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertIsNone(dom.get("xxx"))
self.assertEqual(dom.get("xxx", default="default"), "default")
self.assertEqual(str(dom), s)
def test_put_values(self):
"""Test putting values"""
s = """
person: Xuu
remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
dom.put("source", "SOURIS")
self.assertEqual(dom.get("source"), "SOURIS")
dom.put("contact", "mail:me@sour.is", append=True)
self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is")
def test_parse_ip6address(self):
"""Test network ip address parsing"""
s = """
inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff
cidr: fd00::/8
netname: ROOT-DN42-ULA
descr: DN42 ULA Address Space
status: ALLOCATED
policy: open
org: ORG-DN42
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
cidr = dom.get("cidr").as_net()
self.assertEqual(cidr.compressed, "fd00::/8")
self.assertEqual(
cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inet6num"), f"{start} - {end}")
def test_parse_ip4address(self):
"""Test network ip address parsing"""
s = """
inetnum: 172.20.0.0 - 172.23.255.255
cidr: 172.20.0.0/14
netname: ROOT-DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
cidr = dom.get("cidr").as_net()
self.assertEqual(cidr.compressed, "172.20.0.0/14")
self.assertEqual(
cidr.exploded, "172.20.0.0/14")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inetnum"), f"{start} - {end}")
@unittest.skip
def test_bad_parse(self):
"""bad parse stuff"""
s = """
person: Xuu
EXTRA
:
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
pprint(dom.dom)
self.assertEqual(str(dom), s)
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,280 @@
"""Test SchemaDOM"""
import inspect
import unittest
from .schema import SchemaDOM
from .filedom import FileDOM
def clean(s: str) -> str:
"Clean input for use"
return inspect.cleandoc(s) + "\n"
test_files = [
("SCHEMA-SCHEMA", clean(
r"""
schema: SCHEMA-SCHEMA
ref: dn42.schema
key: schema required single primary schema > [name]
key: ref required single > [schema]
key: key required multiple > [key-name]
{required|optional|recommend|deprecate}
{single|multiple} {primary|} {schema|}
lookup=str '>' [spec]...
key: mnt-by required multiple lookup=dn42.mntner > [mntner]
key: remarks optional multiple > [text]...
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
remarks: # option descriptions
Attribute names must match /[a-zA-Z]([a-zA-Z0-9_\-]*[a-zA-Z0-9])?/.
+
required
: object required to have at least one
optional
: object not required to have at least one
+
single
: only one of this type allowed
multiple
: more than one of this type allowed
+
primary
: use field as lookup key for lookup
* only one allowed per schema
* does not allow newlines
+
schema
: use field name as the name of the schema
* only one allowed per schema
* does not allow newlines
+
lookup
: schema match to use for related record
""" # noqa: E501
)),
("INETNUM-SCHEMA", clean(
r"""
schema: INETNUM-SCHEMA
ref: dn42.inetnum
key: inetnum required single schema
key: cidr required single primary
key: netname required single
key: nserver optional multiple > [domain-name]
key: country optional multiple
key: descr optional single
key: status optional single > {ALLOCATED|ASSIGNED} {PI|PA|}
key: policy optional single > {open|closed|ask|reserved}
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: zone-c optional multiple lookup=dn42.person,dn42.role
key: mnt-by optional multiple lookup=dn42.mntner
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("ROLE-SCHEMA", clean(
r"""
schema: ROLE-SCHEMA
ref: dn42.role
key: role required single schema
key: nic-hdl required single primary
key: mnt-by required multiple lookup=dn42.mntner
key: org optional multiple lookup=dn42.organisation
key: admin-c optional multiple lookup=dn42.person
key: tech-c optional multiple lookup=dn42.person
key: abuse-c optional multiple lookup=dn42.person
key: abuse-mailbox optional multiple
key: descr optional single
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("PERSON-SCHEMA", clean(
r"""
schema: PERSON-SCHEMA
ref: dn42.person
key: person required single schema
key: nic-hdl required single primary
key: mnt-by required multiple lookup=dn42.mntner
key: org optional multiple lookup=dn42.organisation
key: nick optional multiple
key: pgp-fingerprint optional multiple
key: www optional multiple
key: e-mail optional multiple
key: contact optional multiple
key: abuse-mailbox optional multiple
key: phone optional multiple
key: fax-no optional multiple
key: address optional multiple
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("MNTNER-SCHEMA", clean(
r"""
schema: MNTNER-SCHEMA
ref: dn42.mntner
key: mntner required single primary schema
key: descr optional single
key: mnt-by required multiple lookup=dn42.mntner
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: auth optional multiple > [method] [value]...
key: org optional multiple lookup=dn42.organisation
key: abuse-mailbox optional single
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("REGISTRY-SCHEMA", clean(
r"""
schema: REGISTRY-SCHEMA
ref: dn42.registry
key: registry required single primary schema
key: url required multiple
key: descr optional multiple
key: mnt-by required multiple lookup=dn42.mntner
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("172.21.64.0_29", clean(
r"""
inetnum: 172.21.64.0 - 172.21.64.7
cidr: 172.21.64.0/29
netname: XUU-TEST-NET
descr: Xuu TestNet
country: US
admin-c: SOURIS-DN42
tech-c: SOURIS-DN42
mnt-by: XUU-MNT
nserver: lavana.sjc.xuu.dn42
nserver: kapha.mtr.xuu.dn42
nserver: rishi.bre.xuu.dn42
status: ALLOCATED
remarks: This is a transfernet.
source: DN42
"""
)),
("SOURIS-DN42", clean(
r"""
role: Souris Organization Role
abuse-mailbox: abuse@sour.is
admin-c: XUU-DN42
tech-c: XUU-DN42
nic-hdl: SOURIS-DN42
mnt-by: XUU-MNT
source: DN42
"""
)),
("XUU-DN42", clean(
r"""
person: Xuu
remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
)),
("XUU-MNT", clean(
r"""
mntner: XUU-MNT
descr: Xuu Maintenance Object
admin-c: SOURIS-DN42
tech-c: SOURIS-DN42
mnt-by: XUU-MNT
source: DN42
"""
)),
("DN42-MNT", clean(
r"""
mntner: DN42-MNT
descr: mntner for owning objects in the name of whole dn42.
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("DN42", clean(
r"""
registry: DN42
url: https://git.dn42.us/dn42/registry
mnt-by: DN42-MNT
source: DN42
"""
)),
]
class TestSchema(unittest.TestCase):
"""Test SchemaDOM
"""
def test_schema_parse(self):
"""Test schema parsing
"""
d = FileDOM(src="schema/SCHEMA-SCHEMA")
d.parse(test_files[0][1].splitlines())
self.assertEqual(str(d), test_files[0][1])
self.assertTrue(d.valid)
s = SchemaDOM()
s.parse(d)
self.assertTrue(s.valid)
state = s.check_file(d)
self.assertTrue(state)
state.print()
def test_schema_all(self):
"""Test schema failures
"""
schemas = {}
for (fname, text) in {
row for row in test_files if row[0].endswith("-SCHEMA")}:
dom = FileDOM(src=fname)
dom.parse(text.splitlines())
schema = SchemaDOM()
schema.parse(dom)
schemas[schema.ref] = schema
files = []
idx = {}
for (fname, text) in test_files:
dom = FileDOM(src=fname)
dom.parse(text.splitlines())
files.append(dom)
name = dom.src.split("/")[-1].replace("_", "/")
idx[(f"dn42.{dom.schema}", name)] = (dom.src, ",".join(dom.mntner))
for dom in files:
s = schemas["dn42." + dom.schema]
state = s.check_file(dom, idx)
self.assertTrue(state)
state.print()

206
utils/registry/log.py Normal file
View file

@ -0,0 +1,206 @@
"""Simple Logger"""
from __future__ import print_function
import os
import sys
import inspect
import datetime
import traceback
from enum import IntEnum
OUTPUT = sys.stderr
LEVEL = ["CRIT", "ERR ", "WARN", "NOTE", "INFO", "DBUG", "...."]
CLEVEL = ["\x1B[41mCRIT\x1B[0m",
"\x1B[31mERR \x1B[0m",
"\x1B[33mWARN\x1B[0m",
"\x1B[32mNOTE\x1B[0m",
"\x1B[34mINFO\x1B[0m",
"\x1B[90mDBUG\x1B[0m",
"\x1B[90m....\x1B[0m"]
MSG = "{0} {1} {2} {3} {4} {5} :: {6}"
CMSG = "[{1}]\x1B[90m {2} {3}:{5} [{4}]\x1B[0m {6}\x1B[0m"
CMULTI = "[{1}]\x1B[90m {2}\x1B[0m"
class Level(IntEnum):
"""Log Level enumeration"""
VERB_CRITICAL = 0
VERB_ERROR = 1
VERB_WARN = 2
VERB_NOTICE = 3
VERB_INFO = 4
VERB_DEBUG = 5
VERB_NONE = -1
class Log:
"""Logger"""
log_dir = ""
log_pfx = "main"
level_console = Level.VERB_ERROR
level_file = Level.VERB_NONE
level_full = False
count = [0, 0, 0, 0, 0, 0]
def __init__(self):
self.prog_name = sys.argv[0].rsplit("/", 1)[-1]
self.prog_name = self.prog_name.split(".", 1)[0]
self.log_pfx = self.prog_name
def __del__(self):
if self.level_console >= 5:
crit, err, warn, note, inf, dbug = tuple(self.count)
os.write(1, "[\x1B[90m\x1B[90mDBUG\x1B[90m] Log Counters" +
f" crit:{crit}" +
f" err:{err}" +
f" warn: {warn}" +
f" note: {note}" +
f" info: {inf}" +
f" dbug: {dbug}\x1B[0m\n")
def set_dir(self, name: str):
"""Set output directory"""
if not os.path.isdir(name):
os.makedirs(name)
self.log_dir = name
def output(self, level: Level, message: str, frame=1):
"""Write a message to console or log, conditionally."""
if level < 0 or level > 5:
level = 5
self.count[level] += 1
# function_name = inspect.stack()[1][3]
cur_date = datetime.datetime.now()
(frame, file, ln, fn, _, _) = inspect.getouterframes(
inspect.currentframe())[frame]
message = str(message).split("\n")
cmsg = CMSG if self.level_full else CMULTI
if self.level_console >= level:
if len(message) == 1:
if self.level_full:
arg = (str(cur_date),
CLEVEL[level],
self.prog_name,
file, fn, ln, message[0])
else:
arg = str(cur_date), CLEVEL[level], message[0]
print(cmsg.format(*arg), file=OUTPUT)
else:
if self.level_full:
arg = str(cur_date), CLEVEL[
level], self.prog_name, file, fn, ln, ""
print(cmsg.format(*arg), file=OUTPUT)
for line in message:
print(CMULTI.format(str(cur_date),
CLEVEL[Level.VERB_NONE], line),
file=OUTPUT)
if self.level_file >= level:
self.set_dir("./logs")
log_file_name = os.path.join(
self.log_dir,
self.log_pfx + str(cur_date.strftime('%Y-%m-%d')) + ".txt")
with open(log_file_name, "a") as logger:
logger.write(MSG.format(str(cur_date),
LEVEL[level],
self.prog_name,
file, fn, ln, message[0]) + "\n")
for line in message[1:]:
logger.write(MSG.format(str(cur_date),
LEVEL[Level.VERB_NONE],
self.prog_name,
file, fn, ln, line) + "\n")
def fatal(self, message: str):
"""Log a fatal error"""
self.output(Level.VERB_CRITICAL, message, 2)
sys.exit(1)
def critical(self, message: str):
"""Log a critical error"""
self.output(Level.VERB_CRITICAL, message, 2)
def error(self, message: str):
"""Log a normal error"""
self.output(Level.VERB_ERROR, message, 2)
def warning(self, message: str):
"""Log a warning"""
self.output(Level.VERB_WARN, message, 2)
def notice(self, message: str):
"""Log a notice"""
self.output(Level.VERB_NOTICE, message, 2)
def info(self, message: str):
"""Log an informational"""
self.output(Level.VERB_INFO, message, 2)
def debug(self, message: str):
"""Log a debug"""
self.output(Level.VERB_DEBUG, message, 2)
default = Log()
fatal = default.fatal
critical = default.critical
error = default.error
warning = default.warning
notice = default.notice
info = default.info
debug = default.debug
class LogException:
"""Catches an exception to log it"""
stop = None
def __init__(self, stop: bool = True):
self.stop = stop
def __enter__(self, stop: bool = True):
pass
def __exit__(self, exc_type, value, trace) -> bool:
if exc_type is None:
return True
if exc_type is SystemExit and value.args == (0,):
return True
log_string, _ = fmt_exception(exc_type, value, trace)
default.output(Level.VERB_CRITICAL, 'Failure\n\n' + log_string, 2)
if self.stop is False:
return False
fatal("ABORTING EXECUTION")
return False
def fmt_exception(exc_type, exc_value, exc_traceback):
"""format exception to string"""
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
log_string = ''.join(line for line in lines)
email_string = ''.join('<br />' + line for line in lines)
return log_string, email_string
exception = LogException

0
utils/registry/run.py Normal file
View file