Update registry scripts

This commit is contained in:
Simon Marsh 2026-01-23 10:45:44 +00:00
parent f8199a0902
commit 36e3570111
No known key found for this signature in database
GPG key ID: E9B4156C1659C079
6 changed files with 258 additions and 56 deletions

View file

@ -1,19 +1,90 @@
#!/bin/sh
#!/bin/sh -e
###########################################################################
#
# dn42 registry - object validation
#
###########################################################################
if [ "$#" -eq "0" ]
then
echo "Usage: $0 YOUR-MNT"
exit
mntner="$1"
if [ -z "$mntner" ]
then
>&2 echo "Usage: $0 YOUR-MNT"
exit 1
fi
BASE="$(readlink -f "$0" 2>/dev/null || python -c 'import os,sys;print(os.path.realpath(sys.argv[1]))' "$0")"
BASE="$(dirname "$BASE")"
cd "$BASE" || exit 1
check_script='utils/schema-check/dn42_schema_local.py'
###########################################################################
# determine registry directory
#
# this will fail if the script is in the PATH or is sourced but those
# both seem unlikely. In any case if it does fail an env var can be used
# to override the check
rdir="$REGDIR"
if [ -z "$rdir" ]
then
rdir=$(cd -- "$(dirname -- "$0")" && pwd)
fi
if ! [ -x "${rdir}/${check_script}" ]
then
>&2 cat <<EOF
ERROR: Unable to automatically find the registry directory,
or the script '$check_script' is not executable
You can set the directory manually using the
REGDIR environment variable.
For example:
REGDIR='path/to/registry' $0 $mntner
EOF
exit 1
fi
# switch to the registry directory
cd "$rdir"
###########################################################################
# perform the validation
if [ "$mntner" = "--all" ]
then
# check everything
if ! "$check_script" -v scan data/
then
>&2 echo 'Schema validation failed!'
exit 1
fi
if [ "$1" = "--all" ]; then
utils/schema-check/dn42-schema.py -v scan data/ || ( echo "Schema validation failed, please check above!" ; exit 1 )
else
utils/schema-check/dn42-schema.py -v scan data/ -f "data/mntner/$1" || ( echo "Schema validation for mntner object failed, please check above!" ; exit 1 )
utils/schema-check/dn42-schema.py -v scan data/ -m "$1" || ( echo "Schema validation for related objects failed, please check above!" ; exit 1 )
# check single mntner
mfile="data/mntner/$mntner"
if ! [ -f "$mfile" ]
then
>&2 echo "MNTNER object does not exist: $mfile"
exit 1
fi
if ! "$check_script" -v scan data/ -f "$mfile"
then
>&2 echo 'Schema validation for mntner object failed!'
exit 1
fi
if ! "$check_script" -v scan data/ -m "$mntner"
then
>&2 echo 'Schema validation for related objects failed!'
exit 1
fi
fi
# all good
exit 0
###########################################################################
# end of file

113
check-pol
View file

@ -1,18 +1,105 @@
#!/usr/bin/env bash
set -o pipefail
#!/bin/sh -e
###########################################################################
#
# dn42 registry - policy checks
#
###########################################################################
if [ $# -eq 0 ]
then
echo "Usage: $0 COMMIT YOUR-MNT"
exit
commit="$1"
mntner="$2"
if [ -z "$commit" ] || [ -z "$mntner" ]
then
>&2 echo "Usage: $0 COMMIT YOUR-MNT"
exit 1
fi
BASE="$(readlink -f "$0" 2>/dev/null || python -c 'import os,sys;print(os.path.realpath(sys.argv[1]))' "$0")"
BASE="$(dirname "$BASE")"
cd "$BASE" || exit 1
check_script='utils/schema-check/dn42_schema_local.py'
exitcode=0
git diff --name-only "$1" | while IFS='/' read -ra LINE; do
if [[ "${LINE[0]}" = "data" && -n "${LINE[2]}" ]]; then
utils/schema-check/dn42-schema.py -v policy "${LINE[1]}" "${LINE[2]}" "$2"
###########################################################################
# determine registry directory
#
# this will fail if the script is in the PATH or is sourced but those
# both seem unlikely. In any case if it does fail an env var can be used
# to override the check
rdir="$REGDIR"
if [ -z "$rdir" ]
then
rdir=$(cd -- "$(dirname -- "$0")" && pwd)
fi
done
if ! [ -x "${rdir}/${check_script}" ]
then
>&2 cat <<EOF
ERROR: Unable to automatically find the registry directory,
or the script '$check_script' is not executable
You can set the directory manually using the
REGDIR environment variable.
For example:
REGDIR='path/to/registry' $0 $commit $mntner
EOF
exit 1
fi
# switch to the registry directory
cd "$rdir"
###########################################################################
# find each changed file, using git diff, and then run the policy
# check against each object that has changed
#
# the shell loop is a bit contrived but is required to maintain POSIX
# compatibility and avoid the need for subshells
# loop through each file that has changed
while IFS= read -r filename
do
# extract the object type and name from the filename
IFS='/'
# shellcheck disable=SC2086
set -- $filename
IFS=
path="$1"
type="$2"
object="$3"
# check the file really is a registry object
# (including if it still exists, as it may have been deleted)
if [ -f "$filename" ] && [ "$path" = 'data' ] && \
[ -n "$type" ] && [ -n "$object" ]
then
# run the check script
if ! "$check_script" -v policy \
"$type" "$object" "$mntner" "$commit"
then
# update exit code on failure
exitcode=1
fi
fi
done <<EOF
$(git diff --name-only "$commit")
EOF
###########################################################################
# output a message and set exit code on failure
if [ "$exitcode" -ne 0 ]
then
>&2 echo 'FAILED: check the output for details'
exit "$exitcode"
fi
# all good
exit 0
###########################################################################
# end of file

View file

@ -1,15 +1,54 @@
#!/bin/sh
#!/bin/sh -e
###########################################################################
#
# dn42 registry - object formatting
#
###########################################################################
if [ "$#" -eq "0" ]
then
echo "Usage: $0 YOUR-MNT"
exit
mntner="$1"
if [ -z "$mntner" ]
then
>&2 echo "Usage: $0 YOUR-MNT"
exit 1
fi
BASE="$(readlink -f "$0" 2>/dev/null || python -c 'import os,sys;print(os.path.realpath(sys.argv[1]))' "$0")"
BASE="$(dirname "$BASE")"
check_script='utils/schema-check/dn42_schema_local.py'
grep -lrE "(\s|:)$1(\s|\$)" "$BASE/data/" | while read -r line; do
utils/schema-check/dn42-schema.py fmt -i "$line"
done
###########################################################################
# determine registry directory
#
# this will fail if the script is in the PATH or is sourced but those
# both seem unlikely. In any case if it does fail an env var can be used
# to override the check
rdir="$REGDIR"
if [ -z "$rdir" ]
then
rdir=$(cd -- "$(dirname -- "$0")" && pwd)
fi
if ! [ -x "${rdir}/${check_script}" ]
then
>&2 cat <<EOF
ERROR: Unable to automatically find the registry directory,
or the script '$check_script' is not executable
You can set the directory manually using the
REGDIR environment variable.
For example:
REGDIR='path/to/registry' $0 $mntner
EOF
exit 1
fi
###########################################################################
grep -lrE "(\s|:)$mntner(\s|\$)" "${rdir}/data/" | \
while read -r line; do
"$check_script" fmt -i "$line"
done
###########################################################################
# end of file

View file

@ -13,6 +13,6 @@ cd "$BASE" || exit 1
git diff --name-only "$1" | while IFS='/' read -ra LINE; do
if [[ "${LINE[0]}" = "data" && -n "${LINE[2]}" ]]; then
utils/schema-check/dn42_schema_local.py -v policy "${LINE[1]}" "${LINE[2]}" "$2" "$1"
utils/schema-check/dn42-schema.py -v policy "${LINE[1]}" "${LINE[2]}" "$2"
fi
done

View file

@ -22,6 +22,11 @@ SCHEMA_NAMESPACE = "dn42."
REGISTRY_URL = "git@git.dn42.dev:dn42/registry.git" if not "REG_URL" in os.environ else os.environ["REG_URL"]
REGISTRY_COMMIT = "dn42registry/master"
# CLEVEL contains terminal escape codes for coloring log levels (overwriting it with LEVEL which doesn't)
log.CLEVEL = log.LEVEL if "DN42REG_NO_COLOR" in os.environ else log.CLEVEL
log.CMSG = log.MSG if "DN42REG_NO_COLOR" in os.environ else log.CMSG # not actually used in this context
log.CMULTI = "[{1}] {2}" if "DN42REG_NO_COLOR" in os.environ else log.CMULTI
class SchemaDOM:
"schema"
@ -529,8 +534,8 @@ def _unexpand_ipv6(addr:str) -> str:
addr = addr[:-4]+":"
while addr.endswith("0000::"):
addr = addr.replace("0000::",":")
return "::" if addr == "0000:" else addr
return "::" if addr == "0000:" else addr
@cache()
def _get_parent_inetnums(inetnum:str, fields:list=[], family:str=None, commit:str=REGISTRY_COMMIT) -> list[list[str]]:
@ -540,11 +545,11 @@ def _get_parent_inetnums(inetnum:str, fields:list=[], family:str=None, commit:st
family = "ipv4" if "." in inetnum else "ipv6"
if family == "ipv6" and inetnum.startswith("0000"):
family = "ipv4"
inetnum = pretty_ip(inetnum.split("/")[0])+ "/" + str(int(inetnum.split("/")[1])-96)
inetnum = pretty_ip(inetnum.split("/")[0])+ "/" + str(int(inetnum.split("/")[1])-96)
out = []
if family == "ipv4":
netlevel = 1
netlevel = 1
# cause 0.0.0.0/0 = ::ffff:0:0/96 \subset ::/0
blk0_6 = find(fields=fields,filters={"@type":"net","cidr":"::/0"}, commit=commit)[0]
blk0_4 = find(fields=fields,filters={"@type":"net","cidr":"0.0.0.0/0"}, commit=commit)[0]
@ -555,7 +560,7 @@ def _get_parent_inetnums(inetnum:str, fields:list=[], family:str=None, commit:st
blk0_6 = [["@family","ipv6"]] + blk0_6
blk0_4 = [["@family","ipv4"]] + blk0_4
# TODO: implement other "@fields"
netlevel += 2
out=[blk0_6,blk0_4]
for i in range(1,subnet_len + 1): #ipv4 subnet length
@ -573,7 +578,7 @@ def _get_parent_inetnums(inetnum:str, fields:list=[], family:str=None, commit:st
netlevel += 1
out.append(blk_filtered)
elif family == "ipv6":
netlevel = 1 # cause start counting at 1 ...
blk0 = find(fields=fields,filters={"@type":"net","cidr":"::/0"}, commit=commit)[0]
@ -583,7 +588,7 @@ def _get_parent_inetnums(inetnum:str, fields:list=[], family:str=None, commit:st
if "@family" in fields or fields == []:
blk0 = [["@family","ipv6"]] + blk0
# TODO: implement other "@fields"
netlevel += 1
out=[blk0]
for i in range(1,subnet_len + 1): #ipv6 subnet length (max=64)
@ -618,11 +623,11 @@ def _get_parent_as_blocks(as_min:str, as_max:str, fields:list=[], commit:str = R
if not (block_min_int <= as_min_int and as_max_int <= block_max_int):
continue
block_content = _get_file_content_upstream(f"data/as-block/{block}", commit = commit)
if block_content == [""]:
if block_content == [""]:
continue #shouldn't happen
block_filtered = _get_values_of_filecontent(block_content, fields=fields, filters={"@type":"as-block","@name":block})
if "@as-min" in fields:
block_filtered = [["@as-min",block_min]] + block_filtered
if "@as-max" in fields:
@ -673,8 +678,8 @@ def find_new(fields: list = None, filters: dict = None, commit:str = REGISTRY_CO
if "@name" in filters.keys():
obj_name = filters["@name"]
elif "cidr" in filters.keys():
obj_name = filters["@name"] = filters["cidr"].replace("/", "_")
else:
obj_name = filters["@name"] = filters["cidr"].replace("/", "_")
else:
obj_name = filters["route"].replace("/", "_") if "route" in filters.keys() else filters["route6"].replace("/","_")
if obj_type == "net":
@ -703,11 +708,11 @@ def find_new(fields: list = None, filters: dict = None, commit:str = REGISTRY_CO
elif {"@netmin","@netmask"}.issubset(filters.keys()) and filters["@type"]=="net":
# assumes @netmin="lt=<lowest_ip>", @netmask="<[1..128]: if ipv4: 96+v4_subnetlen>"
netmin = pretty_ip(filters["@netmin"].split("=")[1])
inetnum = netmin + "/" + str(int(filters["@netmask"].split("=")[1])-96) if "." in netmin else netmin + "/" + filters["@netmask"].split("=")[1]
inetnum = netmin + "/" + str(int(filters["@netmask"].split("=")[1])-96) if "." in netmin else netmin + "/" + filters["@netmask"].split("=")[1]
out = _get_parent_inetnums(inetnum, fields=fields, commit=commit)
return out
elif {"@as-min","@as-max"}.issubset(filters.keys()) and filters["@type"] == "as-block":
# assumes @as-min="le=<smallest_asn>", @as-max="ge=<largest_asn>"
as_min = filters["@as-min"].split("=")[1]
@ -715,7 +720,7 @@ def find_new(fields: list = None, filters: dict = None, commit:str = REGISTRY_CO
out = _get_parent_as_blocks(as_min, as_max, fields, commit = commit)
return out
elif {"@family"} == filters.keys():
# works for everything except if "@netlevel" is in fields
ip_family = filters["@family"]
@ -726,11 +731,11 @@ def find_new(fields: list = None, filters: dict = None, commit:str = REGISTRY_CO
for net in nets:
if net =="" or net.startswith("tree"):
continue
net_content = _get_file_content_upstream(f"data/{obj_type}/{net}", commit = commit)
if net_content == [""]:
if net_content == [""]:
continue #shouldn't happen
net_filtered = _get_values_of_filecontent(net_content, fields=fields, filters={"@type":"net","@name":net})
cidr = _get_values_of_filecontent(net_content, fields=["cidr"], filters={"@type":"net","@name":net})[0][1]
net_min, net_max, net_mask = inetrange(cidr) if ip_family == "ipv4" else inet6range(cidr)
@ -1326,14 +1331,14 @@ def run(args):
print(str(dom))
elif args["command"] == "policy":
# check if the commit to check against is old
now = int(time.time())
try:
commit_time = int(_run_command(f"git show -s --date=unix --format=%cd {args['commit']}".split(" ")).strip()) # should return only the unix timestamp of the commit
except ValueError as e:
log.fatal(f"could not determine time of the provided commit: {args['commit']}")
if now - commit_time > 60*60*24*14: # more than two weeks(14 days)
log.warning(f"the commit to check against is older than 14 days, consider fetching/using a newer commit")