diff options
| author | Loek Le Blansch <loek.le-blansch.pv@renesas.com> | 2025-11-20 08:16:02 +0100 |
|---|---|---|
| committer | Loek Le Blansch <loek.le-blansch.pv@renesas.com> | 2025-11-20 08:16:02 +0100 |
| commit | be2c078c4fca5446127b924dd1dbc97c7b1636f4 (patch) | |
| tree | 4579237ded65a888702839bc936243611dcb765e | |
| parent | bbe15d0a21e04c59527ab5453d8bb49c952e4a30 (diff) | |
very broken WIP
| -rw-r--r-- | doc/dev/processor.rst | 14 | ||||
| -rw-r--r-- | patchtree/__init__.py | 2 | ||||
| -rw-r--r-- | patchtree/cli.py | 40 | ||||
| -rw-r--r-- | patchtree/config.py | 67 | ||||
| -rw-r--r-- | patchtree/context.py | 315 | ||||
| -rw-r--r-- | patchtree/diff.py | 32 | ||||
| -rw-r--r-- | patchtree/patch.py | 75 | ||||
| -rw-r--r-- | patchtree/patchspec.py | 34 | ||||
| -rw-r--r-- | patchtree/process.py | 274 | ||||
| -rw-r--r-- | patchtree/target.py | 80 | ||||
| -rw-r--r-- | todo.txt | 16 |
11 files changed, 642 insertions, 307 deletions
diff --git a/doc/dev/processor.rst b/doc/dev/processor.rst index 018d216..68de828 100644 --- a/doc/dev/processor.rst +++ b/doc/dev/processor.rst @@ -18,7 +18,7 @@ Identity The identity processor is used to "touch" files, change the mode of existing files, or add arbitrary identifiers to patchset source filenames by passing arbitrary arguments. -:Class: :any:`ProcessIdentity` +:Class: :any:`IdentityProcess` :Identifier: ``id`` :Input: Ignored. :Output: @@ -37,7 +37,7 @@ The Coccinelle processor uses Coccinelle to apply patch(es) in the SmPL (Semanti In order to use this processor, Coccinelle must be installed and ``spatch`` must be available in ``$PATH``. -:Class: :any:`ProcessCoccinelle` +:Class: :any:`CoccinelleProcess` :Identifier: ``cocci`` :Input: Coccinelle's SmPL input. :Output: The contents of the target file after being processed by Coccinelle (not the diff returned by Coccinelle). @@ -51,7 +51,7 @@ Jinja template The Jinja processor passes the input through the Jinja2 templating engine. -:Class: :any:`ProcessJinja2` +:Class: :any:`Jinja2Process` :Identifier: ``jinja`` :Input: Jinja template code. :Output: The input after being processed by Jinja. @@ -59,8 +59,8 @@ The Jinja processor passes the input through the Jinja2 templating engine. .. note:: - Template variables are generated through the :any:`get_template_vars <ProcessJinja2.get_template_vars>` method. - This method returns an empty dict by default, and is meant to be implemented by implementing a custom class that derives from ProcessJinja2 and registering it through the :ref:`configuration file <ptconfig>`. + Template variables are generated through the :any:`get_template_vars <Jinja2Process.get_template_vars>` method. + This method returns an empty dict by default, and is meant to be implemented by implementing a custom class that derives from Jinja2Process and registering it through the :ref:`configuration file <ptconfig>`. .. _process_exe: @@ -70,7 +70,7 @@ Executable The executable processor runs the input as an executable, passes the target file to its standard input, and returns its standard output. -:Class: :any:`ProcessExec` +:Class: :any:`ExecProcess` :Identifier: ``exec`` :Input: Executable script. @@ -90,7 +90,7 @@ Merge The merge processor merges the input with the target file, such that changes are combined with the target instead of replacing the target. -:Class: :any:`ProcessMerge` +:Class: :any:`MergeProcess` :Identifier: ``merge`` :Input: Content to merge. :Output: Merged changes. diff --git a/patchtree/__init__.py b/patchtree/__init__.py index 2e304a6..3950ed5 100644 --- a/patchtree/__init__.py +++ b/patchtree/__init__.py @@ -1,4 +1,4 @@ from .config import Config, Header from .diff import Diff, File from .context import Context -from .process import ProcessCoccinelle, ProcessJinja2, ProcessIdentity, ProcessExec, ProcessMerge +from .process import CoccinelleProcess, Jinja2Process, TouchProcess, ExecProcess, MergeProcess diff --git a/patchtree/cli.py b/patchtree/cli.py index 1c38d30..6e24bb4 100644 --- a/patchtree/cli.py +++ b/patchtree/cli.py @@ -1,7 +1,10 @@ from dataclasses import fields from sys import stderr from pathlib import Path -from argparse import ArgumentTypeError +from argparse import ArgumentTypeError, Namespace +from tempfile import TemporaryFile +from logging import basicConfig as log_config, addLevelName as log_add_level_name +from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL from .context import Context from .config import Config @@ -62,8 +65,8 @@ def parse_arguments(config: Config) -> Context: type=int, ) parser.add_argument( - "-s", - "--shebang", + "-S", + "--no-shebang", help="output shebang in resulting patch", action="store_true", ) @@ -100,8 +103,8 @@ def parse_arguments(config: Config) -> Context: if options.context is not None: config.diff_context = options.context - if options.shebang: - config.output_shebang = True + if options.no_shebang: + config.no_shebang = True if len(options.patch) == 0: options.patch = config.default_patch_sources @@ -109,25 +112,28 @@ def parse_arguments(config: Config) -> Context: try: return config.context(config, options) except Exception as e: - parser.error(str(e)) + raise e + # parser.error(str(e)) def main(): + log_add_level_name(DEBUG, "DBG") + log_add_level_name(INFO, "INF") + log_add_level_name(WARNING, "WRN") + log_add_level_name(ERROR, "ERR") + log_add_level_name(CRITICAL, "CRT") + log_config( + stream=stderr, + level=INFO, + format="[%(levelname)s] %(message)s", + ) + config = load_config() context = parse_arguments(config) - if len(context.inputs) == 0: - print("no files to patch!", file=stderr) - return 0 - - config.header(config, context) - - for file in context.inputs: - patch = config.patch(config, file) - patch.write(context) - - context.close() + patch = context.make_patch() + print(patch, end="") return 0 diff --git a/patchtree/config.py b/patchtree/config.py index a4d0912..78f1fad 100644 --- a/patchtree/config.py +++ b/patchtree/config.py @@ -6,16 +6,16 @@ from importlib import metadata from pathlib import Path from .context import Context -from .patch import Patch +from .target import Target from .process import * from .diff import * DEFAULT_PROCESSORS: dict[str, type[Process]] = { - "id": ProcessIdentity, - "cocci": ProcessCoccinelle, - "jinja": ProcessJinja2, - "exec": ProcessExec, - "merge": ProcessMerge, + "touch": TouchProcess, + "cocci": CoccinelleProcess, + "jinja": Jinja2Process, + "exec": ExecProcess, + "merge": MergeProcess, } @@ -25,7 +25,7 @@ class Header: The header is formatted as - * shebang (optional) + * shebang * patchtree version info * extra version info (empty by default) * license (empty by default) @@ -44,32 +44,37 @@ class Header: self.config = config self.context = context - self.write_shebang() - self.write_version() - self.write_version_extra() - self.write_license() - - def write_shebang(self): + def write(self) -> str: + return "".join( + ( + self.write_shebang(), + self.write_version(), + self.write_version_extra(), + self.write_license(), + ) + ) + + def write_shebang(self) -> str: """ - Write a shebang line to apply the output patch if the --shebang option was passed. + Write a shebang line to apply the output patch unless the --no-shebang option was passed. """ - if not self.config.output_shebang: - return + if self.config.no_shebang: + return "" cmd = ["/usr/bin/env", "-S", *self.context.get_apply_cmd()] cmdline = " ".join(cmd) - self.context.output.write(f"#!{cmdline}\n") + return f"#!{cmdline}\n" - def write_version(self): + def write_version(self) -> str: """ Write the patchtree name and version number. """ version = metadata.version("patchtree") - self.context.output.write(f"{self.name} output (version {version})\n") + return f"{self.name} output (version {version})\n" - def write_version_extra(self): + def write_version_extra(self) -> str: """ Write extra version information (empty). @@ -77,16 +82,16 @@ class Header: patchsets. """ - pass + return "" - def write_license(self): + def write_license(self) -> str: """ Write a license if it is defined. """ if self.license is None: - return - self.context.output.write(f"{self.license}\n") + return "" + return f"{self.license}\n" @dataclass @@ -100,8 +105,8 @@ class Config: context: type[Context] = Context """Context class type. Override this to add custom context variables.""" - patch: type[Patch] = Patch - """Patch class type.""" + target: type[Target] = Target + """Target class type.""" argument_parser: type[ArgumentParser] = ArgumentParser """ArgumentParser class type. Override this to add custom arguments.""" @@ -129,8 +134,8 @@ class Config: diff_context: int = 3 """Lines of context to include in the diffs.""" - output_shebang: bool = False - """Whether to output a shebang line with the ``git patch`` command to apply the patch.""" + no_shebang: bool = False + """Whether to suppress the shebang line with the ``git patch`` command to apply the patch.""" default_patch_sources: list[Path] = field(default_factory=list) """List of default sources.""" @@ -138,5 +143,11 @@ class Config: default_root: str | None = None """Default value of the ``-C``/``--root`` argument.""" + patchspec_extensions: tuple[str, ...] = ( + ".yaml", + ".yml", + ) + """File extensions considered for matching standalone patch specifications.""" + def __post_init__(self): self.processors = {**DEFAULT_PROCESSORS, **self.processors} diff --git a/patchtree/context.py b/patchtree/context.py index c2cf33b..7a66b1a 100644 --- a/patchtree/context.py +++ b/patchtree/context.py @@ -1,21 +1,44 @@ from __future__ import annotations -from typing import TYPE_CHECKING, IO, cast +from dataclasses import dataclass +from typing import TYPE_CHECKING, IO, Any, TextIO, cast +from frontmatter.default_handlers import YAMLHandler +import yaml +import frontmatter + +from sys import stderr from argparse import Namespace from pathlib import Path from zipfile import ZipInfo, is_zipfile -from tempfile import TemporaryFile from os import path -from sys import stdout, stderr +from sys import stdout from subprocess import run from zipfile import ZipFile from stat import S_IFDIR, S_IFREG +from logging import Logger, getLogger as get_logger + +from yaml.nodes import MappingNode, ScalarNode + +from patchtree.diff import File + +from .patchspec import ( + FileInputSpec, + LiteralInputSpec, + PatchsetFileInputSpec, + ProcessInputSpec, + TargetFileInputSpec, +) +from .target import Target if TYPE_CHECKING: - from .config import Config + from .config import Config, Header ZIP_CREATE_SYSTEM_UNX = 3 +MODE_NONEXISTANT = 0 +MODE_FILE = 0o644 | S_IFREG +MODE_DIR = 0o755 | S_IFDIR + class FS: """Target filesystem interface.""" @@ -34,6 +57,12 @@ class FS: raise NotImplementedError() + def get_file(self, file_spec: TargetFileInputSpec) -> File: + return File( + content=self.get_content(str(file_spec.path)), + mode=self.get_mode(str(file_spec.path)), + ) + def get_content(self, file: str) -> bytes | str | None: """ Get the content of a file relative to the target. @@ -93,12 +122,23 @@ class ZipFS(FS): files: dict[Path, ZipInfo] = {} """Map of path -> ZipInfo for all files in the archive.""" + dirs: set[Path] = set() + """ + List of directories in this archive. + + Some zip files may not include entries for all directories if they already define entries + for files or subdirectories within. This set keeps all known directories. + """ + def __init__(self, target): super(ZipFS, self).__init__(target) self.zip = ZipFile(str(target)) for info in self.zip.infolist(): - self.files[Path(info.filename)] = info - # todo: index implicit directories in tree + path = Path(info.filename) + self.files[path] = info + while path.parent != path: + self.dirs.add(path.parent) + path = path.parent def get_info(self, path: str) -> ZipInfo | None: """ @@ -135,31 +175,11 @@ class ZipFS(FS): except: return bytes - def is_implicit_dir(self, file: str) -> bool: - """ - Check if there is an implicit directory at ``file``. - - Some zip files may not include entries for all directories if they already define entries for files or - subdirectories within. This function checks if any path that is a subdirectory of ``file`` exists. - - :returns: ``True`` if there is a directory at ``file``, else ``False``. - """ - - parent = Path(file) - for child in self.files: - if parent in child.parents: - return True - return False - def get_mode(self, file): - MODE_NONEXISTANT = 0 - MODE_FILE = 0o644 | S_IFREG - MODE_DIR = 0o755 | S_IFDIR - info = self.get_info(file) if info is None: - # if self.is_implicit_dir(file): - # return MODE_DIR + if file in self.dirs: + return MODE_DIR return MODE_NONEXISTANT if info.create_system == ZIP_CREATE_SYSTEM_UNX: @@ -171,6 +191,74 @@ class ZipFS(FS): return MODE_FILE +class PatchspecLoader(yaml.Loader): + @dataclass + class Marshall: + """ + This is a stupid solution to a stupid problem. PyYAML's load() method for some reason + doesn't accept a loader class instance but a class type. It also doesn't allow you to + pass user data through kwargs, so all of the context is marshalled through the input + stream parameter and unpacked by PatchspecLoader's __init__ method. + """ + + patch: File + input: Path + context: Context + + input: Path + context: Context + + @staticmethod + def tag_file_target(loader: PatchspecLoader, node: ScalarNode): + target_root = loader.input.parent.relative_to(loader.context.root) + return TargetFileInputSpec( + path=target_root.joinpath(Path(loader.construct_scalar(node))), + ) + + @staticmethod + def tag_file_input(loader: PatchspecLoader, node: ScalarNode): + input_root = loader.input.parent + return PatchsetFileInputSpec( + path=input_root.joinpath(Path(loader.construct_scalar(node))), + ) + + @staticmethod + def tag_patchspec(loader: PatchspecLoader, node: MappingNode): + data = loader.construct_mapping(node, deep=True) + return Target.from_spec( + loader.context, loader.input.relative_to(loader.context.root), data + ) + + def __init__(self, marshall: Marshall): + super(PatchspecLoader, self).__init__(marshall.patch.get_str()) + self.context = marshall.context + self.input = marshall.input + + self.add_constructor("!patchspec", self.tag_patchspec) + self.add_constructor("!target", self.tag_file_target) + self.add_constructor("!input", self.tag_file_input) + + @staticmethod + def load(marshall: PatchspecLoader.Marshall) -> Any: + return yaml.load(cast(Any, marshall), Loader=PatchspecLoader) + + +class PatchtreeYAMLHandler(YAMLHandler): + marshall: PatchspecLoader.Marshall + + output: Target | None = None + + def __init__(self, marshall: PatchspecLoader.Marshall): + super(PatchtreeYAMLHandler, self).__init__() + + self.marshall = marshall + + def load(self, fm, **kwargs): + # bypass fm.metadata because it is converted into a dict + self.output = PatchspecLoader.load(self.marshall) + return {} + + class Context: """Global app context / state holder.""" @@ -186,45 +274,36 @@ class Context: The ``root`` member only changes the appearance of paths. All internal logic uses the "real" paths. """ - target: Path - """Path to target.""" + header: Header + + content: list[Target] = [] + """Patch targets (content)""" fs: FS """Target file system interface.""" - output: IO - """Output stream for writing the clean patch.""" - in_place: bool """Whether to apply the changes directly to the target instead of outputting the .patch file.""" config: Config + log: Logger + """Global log instance reference""" + + is_empty: bool = False + def __init__(self, config: Config, options: Namespace): self.config = config + self.log = get_logger(self.__class__.__name__) self.root = options.root - self.target = options.target self.in_place = options.in_place - self.inputs = self.collect_inputs(options) - - self.fs = self.get_fs() - self.output = self.get_output(options) - - if self.in_place: - self.apply(True) - - def close(self): - """Finish writing the clean patch file and close it.""" - - # patch must have a trailing newline - self.output.write("\n") - self.output.flush() - if self.in_place: - self.apply(False) + self.inputs = self.collect_inputs(options) + self.content = self.collect_targets(self.inputs) - self.output.close() + self.fs = self._get_fs(options.target) + self.header = config.header(config, self) def collect_inputs(self, options: Namespace) -> list[Path]: """ @@ -253,26 +332,105 @@ class Context: inputs.add(path) return list(inputs) - def get_dir(self, dir: str) -> list[str]: - """Get a target directory's content (see :any:`FS.get_dir()`)""" - return self.fs.get_dir(dir) - - def get_content(self, file: str) -> bytes | str | None: - """Get a target file's content (see :any:`FS.get_content()`)""" - return self.fs.get_content(file) - - def get_mode(self, file: str) -> int: - """Get a target file's mode (see :any:`FS.get_mode()`)""" - return self.fs.get_mode(file) + def create_target(self, input: Path, meta_inputs: list[Path] = []) -> Target: + file = input.relative_to(self.root) + target: Target | None = None + patch = File( + content=input.read_bytes(), + mode=input.stat().st_mode, + ) - def get_fs(self) -> FS: + try: + patch.get_str() + except: + # binary files can't be patchspecs + target = Target(self, file) + + marshall = PatchspecLoader.Marshall( + patch=patch, + input=input, + context=self, + ) + + # if the input is a yaml file, try to load it + if target is None and input.suffix in self.config.patchspec_extensions: + try: + fake_marshall = marshall + fake_marshall.input = input.parent.joinpath(input.stem) + target = PatchspecLoader.load(fake_marshall) + except Exception as e: + self.log.error(f"while parsing patchspec for {input}") + raise e + if not isinstance(target, Target): + target = None + + # if the input contains frontmatter, try to load it + if target is None: + handler = PatchtreeYAMLHandler(marshall) + try: + post = frontmatter.loads(patch.get_str(), handler=handler) + if not isinstance(handler.output, Target): + raise Exception() + target = handler.output + # strip frontmatter from input content + patch.content = post.content + except: + target = None + + if target is None: + target = Target(self, file) + + target.file = target.file or str() + assert target.file is not None + + for input in ( + i.path for i in target.get_inputs() if isinstance(i, PatchsetFileInputSpec) + ): + meta_inputs.append(input) + + return target + + def collect_targets(self, inputs: list[Path]) -> list[Target]: + meta_inputs: set[Path] = set() + targets: dict[Path, Target] = {} + + for input in inputs: + meta = [] + targets[input] = self.create_target(input, meta) + meta_inputs.update(meta) + + missing = meta_inputs - set(inputs) + if len(missing) > 0: + for input in missing: + self.log.error(f"{str(input)} referenced by patchspec but not in inputs") + raise Exception("missing files") + + # files referenced as meta inputs shouldn't be treated as verbatim files + for key in meta_inputs: + if key not in targets: + continue + del targets[key] + + return sorted(targets.values(), key=lambda target: target.file) + + def get_file(self, spec: ProcessInputSpec) -> File: + if isinstance(spec, TargetFileInputSpec): + return self.fs.get_file(spec) + if isinstance(spec, LiteralInputSpec): + return File(content=spec.content, mode=MODE_FILE) + if isinstance(spec, PatchsetFileInputSpec): + return File( + content=spec.path.read_bytes(), + mode=spec.path.stat().st_mode, + ) + raise Exception(f"unable to read file: {spec}") + + def _get_fs(self, target: Path) -> FS: """ Open the selected target, taking into account the --in-place option. :returns: Target filesystem interface. """ - target = self.target - if not target.exists(): raise Exception(f"cannot open `{target}'") @@ -292,10 +450,10 @@ class Context: :returns: Output stream. """ - if self.in_place: + if options.in_place: if options.out is not None: - print("warning: --out is ignored when using --in-place", file=stderr) - return TemporaryFile("w+") + self.log.warning("--out is ignored when using --in-place") + return TextIO() if options.out is not None: if options.out == "-": @@ -312,11 +470,27 @@ class Context: :returns: Command argument vector. """ - cmd = ["git", "apply", "--allow-empty"] + cmd = ["git", "apply"] + if self.is_empty: + cmd.append("--allow-empty") if self.config.diff_context == 0: cmd.append("--unidiff-zero") return cmd + def make_patch(self) -> str: + patch = "" + for target in self.content: + self.log.info(f"writing patch for `{target.file}'") + patch += target.write() + + self.is_empty = len(patch) == 0 + + patch = self.header.write() + patch + + # patch must have a trailing newline + patch += "\n" + return patch + def apply(self, reverse: bool) -> None: """ Apply the patch in ``self.output`` and update the cache or reverse the patch in the cache. @@ -331,8 +505,7 @@ class Context: return cmd.append("--reverse") else: - self.output.seek(0) - patch = self.output.read() + patch = self.make_patch() cache.write_text(patch) cmd.append(str(cache.absolute())) diff --git a/patchtree/diff.py b/patchtree/diff.py index 4d4a998..f17fcfb 100644 --- a/patchtree/diff.py +++ b/patchtree/diff.py @@ -1,4 +1,5 @@ from __future__ import annotations +from sys import stderr from typing import TYPE_CHECKING from dataclasses import dataclass @@ -10,10 +11,10 @@ if TYPE_CHECKING: @dataclass class File: - content: str | bytes | None + content: str | bytes | None = None """The file's contents, or ``None`` if it does not exist.""" - mode: int + mode: int = 0 """The file's mode as returned by stat(3)'s ``stat.st_mode``.""" def is_binary(self) -> bool: @@ -22,6 +23,16 @@ class File: """ return isinstance(self.content, bytes) + def get_str(self) -> str: + if self.content is None: + return "" + if isinstance(self.content, bytes): + try: + self.content = self.content.decode() + except Exception: + raise Exception("expected text file instead of binary") + return self.content + def lines(self) -> list[str]: """ Get a list of lines in this file. @@ -34,8 +45,10 @@ class File: This function only works for text files. Use :any:`is_binary` to check this safely. """ - assert not isinstance(self.content, bytes) - return (self.content or "").splitlines() + return self.get_str().splitlines() + + def __repr__(self): + return f"{self.__class__.__name__}(mode={self.mode:06o}, content={repr(self.content)})" class Diff: @@ -86,12 +99,17 @@ class Diff: delta += f"new mode {b.mode:06o}\n" if a.content != b.content: - # make sure a file doesn't switch from text to binary or vice versa - assert a.is_binary() == b.is_binary() + binary = False + lines_a = [] + lines_b = [] - if not b.is_binary(): + try: lines_a = a.lines() lines_b = b.lines() + except Exception: + binary = True + + if not binary: diff = unified_diff( lines_a, lines_b, fromfile, tofile, lineterm="", n=self.config.diff_context ) diff --git a/patchtree/patch.py b/patchtree/patch.py deleted file mode 100644 index 85d056d..0000000 --- a/patchtree/patch.py +++ /dev/null @@ -1,75 +0,0 @@ -from __future__ import annotations -from typing import TYPE_CHECKING - -from pathlib import Path - -from .diff import Diff, File -from .process import Process - -if TYPE_CHECKING: - from .context import Context - from .config import Config - - -class Patch: - """A single patched file.""" - - config: Config - - patch: Path - """The patchset input location.""" - - file: str - """The name of the patched file in the target.""" - - processors: list[tuple[type[Process], Process.Args]] = [] - """A list of processors to apply to the input before diffing.""" - - def __init__(self, config: Config, patch: Path): - self.patch = patch - self.config = config - - self.processors.clear() - self.file, *proc_strs = str(patch).split(config.process_delimiter) - for proc_str in proc_strs: - proc_name, *argv = proc_str.split(",") - args = Process.Args(name=proc_name, argv=argv) - proc_cls = config.processors.get(proc_name, None) - if proc_cls is None: - raise Exception(f"unknown processor: `{proc_cls}'") - for arg in argv: - key, value, *_ = (*arg.split("=", 1), None) - args.argd[key] = value - self.processors.insert(0, (proc_cls, args)) - - def write(self, context: Context) -> None: - """ - Apply all processors, compare to the target and write the delta to :any:`Context.output`. - """ - - if context.root is not None: - self.file = str(Path(self.file).relative_to(context.root)) - - diff = Diff(self.config, self.file) - - diff.a = File( - content=context.get_content(self.file), - mode=context.get_mode(self.file), - ) - - diff.b = File( - content=None, - mode=self.patch.stat().st_mode, - ) - b_content = self.patch.read_bytes() - try: - diff.b.content = b_content.decode() - except: - diff.b.content = b_content - - for cls, args in self.processors: - processor = cls(context, args) - diff.b = processor.transform(diff.a, diff.b) - - delta = diff.compare() - context.output.write(delta) diff --git a/patchtree/patchspec.py b/patchtree/patchspec.py new file mode 100644 index 0000000..64f20a1 --- /dev/null +++ b/patchtree/patchspec.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class ProcessInputSpec: + pass + + +@dataclass +class FileInputSpec(ProcessInputSpec): + path: Path + + +@dataclass +class TargetFileInputSpec(FileInputSpec): + pass + + +@dataclass +class PatchsetFileInputSpec(FileInputSpec): + pass + + +@dataclass +class DefaultInputSpec(ProcessInputSpec): + pass + + +@dataclass +class LiteralInputSpec(ProcessInputSpec): + content: str diff --git a/patchtree/process.py b/patchtree/process.py index bd6b802..a7871a8 100644 --- a/patchtree/process.py +++ b/patchtree/process.py @@ -1,61 +1,99 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any, Callable, Hashable, cast from tempfile import mkstemp from jinja2 import Environment from subprocess import Popen, run from pathlib import Path -from os import fdopen, chmod, unlink -from dataclasses import dataclass, field - +from enum import Enum +from shlex import split +from dataclasses import dataclass + +from .patchspec import ( + DefaultInputSpec, + FileInputSpec, + LiteralInputSpec, + ProcessInputSpec, + TargetFileInputSpec, +) from .diff import File if TYPE_CHECKING: from .context import Context + from .target import Target class Process: """ - Processor base interface. + Process base interface. """ - context: Context + target: Target """Patch file context.""" - @dataclass - class Args: - """ - Processor filename arguments. - - See :ref:`processors`. - """ - - name: str - """The name the processor was called with.""" - argv: list[str] = field(default_factory=list) - """The arguments passed to the processor""" - argd: dict[str, str | None] = field(default_factory=dict) - """The key/value arguments passed to the processor""" - - args: Args - """Arguments passed to this processor.""" + input_spec: ProcessInputSpec + target_spec: ProcessInputSpec - def __init__(self, context: Context, args: Args): - self.args = args - self.context = context + def __init__(self, target: Target): + self.target = target - def transform(self, a: File, b: File) -> File: + def transform(self, input: File) -> File: """ - Transform the input file. + Perform the transformation of this processor. - :param a: Content of file to patch. - :param b: Content of patch input in patch tree or output of previous processor. :returns: Processed file. """ raise NotImplementedError() + @staticmethod + def from_spec(target: Target, data: dict[Hashable, Any]) -> Process: + context = target.context + context.log.warning(repr(data)) + if "id" not in data: + raise Exception("missing key 'id'") + id = str(data["id"]) + del data["id"] + + config = context.config + if id not in config.processors: + raise Exception(f"no processor for id {id}") + + process_cls = config.processors[id] + if process_cls.from_spec == Process.from_spec: + raise NotImplementedError( + f"{process_cls.__name__} does not implement the `from_spec` method" + ) + process = process_cls.from_spec(target, data) + + if "input" in data: + if isinstance(data["input"], ProcessInputSpec): + process.input_spec = data["input"] + elif isinstance(data["input"], str): + process.input_spec = LiteralInputSpec(content=data["input"]) + else: + raise Exception(f"type error for key input {type(data['input'])}") + del data["input"] + + if "target" in data: + if isinstance(data["target"], ProcessInputSpec): + process.target_spec = data["target"] + elif isinstance(data["target"], str): + process.target_spec = LiteralInputSpec(content=data["target"]) + else: + raise Exception(f"type error for key target {type(data['target'])}") + del data["target"] + + assert target.file is not None + + process.input_spec = getattr(process, "input_spec", DefaultInputSpec()) + process.target_spec = getattr( + process, "target_spec", TargetFileInputSpec(path=Path(target.file)) + ) + + return process -class ProcessJinja2(Process): + +class Jinja2Process(Process): """ Jinja2 preprocessor. """ @@ -66,116 +104,146 @@ class ProcessJinja2(Process): ) def __init__(self, *args, **kwargs): - super(ProcessJinja2, self).__init__(*args, **kwargs) - - if len(self.args.argv) > 0: - raise Exception("too many arguments") + super(Jinja2Process, self).__init__(*args, **kwargs) - def transform(self, a, b): + def transform(self, input): template_vars = self.get_template_vars() - assert b.content is not None - assert not isinstance(b.content, bytes) - b.content = self.environment.from_string(b.content).render(**template_vars) - return b + input.content = self.environment.from_string(input.get_str()).render(**template_vars) + return input def get_template_vars(self) -> dict[str, Any]: """ Generate template variables. This method returns an empty dict by default, and is meant to be implemented by subclassing the - ProcessJinja2 class. + Jinja2Process class. :returns: A dict of variables defined in the template. """ return {} + @staticmethod + def from_spec(target, data): + return Jinja2Process(target) + -class ProcessCoccinelle(Process): +class CoccinelleProcess(Process): """ Coccinelle transformer. """ def __init__(self, *args, **kwargs): - super(ProcessCoccinelle, self).__init__(*args, **kwargs) + super(CoccinelleProcess, self).__init__(*args, **kwargs) - if len(self.args.argv) > 0: - raise Exception("too many arguments") + def transform(self, input): + context = self.target.context + content_input = context.get_file(self.target_spec).get_str() + content_patch = input.get_str() - def transform(self, a, b): - assert not isinstance(a.content, bytes) - assert not isinstance(b.content, bytes) - content_a = a.content or "" - content_b = b.content or "" + # empty patch -> return input as-is (coccinelle gives errors in this case) + if len(content_patch.strip()) == 0: + return input - if len(content_b.strip()) == 0: - return a + temp_input = Path(mkstemp()[1]) + temp_output = Path(mkstemp()[1]) + temp_patch = Path(mkstemp()[1]) - temp_a = Path(mkstemp()[1]) - temp_b = Path(mkstemp()[1]) - temp_sp = Path(mkstemp()[1]) - - temp_a.write_text(content_a) - temp_sp.write_text(content_b) + temp_input.write_text(content_input) + temp_patch.write_text(content_patch) cmd = ( "spatch", "--very-quiet", "--no-show-diff", "--sp-file", - str(temp_sp), - str(temp_a), + str(temp_patch), + str(temp_input), "-o", - str(temp_b), + str(temp_output), ) coccinelle = Popen(cmd) coccinelle.wait() - b.content = temp_b.read_text() + input.content = temp_output.read_text() - temp_a.unlink() - temp_b.unlink() - temp_sp.unlink() + temp_input.unlink() + temp_output.unlink() + temp_patch.unlink() - return b + return input + + @staticmethod + def from_spec(target, data): + return CoccinelleProcess(target) -class ProcessIdentity(Process): +class TouchProcess(Process): """ - Identity transformer. + Touch transformer. """ - def transform(self, a, b): - return File(content=a.content, mode=b.mode) + mode: int | None = None + def transform(self, input): + input.content = input.content or "" + input.mode = self.mode or input.mode + return input -class ProcessExec(Process): + @staticmethod + def from_spec(target, data): + process = TouchProcess(target) + + if "mode" in data: + if not isinstance(data["mode"], int): + raise TypeError("invalid type of key 'mode'") + process.mode = data["mode"] + del data["mode"] + + return process + + +class ExecProcess(Process): """ Executable transformer. """ + cmd: list[str] = [] + def __init__(self, *args, **kwargs): - super(ProcessExec, self).__init__(*args, **kwargs) + super(ExecProcess, self).__init__(*args, **kwargs) - if len(self.args.argv) > 0: - raise Exception("too many arguments") + def transform(self, input): + assert len(self.cmd) > 0 - def transform(self, a, b): - assert b.content is not None - assert not isinstance(b.content, bytes) + proc = run(self.cmd, text=True, input=input.get_str(), capture_output=True, check=True) + input.content = proc.stdout - fd, exec = mkstemp() - with fdopen(fd, "wt") as f: - f.write(b.content) - chmod(exec, 0o700) + return input - proc = run((str(exec),), text=True, input=a.content, capture_output=True, check=True) - b.content = proc.stdout + @staticmethod + def from_spec(target, data): + process = ExecProcess(target) - unlink(exec) + if "cmd" not in data: + raise Exception("missing property `cmd'") + if not isinstance(data["cmd"], str): + raise TypeError("invalid type of key `cmd'") + process.cmd = split(data["cmd"]) + del data["cmd"] + + return process - return b +# class MergeProcessorSpec(ProcessSpec): -class ProcessMerge(Process): +# def _copy(self, spec: ProcessSpec): +# super(MergeProcessorSpec, self)._copy(spec) + + +# def __repr__(self): +# return f"{self.__class__.__name__}({self.input}, strategy={self.strategy.name})" + + +class MergeProcess(Process): """ Merge transformer. """ @@ -190,27 +258,31 @@ class ProcessMerge(Process): return b - strategies: dict[str, Callable[[ProcessMerge, File, File], File]] = { + strategies: dict[str, Callable[[MergeProcess, File, File], File]] = { "ignore": merge_ignore, } - strategy: Callable[[ProcessMerge, File, File], File] + strategy: Callable[[MergeProcess, File, File], File] | None = None def __init__(self, *args, **kwargs): - super(ProcessMerge, self).__init__(*args, **kwargs) + super(MergeProcess, self).__init__(*args, **kwargs) - argv = self.args.argv - if len(argv) < 1: - raise Exception("not enough arguments") - - if len(argv) > 1: - raise Exception("too many arguments") + def transform(self, input): + context = self.target.context + a = input + b = context.get_file(self.target_spec) + assert self.strategy is not None + return self.strategy(self, a, b) - strategy = argv[0] - if strategy not in self.strategies: - raise Exception(f"unknown merge strategy: `{strategy}'") + @staticmethod + def from_spec(target, data): + process = MergeProcess(target) - self.strategy = self.strategies[strategy] + if "strategy" not in data: + raise Exception("missing property `strategy'") + if data["strategy"] not in process.strategies: + raise Exception(f"unknown strategy {repr(data['strategy'])}") + process.strategy = process.strategies[data["strategy"]] + del data["strategy"] - def transform(self, a, b): - return self.strategy(self, a, b) + return process diff --git a/patchtree/target.py b/patchtree/target.py new file mode 100644 index 0000000..859ed0f --- /dev/null +++ b/patchtree/target.py @@ -0,0 +1,80 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Any, Hashable + +from pathlib import Path + +from .diff import Diff, File +from .patchspec import ( + DefaultInputSpec, + FileInputSpec, + LiteralInputSpec, + ProcessInputSpec, + TargetFileInputSpec, +) +from .process import Process + +if TYPE_CHECKING: + from .context import Context + from .config import Config + + +class Target: + """A single patched file.""" + + context: Context + + file: str + """The name of the patched file in the target.""" + + patch = File() + + processors: list[Process] = [] + + inputs: list[ProcessInputSpec] = [] + + @staticmethod + def from_spec(context: Context, file: Path, data: dict[Hashable, Any]) -> Target: + target = Target(context, file) + + if "processors" not in data: + data["processors"] = [] + if not isinstance(data["processors"], list): + raise Exception("not a list: 'processors'") + for processor in data["processors"]: + target.processors.append(Process.from_spec(target, processor)) + + return target + + def __init__(self, context: Context, file: Path): + self.context = context + self.file = str(file) + + def get_processors(self) -> list[Process]: + return self.processors + + def get_inputs(self) -> list[ProcessInputSpec]: + return self.inputs + + def write(self) -> str: + """ + Apply all processors, compare to the target and write the delta to :any:`Context.output`. + """ + + config = self.context.config + assert self.file is not None + diff = Diff(config, self.file) + + diff.a = self.context.get_file(TargetFileInputSpec(path=Path(self.file))) + diff.b = self.patch + + processors = self.get_processors() + for processor in processors: + input = diff.b + if not isinstance(processor.input_spec, DefaultInputSpec): + input = self.context.get_file(processor.input_spec) + diff.b = processor.transform(input) + + return diff.compare() + + def __repr__(self): + return f"{self.__class__.__name__}(file={self.file})" diff --git a/todo.txt b/todo.txt new file mode 100644 index 0000000..1ff116d --- /dev/null +++ b/todo.txt @@ -0,0 +1,16 @@ +- with frontmatter + - input = own file content +- separate yaml file + - input = content of file name with .yaml extension removed (if exists in patchset) + OR target file content + +- files without .yaml extension and without frontmatter overwrite/replace any files in the + target directory (including mode) + +- yaml !input or !target to select specific inputs from patchset/target tree (!input + implicitly excludes file from being copied verbatim into target tree) + +- processor input/(target, steal name from git diff source) keys for specifying + primary/secondary processor inputs + +- USE FS FOR INPUT FS AS WELL |