pypi_browser/packaging.py

import asyncio
import base64
import contextlib
import enum
import os.path
import re
import stat
import typing
import zipfile
from dataclasses import dataclass


def pep426_normalize(package_name: str) -> str:
    return re.sub(r'[-_.]+', '-', package_name).lower()


class UnsupportedPackageType(Exception):
    pass


class PackageType(enum.Enum):
    SDIST = enum.auto()
    WHEEL = enum.auto()
    EGG = enum.auto()


class PackageFormat(enum.Enum):
    ZIPFILE = enum.auto()
    TARBALL = enum.auto()
    TARBALL_GZ = enum.auto()
    TARBALL_BZ2 = enum.auto()


@dataclass(frozen=True)
class PackageEntry:
    path: str
    mode: str
    size: int


def _package_entries_from_zipfile(path: str) -> typing.Set[PackageEntry]:
    with zipfile.ZipFile(path) as zf:
        return {
            PackageEntry(
                path=entry.filename,
                size=entry.file_size,
                mode=stat.filemode(entry.external_attr >> 16),
            )
            for entry in zf.infolist()
            if not entry.is_dir()
        }


ArchiveFile = typing.Union[zipfile.ZipExtFile]


class AsyncArchiveFile:

    file_: ArchiveFile

    def __init__(self, file_: ArchiveFile) -> None:
        self.file_ = file_

    async def __aenter__(self) -> 'AsyncArchiveFile':
        return self

    async def __aexit__(self, exc_t, exc_v, exc_tb) -> None:
        await asyncio.to_thread(self.file_.close)

    async def read(self, n_bytes: typing.Optional[int] = None) -> bytes:
        return await asyncio.to_thread(self.file_.read, n_bytes)


@dataclass(frozen=True)
class Package:
    package_type: PackageType
    package_format: PackageFormat
    path: str

    @classmethod
    def from_path(cls, path: str) -> 'Package':
        name = base64.b64decode(os.path.basename(path).encode('ascii')).decode('utf8')

        if name.endswith('.whl'):
            package_type = PackageType.WHEEL
            package_format = PackageFormat.ZIPFILE
        elif name.endswith('.zip'):
            package_type = PackageType.SDIST
            package_format = PackageFormat.ZIPFILE
        elif name.endswith('.egg'):
            package_type = PackageType.EGG
            package_format = PackageFormat.ZIPFILE
        else:
            # TODO: Add support for tarballs
            raise UnsupportedPackageType(name)

        return cls(
            package_type=package_type,
            package_format=package_format,
            path=path,
        )

    async def entries(self) -> typing.Set[PackageEntry]:
        if self.package_format is PackageFormat.ZIPFILE:
            return await asyncio.to_thread(_package_entries_from_zipfile, self.path)
        else:
            raise AssertionError(self.package_format)

    @contextlib.asynccontextmanager
    async def open_from_archive(self, path: str) -> str:
        if self.package_format is PackageFormat.ZIPFILE:
            zf = await asyncio.to_thread(zipfile.ZipFile, self.path)
            archive_file = await asyncio.to_thread(zf.open, path)
            try:
                async with AsyncArchiveFile(archive_file) as zip_archive_file:
                    yield zip_archive_file
            finally:
                await asyncio.to_thread(zf.close)
        else:
            raise AssertionError(self.package_format)
Metadata
View Raw File