import abc
import base64
import collections
import contextlib
import dataclasses
import html.parser
import itertools
import os.path
import typing
import urllib.parse
import aiofiles.os
import httpx
from pypi_browser import packaging
class PythonRepository(abc.ABC):
@abc.abstractmethod
async def files_for_package(self, package_name: str) -> dict[str, str]:
"""Return mapping from filename to file URL for files in a package."""
class HTMLAnchorParser(html.parser.HTMLParser):
anchors: set[str]
def __init__(self) -> None:
super().__init__()
self.anchors = set()
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
if tag == 'a':
if href := dict(attrs).get('href'):
self.anchors.add(href)
@dataclasses.dataclass(frozen=True)
class SimpleRepository(PythonRepository):
""""Simple" PyPI registry serving either JSON or HTML files."""
pypi_url: str
async def files_for_package(self, package_name: str) -> dict[str, str]:
async with httpx.AsyncClient() as client:
resp = await client.get(
f'{self.pypi_url}/{package_name}',
follow_redirects=True,
headers={
'Accept': ', '.join((
'application/vnd.pypi.simple.v1+json',
'application/vnd.pypi.simple.v1+html;q=0.2',
'text/html;q=0.01',
)),
},
)
if resp.status_code == 404:
raise PackageDoesNotExist(package_name)
def clean_url(url: str) -> str:
parsed = urllib.parse.urlparse(urllib.parse.urljoin(str(resp.url), url))
return parsed._replace(fragment='').geturl()
if resp.headers.get('Content-Type') == 'application/vnd.pypi.simple.v1+json':
result = resp.json()
return {file_['filename']: clean_url(file_['url']) for file_ in result['files']}
else:
parser = HTMLAnchorParser()
parser.feed(resp.text)
return {
(urllib.parse.urlparse(url).path).split('/')[-1]: clean_url(url)
for url in parser.anchors
}
@dataclasses.dataclass(frozen=True)
class LegacyJsonRepository(PythonRepository):
"""Non-standardized JSON API compatible with pypi.org's /pypi/*/json endpoints."""
pypi_url: str
async def files_for_package(self, package_name: str) -> dict[str, str]:
async with httpx.AsyncClient() as client:
resp = await client.get(
f'{self.pypi_url}/pypi/{package_name}/json',
follow_redirects=True,
)
if resp.status_code == 404:
raise PackageDoesNotExist(package_name)
resp.raise_for_status()
return {
file_['filename']: urllib.parse.urljoin(str(resp.url), file_['url'])
for file_ in itertools.chain.from_iterable(resp.json()['releases'].values())
}
@dataclasses.dataclass(frozen=True)
class PyPIConfig:
repo: PythonRepository
cache_path: str
class PackageDoesNotExist(Exception):
pass
async def files_by_version(config: PyPIConfig, package: str) -> dict[str | None, set[str]]:
ret = collections.defaultdict(set)
for filename in await config.repo.files_for_package(package):
try:
version = packaging.guess_version_from_filename(filename)
except ValueError:
# Possible with some very poorly-formed packages that used to be
# allowed on PyPI. Just skip them when this happens.
pass
else:
ret[version].add(filename)
return ret
class CannotFindFileError(Exception):
pass
def _storage_path(config: PyPIConfig, package: str, filename: str) -> str:
return os.path.join(
config.cache_path,
# Base64-encoding the names to calculate the storage path just to be
# extra sure to avoid any path traversal vulnerabilities.
base64.urlsafe_b64encode(package.encode('utf8')).decode('ascii'),
base64.urlsafe_b64encode(filename.encode('utf8')).decode('ascii'),
)
@contextlib.asynccontextmanager
async def _atomic_file(path: str) -> typing.AsyncIterator[aiofiles.threadpool.binary.AsyncBufferedIOBase]:
async with aiofiles.tempfile.NamedTemporaryFile('wb', dir=os.path.dirname(path), delete=False) as f:
tmp_path = typing.cast(str, f.name)
try:
yield f
except BaseException:
await aiofiles.os.remove(tmp_path)
raise
else:
# This is atomic since the temporary file was created in the same directory.
await aiofiles.os.rename(tmp_path, path)
async def downloaded_file_path(config: PyPIConfig, package: str, filename: str) -> str:
"""Return path on filesystem to downloaded PyPI file.
May be instant if the file is already cached; otherwise it will download
it and may take a while.
"""
stored_path = _storage_path(config, package, filename)
if await aiofiles.os.path.exists(stored_path):
return stored_path
filename_to_url = await config.repo.files_for_package(package)
try:
url = filename_to_url[filename]
except KeyError:
raise CannotFindFileError(package, filename)
await aiofiles.os.makedirs(os.path.dirname(stored_path), exist_ok=True)
async with httpx.AsyncClient() as client:
async with _atomic_file(stored_path) as f:
async with client.stream('GET', url) as resp:
resp.raise_for_status()
async for chunk in resp.aiter_bytes():
await f.write(chunk)
return stored_path