Factor out load_contents_url.

This commit is contained in:
Jelmer Vernooij 2021-03-16 04:34:58 +00:00
parent 1d6b83b2be
commit ceb052332b

View file

@ -29,6 +29,9 @@ from .. import DetailedFailure, UnidentifiedError
from ..session import Session, run_with_tee from ..session import Session, run_with_tee
USER_AGENT = "Ognibuild"
def run_apt(session: Session, args: List[str]) -> None: def run_apt(session: Session, args: List[str]) -> None:
"""Run apt.""" """Run apt."""
args = ["apt", "-y"] + args args = ["apt", "-y"] + args
@ -159,6 +162,40 @@ def contents_urls_from_sourceslist(sl, arch):
) )
def load_contents_url(url):
from urllib.error import HTTPError
from urllib.request import urlopen, Request
for ext in [".xz", ".gz", ""]:
try:
request = Request(
url + ext, headers={"User-Agent": USER_AGENT})
response = urlopen(request)
except HTTPError as e:
if e.status == 404:
continue
raise
break
else:
raise ContentsFileNotFound(url)
if ext == ".gz":
import gzip
f = gzip.GzipFile(fileobj=response)
elif ext == ".xz":
import lzma
from io import BytesIO
f = BytesIO(lzma.decompress(response.read()))
elif response.headers.get_content_type() == "text/plain":
f = response
else:
raise Exception(
"Unknown content type %r" % response.headers.get_content_type()
)
return f
class AptContentsFileSearcher(FileSearcher): class AptContentsFileSearcher(FileSearcher):
def __init__(self): def __init__(self):
self._db = {} self._db = {}
@ -245,41 +282,8 @@ class AptContentsFileSearcher(FileSearcher):
contents_urls_from_sourceslist(sl, get_build_architecture())) contents_urls_from_sourceslist(sl, get_build_architecture()))
return cls.from_urls(urls, cache_dirs=cache_dirs) return cls.from_urls(urls, cache_dirs=cache_dirs)
@staticmethod
def _get(url):
from urllib.request import urlopen, Request
request = Request(url, headers={"User-Agent": "Debian Janitor"})
return urlopen(request)
def load_url(self, url, allow_cache=True): def load_url(self, url, allow_cache=True):
from urllib.error import HTTPError f = load_contents_url(url)
for ext in [".xz", ".gz", ""]:
try:
response = self._get(url + ext)
except HTTPError as e:
if e.status == 404:
continue
raise
break
else:
raise ContentsFileNotFound(url)
if ext == ".gz":
import gzip
f = gzip.GzipFile(fileobj=response)
elif ext == ".xz":
import lzma
from io import BytesIO
f = BytesIO(lzma.decompress(response.read()))
elif response.headers.get_content_type() == "text/plain":
f = response
else:
raise Exception(
"Unknown content type %r" % response.headers.get_content_type()
)
self.load_file(f) self.load_file(f)