From 6aaa2df421548628898262a5398710a549d24984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelmer=20Vernoo=C4=B3?= Date: Tue, 16 Mar 2021 15:26:52 +0000 Subject: [PATCH] Factor out file search. --- ognibuild/__init__.py | 3 + ognibuild/debian/apt.py | 236 +------------------------ ognibuild/debian/file_search.py | 293 ++++++++++++++++++++++++++++++++ 3 files changed, 299 insertions(+), 233 deletions(-) create mode 100644 ognibuild/debian/file_search.py diff --git a/ognibuild/__init__.py b/ognibuild/__init__.py index 6e210c2..366f1f8 100644 --- a/ognibuild/__init__.py +++ b/ognibuild/__init__.py @@ -20,6 +20,9 @@ import os import stat +USER_AGENT = "Ognibuild" + + class DetailedFailure(Exception): def __init__(self, retcode, argv, error): self.retcode = retcode diff --git a/ognibuild/debian/apt.py b/ognibuild/debian/apt.py index 32b2a7e..eb318b6 100644 --- a/ognibuild/debian/apt.py +++ b/ognibuild/debian/apt.py @@ -18,7 +18,7 @@ import logging import re -from typing import List, Iterator, Optional, Set +from typing import List, Optional, Set import os from buildlog_consultant.apt import ( @@ -27,9 +27,7 @@ from buildlog_consultant.apt import ( from .. import DetailedFailure, UnidentifiedError from ..session import Session, run_with_tee - - -USER_AGENT = "Ognibuild" +from .file_search import FileSearcher, AptCachedContentsFileSearcher, GENERATED_FILE_SEARCHER, get_package_for_paths def run_apt(session: Session, args: List[str]) -> None: @@ -46,11 +44,6 @@ def run_apt(session: Session, args: List[str]) -> None: raise UnidentifiedError(retcode, args, lines, secondary=match) -class FileSearcher(object): - def search_files(self, path: str, regex: bool = False) -> Iterator[str]: - raise NotImplementedError(self.search_files) - - class AptManager(object): session: Session @@ -64,7 +57,7 @@ class AptManager(object): def searchers(self): if self._searchers is None: self._searchers = [ - AptContentsFileSearcher.from_session(self.session), + AptCachedContentsFileSearcher.from_session(self.session), GENERATED_FILE_SEARCHER, ] return self._searchers @@ -108,226 +101,3 @@ class AptManager(object): def satisfy_command(self, deps: List[str]) -> List[str]: return ["apt", "satisfy"] + deps - - -class ContentsFileNotFound(Exception): - """The contents file was not found.""" - - -def read_contents_file(f): - for line in f: - (path, rest) = line.rsplit(maxsplit=1) - package = rest.split(b"/")[-1] - decoded_path = "/" + path.decode("utf-8", "surrogateescape") - yield decoded_path, package.decode("utf-8") - - -def url_to_cache_filename(url): - from urllib.parse import urlparse - parsed = urlparse(url) - return parsed.hostname + parsed.path.replace("/", "_") - - -def contents_urls_from_sourceslist(sl, arch): - # TODO(jelmer): Verify signatures, etc. 
- arches = [(arch, True), ("all", False)] - for source in sl.list: - if source.invalid or source.disabled: - continue - if source.type == "deb-src": - continue - if source.type != "deb": - logging.warning("Invalid line in sources: %r", source) - continue - base_url = source.uri.rstrip("/") - name = source.dist.rstrip("/") - components = source.comps - if components: - dists_url = base_url + "/dists" - else: - dists_url = base_url - if components: - for component in components: - for arch, mandatory in arches: - yield ( - "%s/%s/%s/Contents-%s" - % (dists_url, name, component, arch), - mandatory, - ) - else: - for arch, mandatory in arches: - yield ( - "%s/%s/Contents-%s" % (dists_url, name.rstrip("/"), arch), - mandatory, - ) - - -def load_contents_url(url): - from urllib.error import HTTPError - from urllib.request import urlopen, Request - - for ext in [".xz", ".gz", ""]: - try: - request = Request( - url + ext, headers={"User-Agent": USER_AGENT}) - response = urlopen(request) - except HTTPError as e: - if e.status == 404: - continue - raise - break - else: - raise ContentsFileNotFound(url) - if ext == ".gz": - import gzip - - f = gzip.GzipFile(fileobj=response) - elif ext == ".xz": - import lzma - from io import BytesIO - - f = BytesIO(lzma.decompress(response.read())) - elif response.headers.get_content_type() == "text/plain": - f = response - else: - raise Exception( - "Unknown content type %r" % response.headers.get_content_type() - ) - return f - - -class AptContentsFileSearcher(FileSearcher): - def __init__(self): - self._db = {} - - @classmethod - def from_session(cls, session): - logging.info("Loading apt contents information") - # TODO(jelmer): what about sources.list.d? - from aptsources.sourceslist import SourcesList - - sl = SourcesList() - sl.load(os.path.join(session.location, "etc/apt/sources.list")) - return cls.from_sources_list( - sl, - cache_dirs=[ - os.path.join(session.location, "var/lib/apt/lists"), - "/var/lib/apt/lists", - ], - ) - - def __setitem__(self, path, package): - self._db[path] = package - - def search_files(self, path, regex=False): - if regex: - c = re.compile(path) - for p, pkg in sorted(self._db.items()): - if c.match(p): - yield pkg - else: - try: - return self._db[path] - except KeyError: - pass - - def load_file(self, f): - for path, package in read_contents_file(f): - self[path] = package - - @classmethod - def _load_cache_file(cls, url, cache_dir): - fn = url_to_cache_filename(url) - p = os.path.join(cache_dir, fn + ".lz4") - if not os.path.exists(p): - return None - logging.debug("Loading cached contents file %s", p) - import lz4.frame - - return lz4.frame.open(p, mode="rb") - - @classmethod - def from_urls(cls, urls, cache_dirs=None): - self = cls() - for url, mandatory in urls: - for cache_dir in cache_dirs or []: - f = cls._load_cache_file(url, cache_dir) - if f is not None: - self.load_file(f) - break - else: - if not mandatory and self._db: - logging.debug( - "Not attempting to fetch optional contents " "file %s", url - ) - else: - logging.debug("Fetching contents file %s", url) - try: - self.load_url(url) - except ContentsFileNotFound: - if mandatory: - logging.warning("Unable to fetch contents file %s", url) - else: - logging.debug( - "Unable to fetch optional contents file %s", url - ) - return self - - @classmethod - def from_sources_list(cls, sl, cache_dirs=None): - # TODO(jelmer): Use aptsources.sourceslist.SourcesList - from .build import get_build_architecture - - urls = list( - contents_urls_from_sourceslist(sl, 
get_build_architecture())) - return cls.from_urls(urls, cache_dirs=cache_dirs) - - def load_url(self, url, allow_cache=True): - f = load_contents_url(url) - self.load_file(f) - - -class GeneratedFileSearcher(FileSearcher): - def __init__(self, db): - self._db = db - - def search_files(self, path: str, regex: bool = False) -> Iterator[str]: - for p, pkg in sorted(self._db.items()): - if regex: - if re.match(path, p): - yield pkg - else: - if path == p: - yield pkg - - -# TODO(jelmer): read from a file -GENERATED_FILE_SEARCHER = GeneratedFileSearcher( - { - "/etc/locale.gen": "locales", - # Alternative - "/usr/bin/rst2html": "/usr/share/docutils/scripts/python3/rst2html", - } -) - - -def get_package_for_paths( - paths: List[str], searchers: List[FileSearcher], regex: bool = False -) -> Optional[str]: - candidates: Set[str] = set() - for path in paths: - for searcher in searchers: - candidates.update(searcher.search_files(path, regex=regex)) - if candidates: - break - if len(candidates) == 0: - logging.debug("No packages found that contain %r", paths) - return None - if len(candidates) > 1: - logging.warning( - "More than 1 packages found that contain %r: %r", path, candidates - ) - # Euhr. Pick the one with the shortest name? - return sorted(candidates, key=len)[0] - else: - return candidates.pop() diff --git a/ognibuild/debian/file_search.py b/ognibuild/debian/file_search.py new file mode 100644 index 0000000..5a0b105 --- /dev/null +++ b/ognibuild/debian/file_search.py @@ -0,0 +1,293 @@ +#!/usr/bin/python +# Copyright (C) 2019-2020 Jelmer Vernooij +# encoding: utf-8 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +from datetime import datetime +import os +import re +from typing import Iterator, List, Optional, Set +import logging + + +from .. import USER_AGENT + + +class FileSearcher(object): + def search_files(self, path: str, regex: bool = False) -> Iterator[str]: + raise NotImplementedError(self.search_files) + + +class ContentsFileNotFound(Exception): + """The contents file was not found.""" + + +def read_contents_file(f): + for line in f: + (path, rest) = line.rsplit(maxsplit=1) + yield path, rest + + +def url_to_cache_filename(url): + from urllib.parse import urlparse + parsed = urlparse(url) + return parsed.hostname + parsed.path.replace("/", "_") + + +def contents_urls_from_sourceslist(sl, arch): + # TODO(jelmer): Verify signatures, etc. 
+ arches = [(arch, True), ("all", False)] + for source in sl.list: + if source.invalid or source.disabled: + continue + if source.type == "deb-src": + continue + if source.type != "deb": + logging.warning("Invalid line in sources: %r", source) + continue + base_url = source.uri.rstrip("/") + name = source.dist.rstrip("/") + components = source.comps + if components: + dists_url = base_url + "/dists" + else: + dists_url = base_url + if components: + for component in components: + for arch, mandatory in arches: + yield ( + "%s/%s/%s/Contents-%s" + % (dists_url, name, component, arch), + mandatory, + ) + else: + for arch, mandatory in arches: + yield ( + "%s/%s/Contents-%s" % (dists_url, name.rstrip("/"), arch), + mandatory, + ) + + +def load_contents_url(url): + from urllib.error import HTTPError + from urllib.request import urlopen, Request + + for ext in [".xz", ".gz", ""]: + try: + request = Request( + url + ext, headers={"User-Agent": USER_AGENT}) + response = urlopen(request) + except HTTPError as e: + if e.status == 404: + continue + raise + break + else: + raise ContentsFileNotFound(url) + if ext == ".gz": + import gzip + + f = gzip.GzipFile(fileobj=response) + elif ext == ".xz": + import lzma + from io import BytesIO + + f = BytesIO(lzma.decompress(response.read())) + elif response.headers.get_content_type() == "text/plain": + f = response + else: + raise Exception( + "Unknown content type %r" % response.headers.get_content_type() + ) + return f + + +def load_apt_cache_file(cache_dir, url): + fn = url_to_cache_filename(url) + p = os.path.join(cache_dir, fn + ".lz4") + if not os.path.exists(p): + return None + logging.debug("Loading cached contents file %s", p) + #return os.popen('/usr/lib/apt/apt-helper cat-file %s' % p) + import lz4.frame + return lz4.frame.open(p, mode="rb") + + +class AptCachedContentsFileSearcher(FileSearcher): + def __init__(self): + self._db = {} + + @classmethod + def from_session(cls, session): + logging.info("Loading apt contents information") + + self = cls() + self.load_from_session(session) + return self + + def load_local(self): + # TODO(jelmer): what about sources.list.d? + from aptsources.sourceslist import SourcesList + + sl = SourcesList() + sl.load("/etc/apt/sources.list") + + from .build import get_build_architecture + + urls = list( + contents_urls_from_sourceslist(sl, get_build_architecture())) + cache_dirs = set(["/var/lib/apt/lists"]) + self._load_urls(urls, cache_dirs) + + def load_from_session(self, session): + # TODO(jelmer): what about sources.list.d? 
+ from aptsources.sourceslist import SourcesList + + sl = SourcesList() + sl.load(os.path.join(session.location, "etc/apt/sources.list")) + + from .build import get_build_architecture + + urls = list( + contents_urls_from_sourceslist(sl, get_build_architecture())) + cache_dirs = [ + os.path.join(session.location, "var/lib/apt/lists"), + "/var/lib/apt/lists", + ] + self._load_urls(urls, cache_dirs) + + def _load_urls(self, urls, cache_dirs): + for url, mandatory in urls: + for cache_dir in cache_dirs: + f = load_apt_cache_file(cache_dir, url) + if f is not None: + self.load_file(f, url) + break + else: + if not mandatory and self._db: + logging.debug( + "Not attempting to fetch optional contents " "file %s", url + ) + else: + logging.debug("Fetching contents file %s", url) + try: + f = load_contents_url(url) + self.load_file(f, url) + except ContentsFileNotFound: + if mandatory: + logging.warning("Unable to fetch contents file %s", url) + else: + logging.debug( + "Unable to fetch optional contents file %s", url + ) + + def __setitem__(self, path, package): + self._db[path] = package + + def search_files(self, path, regex=False): + path = path.lstrip('/').encode('utf-8', 'surrogateescape') + if regex: + c = re.compile(path) + ret = [] + for p, rest in self._db.items(): + if c.match(p): + pkg = rest.split(b"/")[-1] + ret.append((p, pkg.decode('utf-8'))) + for p, pkg in sorted(ret): + yield pkg + else: + try: + yield self._db[path].split(b"/")[-1].decode('utf-8') + except KeyError: + pass + + def load_file(self, f, url): + start_time = datetime.now() + for path, rest in read_contents_file(f.readlines()): + self[path] = rest + logging.debug('Read %s in %s', url, datetime.now() - start_time) + + +class GeneratedFileSearcher(FileSearcher): + def __init__(self, db): + self._db = db + + def search_files(self, path: str, regex: bool = False) -> Iterator[str]: + for p, pkg in sorted(self._db.items()): + if regex: + if re.match(path, p): + yield pkg + else: + if path == p: + yield pkg + + +# TODO(jelmer): read from a file +GENERATED_FILE_SEARCHER = GeneratedFileSearcher( + { + "/etc/locale.gen": "locales", + # Alternative + "/usr/bin/rst2html": "/usr/share/docutils/scripts/python3/rst2html", + } +) + + +def get_package_for_paths( + paths: List[str], searchers: List[FileSearcher], regex: bool = False +) -> Optional[str]: + candidates: Set[str] = set() + for path in paths: + for searcher in searchers: + candidates.update(searcher.search_files(path, regex=regex)) + if candidates: + break + if len(candidates) == 0: + logging.debug("No packages found that contain %r", paths) + return None + if len(candidates) > 1: + logging.warning( + "More than 1 packages found that contain %r: %r", path, candidates + ) + # Euhr. Pick the one with the shortest name? 
+ return sorted(candidates, key=len)[0] + else: + return candidates.pop() + + +def main(argv): + import argparse + parser = argparse.ArgumentParser() + parser.add_argument('path', help='Path to search for.', type=str, nargs='*') + parser.add_argument('--regex', '-x', help='Search for regex.', action='store_true') + parser.add_argument('--debug', action='store_true') + args = parser.parse_args() + + if args.debug: + logging.basicConfig(level=logging.DEBUG) + else: + logging.basicConfig(level=logging.INFO) + + main_searcher = AptCachedContentsFileSearcher() + main_searcher.load_local() + searchers = [main_searcher, GENERATED_FILE_SEARCHER] + + package = get_package_for_paths(args.path, searchers=searchers, regex=args.regex) + print(package) + + +if __name__ == '__main__': + import sys + sys.exit(main(sys.argv))
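
For reference, a minimal usage sketch of the factored-out module, mirroring what the new main() entry point does; the class and function names (AptCachedContentsFileSearcher, GENERATED_FILE_SEARCHER, get_package_for_paths) come from the ognibuild/debian/file_search.py added by this patch, while the queried path and the assumption of a Debian-style host with python3-apt and cached Contents files under /var/lib/apt/lists are illustrative only:

# Sketch only: assumes a Debian/Ubuntu host where /etc/apt/sources.list exists,
# python3-apt (aptsources) is installed, and apt has downloaded Contents files.
import logging

from ognibuild.debian.file_search import (
    AptCachedContentsFileSearcher,
    GENERATED_FILE_SEARCHER,
    get_package_for_paths,
)

logging.basicConfig(level=logging.INFO)

# Build a searcher from the local sources.list and the cached Contents lists.
searcher = AptCachedContentsFileSearcher()
searcher.load_local()

# Ask which package ships a given path (example path only); the static
# GENERATED_FILE_SEARCHER table is consulted as a fallback for known
# generated files such as /etc/locale.gen.
package = get_package_for_paths(
    ["/usr/bin/make"], searchers=[searcher, GENERATED_FILE_SEARCHER]
)
print(package)

Running the module directly (for example, python -m ognibuild.debian.file_search /usr/bin/make) should exercise the same load_local() and get_package_for_paths() path through the new main().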