Support calling out to apt-file.

This commit is contained in:
Jelmer Vernooij 2021-03-29 17:53:58 +01:00
parent 3bcfc950cb
commit 5f511b2b80
No known key found for this signature in database
GPG key ID: 579C160D4C9E23E8
3 changed files with 77 additions and 15 deletions

View file

@ -221,7 +221,6 @@ import sys
script_name = %(script_name)s script_name = %(script_name)s
save_argv = sys.argv.copy()
g = {"__file__": script_name, "__name__": "__main__"} g = {"__file__": script_name, "__name__": "__main__"}
try: try:
core._setup_stop_after = "init" core._setup_stop_after = "init"

View file

@ -28,7 +28,7 @@ from .. import DetailedFailure, UnidentifiedError
from ..session import Session, run_with_tee, get_user from ..session import Session, run_with_tee, get_user
from .file_search import ( from .file_search import (
FileSearcher, FileSearcher,
AptCachedContentsFileSearcher, get_apt_contents_file_searcher,
GENERATED_FILE_SEARCHER, GENERATED_FILE_SEARCHER,
get_packages_for_paths, get_packages_for_paths,
) )
@ -76,7 +76,7 @@ class AptManager(object):
def searchers(self): def searchers(self):
if self._searchers is None: if self._searchers is None:
self._searchers = [ self._searchers = [
AptCachedContentsFileSearcher.from_session(self.session), get_apt_contents_file_searcher(self.session),
GENERATED_FILE_SEARCHER, GENERATED_FILE_SEARCHER,
] ]
return self._searchers return self._searchers
@ -90,7 +90,6 @@ class AptManager(object):
def get_packages_for_paths(self, paths, regex=False, case_insensitive=False): def get_packages_for_paths(self, paths, regex=False, case_insensitive=False):
logging.debug("Searching for packages containing %r", paths) logging.debug("Searching for packages containing %r", paths)
# TODO(jelmer): Make sure we use whatever is configured in self.session
return get_packages_for_paths( return get_packages_for_paths(
paths, self.searchers(), regex=regex, case_insensitive=case_insensitive paths, self.searchers(), regex=regex, case_insensitive=case_insensitive
) )

View file

@ -21,11 +21,13 @@ from datetime import datetime
from debian.deb822 import Release from debian.deb822 import Release
import os import os
import re import re
import subprocess
from typing import Iterator, List from typing import Iterator, List
import logging import logging
from .. import USER_AGENT from .. import USER_AGENT
from ..session import Session
class FileSearcher(object): class FileSearcher(object):
@ -158,7 +160,68 @@ def load_apt_cache_file(url, cache_dir):
raise FileNotFoundError(url) raise FileNotFoundError(url)
class AptCachedContentsFileSearcher(FileSearcher): class AptFileFileSearcher(FileSearcher):
CACHE_IS_EMPTY_PATH = '/usr/share/apt-file/is-cache-empty'
def __init__(self, session: Session):
self.session = session
@classmethod
def has_cache(cls, session: Session) -> bool:
if not os.path.exists(session.external_path(cls.CACHE_IS_EMPTY_PATH)):
return True
try:
session.check_call([cls.CACHE_IS_EMPTY_PATH])
except subprocess.CalledProcessError as e:
if e.returncode == 1:
return True
raise
else:
return False
@classmethod
def from_session(cls, session):
logging.info('Using apt-file to search apt contents')
if not os.path.exists(session.external_path(cls.CACHE_IS_EMPTY_PATH)):
from .apt import AptManager
AptManager.from_session(session).install(['apt-file'])
if not cls.has_cache(session):
session.check_call(['apt-file', 'update'], user='root')
return cls(session)
def search_files(self, path, regex=False, case_insensitive=False):
args = []
if regex:
args.append('-x')
else:
args.append('-F')
if case_insensitive:
args.append('-i')
args.append(path)
try:
output = self.session.check_output(['/usr/bin/apt-file', 'search'] + args)
except subprocess.CalledProcessError as e:
if e.returncode == 1:
# No results
return
if e.returncode == 3:
raise Exception('apt-file cache is empty')
raise
for line in output.splitlines(False):
pkg, path = line.split(b': ')
yield pkg.decode('utf-8')
def get_apt_contents_file_searcher(session):
if AptFileFileSearcher.has_cache(session):
return AptFileFileSearcher.from_session(session)
return RemoteContentsFileSearcher.from_session(session)
class RemoteContentsFileSearcher(FileSearcher):
def __init__(self): def __init__(self):
self._db = {} self._db = {}
@ -268,12 +331,12 @@ class GeneratedFileSearcher(FileSearcher):
with open(path, "r") as f: with open(path, "r") as f:
for line in f: for line in f:
(path, pkg) = line.strip().split(None, 1) (path, pkg) = line.strip().split(None, 1)
self._db[path] = pkg self._db.append(path, pkg)
def search_files( def search_files(
self, path: str, regex: bool = False, case_insensitive: bool = False self, path: str, regex: bool = False, case_insensitive: bool = False
) -> Iterator[str]: ) -> Iterator[str]:
for p, pkg in sorted(self._db.items()): for p, pkg in self._db:
if regex: if regex:
flags = 0 flags = 0
if case_insensitive: if case_insensitive:
@ -290,16 +353,16 @@ class GeneratedFileSearcher(FileSearcher):
# TODO(jelmer): read from a file # TODO(jelmer): read from a file
GENERATED_FILE_SEARCHER = GeneratedFileSearcher( GENERATED_FILE_SEARCHER = GeneratedFileSearcher(
{ [
"/etc/locale.gen": "locales", ("/etc/locale.gen", "locales"),
# Alternative # Alternative
"/usr/bin/rst2html": "python3-docutils", ("/usr/bin/rst2html", "python3-docutils"),
# aclocal is a symlink to aclocal-1.XY # aclocal is a symlink to aclocal-1.XY
"/usr/bin/aclocal": "automake", ("/usr/bin/aclocal", "automake"),
"/usr/bin/automake": "automake", ("/usr/bin/automake", "automake"),
# maven lives in /usr/share # maven lives in /usr/share
"/usr/bin/mvn": "maven", ("/usr/bin/mvn", "maven"),
} ]
) )
@ -322,6 +385,7 @@ def get_packages_for_paths(
def main(argv): def main(argv):
import argparse import argparse
from ..session.plain import PlainSession
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("path", help="Path to search for.", type=str, nargs="*") parser.add_argument("path", help="Path to search for.", type=str, nargs="*")
@ -334,7 +398,7 @@ def main(argv):
else: else:
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
main_searcher = AptCachedContentsFileSearcher() main_searcher = get_apt_contents_file_searcher(PlainSession())
main_searcher.load_local() main_searcher.load_local()
searchers = [main_searcher, GENERATED_FILE_SEARCHER] searchers = [main_searcher, GENERATED_FILE_SEARCHER]