diff options
| -rw-r--r-- | patchtree/context.py | 122 |
1 files changed, 99 insertions, 23 deletions
diff --git a/patchtree/context.py b/patchtree/context.py index 57200a5..2721f9b 100644 --- a/patchtree/context.py +++ b/patchtree/context.py @@ -1,22 +1,107 @@ from argparse import Namespace -from typing import Union, IO, cast -from pathlib import Path as DiskPath -from zipfile import is_zipfile, Path as ZipPath +from typing import IO, cast +from pathlib import Path +from zipfile import ZipInfo, is_zipfile from tempfile import TemporaryFile from os import path from sys import stdout, stderr from subprocess import run +from zipfile import ZipFile +from stat import S_IFDIR, S_IFREG +ZIP_CREATE_SYSTEM_UNX = 3 -Path = Union[DiskPath, ZipPath] + +class FS: + target: str + + def __init__(self, target: str): + self.target = target + + def get_dir(self, dir: str) -> list[str]: + raise NotImplementedError() + + def get_content(self, file: str) -> str | None: + raise NotImplementedError() + + def get_mode(self, file: str) -> int: + raise NotImplementedError() + + +class DiskFS(FS): + path: Path + + def __init__(self, target: str): + super(DiskFS, self).__init__(target) + self.path = Path(target) + + def get_dir(self, dir: str) -> list[str]: + here = self.path.joinpath(dir) + return [path.name for path in here.iterdir()] + + def get_content(self, file: str) -> str | None: + here = self.path.joinpath(file) + if not here.exists(): + return None + return here.read_bytes().decode() + + def get_mode(self, file: str) -> int: + here = self.path.joinpath(file) + if not here.exists(): + return 0 + return here.stat().st_mode + + +class ZipFS(FS): + zip: ZipFile + files: dict[Path, ZipInfo] = {} + + def __init__(self, target: str): + super(ZipFS, self).__init__(target) + self.zip = ZipFile(target) + for info in self.zip.infolist(): + self.files[Path(info.filename)] = info + + def get_info(self, path: str) -> ZipInfo | None: + return self.files.get(Path(path), None) + + def get_dir(self, dir: str) -> list[str]: + items: set[str] = set() + dir = path.normpath("/" + dir) + for zip_dir in self.zip.namelist(): + zip_dir = path.normpath("/" + zip_dir) + if not zip_dir.startswith(dir): + continue + if zip_dir == dir: + continue + relative = path.relpath(zip_dir, dir) + top_level = relative.split("/")[0] + items.add(top_level) + return list(items) + + def get_content(self, file: str) -> str | None: + info = self.get_info(file) + if info is None: + return None + return self.zip.read(info).decode() + + def get_mode(self, file: str) -> int: + info = self.get_info(file) + if info is None: + return 0 + if info.create_system == ZIP_CREATE_SYSTEM_UNX: + return (info.external_attr >> 16) & 0xFFFF + if info.is_dir(): + return 0o755 | S_IFDIR + return 0o644 | S_IFREG class Context: - fs: Path + fs: FS output: IO options: Namespace - cache: DiskPath + cache: Path def __init__(self, options: Namespace): self.options = options @@ -24,7 +109,7 @@ class Context: self.output = self.get_output() if self.options.in_place: - location = cast(DiskPath, self.fs) + location = cast(DiskFS, self.fs).path self.cache = location.joinpath(".patchtree.diff") if self.cache.exists(): run( @@ -40,7 +125,7 @@ class Context: if self.options.in_place: self.output.seek(0) patch = self.output.read() - location = cast(DiskPath, self.fs) + location = cast(DiskFS, self.fs).path if len(patch) > 0: self.cache.write_text(patch) run( @@ -51,36 +136,27 @@ class Context: self.output.close() def get_dir(self, dir: str) -> list[str]: - here = self.fs.joinpath(dir) - return [path.name for path in here.iterdir()] + return self.fs.get_dir(dir) def get_content(self, file: str) -> str | None: - here = self.fs.joinpath(file) - if not here.exists(): - return None - return here.read_bytes().decode() + return self.fs.get_content(file) def get_mode(self, file: str) -> int: - # TODO - return 0 - here = self.fs.joinpath(file) - if not here.exists(): - return 0 - return here.stat().st_mode + return self.fs.get_mode(file) - def get_fs(self) -> Path: + def get_fs(self) -> FS: target: str = self.options.target if not path.exists(target): raise Exception(f"cannot open `{target}'") if path.isdir(target): - return DiskPath(target) + return DiskFS(target) if is_zipfile(target): if self.options.in_place: raise Exception("cannot edit zip in-place!") - return ZipPath(target) + return ZipFS(target) raise Exception("cannot read `{target}'") |