aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoek Le Blansch <loek@pipeframe.xyz>2025-10-25 10:27:23 +0200
committerLoek Le Blansch <loek@pipeframe.xyz>2025-10-25 10:27:23 +0200
commitc12e268727a44dfcbe696e6116777da305fc1046 (patch)
tree11641d4b431d7b1253f03c1609f228eba226c287
parent5aac30996d6f4c99a752e6d1e83573e8962d4eea (diff)
better zip support
-rw-r--r--patchtree/context.py122
1 files changed, 99 insertions, 23 deletions
diff --git a/patchtree/context.py b/patchtree/context.py
index 57200a5..2721f9b 100644
--- a/patchtree/context.py
+++ b/patchtree/context.py
@@ -1,22 +1,107 @@
from argparse import Namespace
-from typing import Union, IO, cast
-from pathlib import Path as DiskPath
-from zipfile import is_zipfile, Path as ZipPath
+from typing import IO, cast
+from pathlib import Path
+from zipfile import ZipInfo, is_zipfile
from tempfile import TemporaryFile
from os import path
from sys import stdout, stderr
from subprocess import run
+from zipfile import ZipFile
+from stat import S_IFDIR, S_IFREG
+ZIP_CREATE_SYSTEM_UNX = 3
-Path = Union[DiskPath, ZipPath]
+
+class FS:
+ target: str
+
+ def __init__(self, target: str):
+ self.target = target
+
+ def get_dir(self, dir: str) -> list[str]:
+ raise NotImplementedError()
+
+ def get_content(self, file: str) -> str | None:
+ raise NotImplementedError()
+
+ def get_mode(self, file: str) -> int:
+ raise NotImplementedError()
+
+
+class DiskFS(FS):
+ path: Path
+
+ def __init__(self, target: str):
+ super(DiskFS, self).__init__(target)
+ self.path = Path(target)
+
+ def get_dir(self, dir: str) -> list[str]:
+ here = self.path.joinpath(dir)
+ return [path.name for path in here.iterdir()]
+
+ def get_content(self, file: str) -> str | None:
+ here = self.path.joinpath(file)
+ if not here.exists():
+ return None
+ return here.read_bytes().decode()
+
+ def get_mode(self, file: str) -> int:
+ here = self.path.joinpath(file)
+ if not here.exists():
+ return 0
+ return here.stat().st_mode
+
+
+class ZipFS(FS):
+ zip: ZipFile
+ files: dict[Path, ZipInfo] = {}
+
+ def __init__(self, target: str):
+ super(ZipFS, self).__init__(target)
+ self.zip = ZipFile(target)
+ for info in self.zip.infolist():
+ self.files[Path(info.filename)] = info
+
+ def get_info(self, path: str) -> ZipInfo | None:
+ return self.files.get(Path(path), None)
+
+ def get_dir(self, dir: str) -> list[str]:
+ items: set[str] = set()
+ dir = path.normpath("/" + dir)
+ for zip_dir in self.zip.namelist():
+ zip_dir = path.normpath("/" + zip_dir)
+ if not zip_dir.startswith(dir):
+ continue
+ if zip_dir == dir:
+ continue
+ relative = path.relpath(zip_dir, dir)
+ top_level = relative.split("/")[0]
+ items.add(top_level)
+ return list(items)
+
+ def get_content(self, file: str) -> str | None:
+ info = self.get_info(file)
+ if info is None:
+ return None
+ return self.zip.read(info).decode()
+
+ def get_mode(self, file: str) -> int:
+ info = self.get_info(file)
+ if info is None:
+ return 0
+ if info.create_system == ZIP_CREATE_SYSTEM_UNX:
+ return (info.external_attr >> 16) & 0xFFFF
+ if info.is_dir():
+ return 0o755 | S_IFDIR
+ return 0o644 | S_IFREG
class Context:
- fs: Path
+ fs: FS
output: IO
options: Namespace
- cache: DiskPath
+ cache: Path
def __init__(self, options: Namespace):
self.options = options
@@ -24,7 +109,7 @@ class Context:
self.output = self.get_output()
if self.options.in_place:
- location = cast(DiskPath, self.fs)
+ location = cast(DiskFS, self.fs).path
self.cache = location.joinpath(".patchtree.diff")
if self.cache.exists():
run(
@@ -40,7 +125,7 @@ class Context:
if self.options.in_place:
self.output.seek(0)
patch = self.output.read()
- location = cast(DiskPath, self.fs)
+ location = cast(DiskFS, self.fs).path
if len(patch) > 0:
self.cache.write_text(patch)
run(
@@ -51,36 +136,27 @@ class Context:
self.output.close()
def get_dir(self, dir: str) -> list[str]:
- here = self.fs.joinpath(dir)
- return [path.name for path in here.iterdir()]
+ return self.fs.get_dir(dir)
def get_content(self, file: str) -> str | None:
- here = self.fs.joinpath(file)
- if not here.exists():
- return None
- return here.read_bytes().decode()
+ return self.fs.get_content(file)
def get_mode(self, file: str) -> int:
- # TODO
- return 0
- here = self.fs.joinpath(file)
- if not here.exists():
- return 0
- return here.stat().st_mode
+ return self.fs.get_mode(file)
- def get_fs(self) -> Path:
+ def get_fs(self) -> FS:
target: str = self.options.target
if not path.exists(target):
raise Exception(f"cannot open `{target}'")
if path.isdir(target):
- return DiskPath(target)
+ return DiskFS(target)
if is_zipfile(target):
if self.options.in_place:
raise Exception("cannot edit zip in-place!")
- return ZipPath(target)
+ return ZipFS(target)
raise Exception("cannot read `{target}'")