aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoek Le Blansch <loek.le-blansch.pv@renesas.com>2025-11-20 08:16:02 +0100
committerLoek Le Blansch <loek.le-blansch.pv@renesas.com>2025-11-20 08:16:02 +0100
commitbe2c078c4fca5446127b924dd1dbc97c7b1636f4 (patch)
tree4579237ded65a888702839bc936243611dcb765e
parentbbe15d0a21e04c59527ab5453d8bb49c952e4a30 (diff)
very broken WIP
-rw-r--r--doc/dev/processor.rst14
-rw-r--r--patchtree/__init__.py2
-rw-r--r--patchtree/cli.py40
-rw-r--r--patchtree/config.py67
-rw-r--r--patchtree/context.py315
-rw-r--r--patchtree/diff.py32
-rw-r--r--patchtree/patch.py75
-rw-r--r--patchtree/patchspec.py34
-rw-r--r--patchtree/process.py274
-rw-r--r--patchtree/target.py80
-rw-r--r--todo.txt16
11 files changed, 642 insertions, 307 deletions
diff --git a/doc/dev/processor.rst b/doc/dev/processor.rst
index 018d216..68de828 100644
--- a/doc/dev/processor.rst
+++ b/doc/dev/processor.rst
@@ -18,7 +18,7 @@ Identity
The identity processor is used to "touch" files, change the mode of existing files, or add arbitrary identifiers to patchset source filenames by passing arbitrary arguments.
-:Class: :any:`ProcessIdentity`
+:Class: :any:`IdentityProcess`
:Identifier: ``id``
:Input: Ignored.
:Output:
@@ -37,7 +37,7 @@ The Coccinelle processor uses Coccinelle to apply patch(es) in the SmPL (Semanti
In order to use this processor, Coccinelle must be installed and ``spatch`` must be available in ``$PATH``.
-:Class: :any:`ProcessCoccinelle`
+:Class: :any:`CoccinelleProcess`
:Identifier: ``cocci``
:Input: Coccinelle's SmPL input.
:Output: The contents of the target file after being processed by Coccinelle (not the diff returned by Coccinelle).
@@ -51,7 +51,7 @@ Jinja template
The Jinja processor passes the input through the Jinja2 templating engine.
-:Class: :any:`ProcessJinja2`
+:Class: :any:`Jinja2Process`
:Identifier: ``jinja``
:Input: Jinja template code.
:Output: The input after being processed by Jinja.
@@ -59,8 +59,8 @@ The Jinja processor passes the input through the Jinja2 templating engine.
.. note::
- Template variables are generated through the :any:`get_template_vars <ProcessJinja2.get_template_vars>` method.
- This method returns an empty dict by default, and is meant to be implemented by implementing a custom class that derives from ProcessJinja2 and registering it through the :ref:`configuration file <ptconfig>`.
+ Template variables are generated through the :any:`get_template_vars <Jinja2Process.get_template_vars>` method.
+ This method returns an empty dict by default, and is meant to be implemented by implementing a custom class that derives from Jinja2Process and registering it through the :ref:`configuration file <ptconfig>`.
.. _process_exe:
@@ -70,7 +70,7 @@ Executable
The executable processor runs the input as an executable, passes the target file to its standard input, and returns its standard output.
-:Class: :any:`ProcessExec`
+:Class: :any:`ExecProcess`
:Identifier: ``exec``
:Input:
Executable script.
@@ -90,7 +90,7 @@ Merge
The merge processor merges the input with the target file, such that changes are combined with the target instead of replacing the target.
-:Class: :any:`ProcessMerge`
+:Class: :any:`MergeProcess`
:Identifier: ``merge``
:Input: Content to merge.
:Output: Merged changes.
diff --git a/patchtree/__init__.py b/patchtree/__init__.py
index 2e304a6..3950ed5 100644
--- a/patchtree/__init__.py
+++ b/patchtree/__init__.py
@@ -1,4 +1,4 @@
from .config import Config, Header
from .diff import Diff, File
from .context import Context
-from .process import ProcessCoccinelle, ProcessJinja2, ProcessIdentity, ProcessExec, ProcessMerge
+from .process import CoccinelleProcess, Jinja2Process, TouchProcess, ExecProcess, MergeProcess
diff --git a/patchtree/cli.py b/patchtree/cli.py
index 1c38d30..6e24bb4 100644
--- a/patchtree/cli.py
+++ b/patchtree/cli.py
@@ -1,7 +1,10 @@
from dataclasses import fields
from sys import stderr
from pathlib import Path
-from argparse import ArgumentTypeError
+from argparse import ArgumentTypeError, Namespace
+from tempfile import TemporaryFile
+from logging import basicConfig as log_config, addLevelName as log_add_level_name
+from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL
from .context import Context
from .config import Config
@@ -62,8 +65,8 @@ def parse_arguments(config: Config) -> Context:
type=int,
)
parser.add_argument(
- "-s",
- "--shebang",
+ "-S",
+ "--no-shebang",
help="output shebang in resulting patch",
action="store_true",
)
@@ -100,8 +103,8 @@ def parse_arguments(config: Config) -> Context:
if options.context is not None:
config.diff_context = options.context
- if options.shebang:
- config.output_shebang = True
+ if options.no_shebang:
+ config.no_shebang = True
if len(options.patch) == 0:
options.patch = config.default_patch_sources
@@ -109,25 +112,28 @@ def parse_arguments(config: Config) -> Context:
try:
return config.context(config, options)
except Exception as e:
- parser.error(str(e))
+ raise e
+ # parser.error(str(e))
def main():
+ log_add_level_name(DEBUG, "DBG")
+ log_add_level_name(INFO, "INF")
+ log_add_level_name(WARNING, "WRN")
+ log_add_level_name(ERROR, "ERR")
+ log_add_level_name(CRITICAL, "CRT")
+ log_config(
+ stream=stderr,
+ level=INFO,
+ format="[%(levelname)s] %(message)s",
+ )
+
config = load_config()
context = parse_arguments(config)
- if len(context.inputs) == 0:
- print("no files to patch!", file=stderr)
- return 0
-
- config.header(config, context)
-
- for file in context.inputs:
- patch = config.patch(config, file)
- patch.write(context)
-
- context.close()
+ patch = context.make_patch()
+ print(patch, end="")
return 0
diff --git a/patchtree/config.py b/patchtree/config.py
index a4d0912..78f1fad 100644
--- a/patchtree/config.py
+++ b/patchtree/config.py
@@ -6,16 +6,16 @@ from importlib import metadata
from pathlib import Path
from .context import Context
-from .patch import Patch
+from .target import Target
from .process import *
from .diff import *
DEFAULT_PROCESSORS: dict[str, type[Process]] = {
- "id": ProcessIdentity,
- "cocci": ProcessCoccinelle,
- "jinja": ProcessJinja2,
- "exec": ProcessExec,
- "merge": ProcessMerge,
+ "touch": TouchProcess,
+ "cocci": CoccinelleProcess,
+ "jinja": Jinja2Process,
+ "exec": ExecProcess,
+ "merge": MergeProcess,
}
@@ -25,7 +25,7 @@ class Header:
The header is formatted as
- * shebang (optional)
+ * shebang
* patchtree version info
* extra version info (empty by default)
* license (empty by default)
@@ -44,32 +44,37 @@ class Header:
self.config = config
self.context = context
- self.write_shebang()
- self.write_version()
- self.write_version_extra()
- self.write_license()
-
- def write_shebang(self):
+ def write(self) -> str:
+ return "".join(
+ (
+ self.write_shebang(),
+ self.write_version(),
+ self.write_version_extra(),
+ self.write_license(),
+ )
+ )
+
+ def write_shebang(self) -> str:
"""
- Write a shebang line to apply the output patch if the --shebang option was passed.
+ Write a shebang line to apply the output patch unless the --no-shebang option was passed.
"""
- if not self.config.output_shebang:
- return
+ if self.config.no_shebang:
+ return ""
cmd = ["/usr/bin/env", "-S", *self.context.get_apply_cmd()]
cmdline = " ".join(cmd)
- self.context.output.write(f"#!{cmdline}\n")
+ return f"#!{cmdline}\n"
- def write_version(self):
+ def write_version(self) -> str:
"""
Write the patchtree name and version number.
"""
version = metadata.version("patchtree")
- self.context.output.write(f"{self.name} output (version {version})\n")
+ return f"{self.name} output (version {version})\n"
- def write_version_extra(self):
+ def write_version_extra(self) -> str:
"""
Write extra version information (empty).
@@ -77,16 +82,16 @@ class Header:
patchsets.
"""
- pass
+ return ""
- def write_license(self):
+ def write_license(self) -> str:
"""
Write a license if it is defined.
"""
if self.license is None:
- return
- self.context.output.write(f"{self.license}\n")
+ return ""
+ return f"{self.license}\n"
@dataclass
@@ -100,8 +105,8 @@ class Config:
context: type[Context] = Context
"""Context class type. Override this to add custom context variables."""
- patch: type[Patch] = Patch
- """Patch class type."""
+ target: type[Target] = Target
+ """Target class type."""
argument_parser: type[ArgumentParser] = ArgumentParser
"""ArgumentParser class type. Override this to add custom arguments."""
@@ -129,8 +134,8 @@ class Config:
diff_context: int = 3
"""Lines of context to include in the diffs."""
- output_shebang: bool = False
- """Whether to output a shebang line with the ``git patch`` command to apply the patch."""
+ no_shebang: bool = False
+ """Whether to suppress the shebang line with the ``git patch`` command to apply the patch."""
default_patch_sources: list[Path] = field(default_factory=list)
"""List of default sources."""
@@ -138,5 +143,11 @@ class Config:
default_root: str | None = None
"""Default value of the ``-C``/``--root`` argument."""
+ patchspec_extensions: tuple[str, ...] = (
+ ".yaml",
+ ".yml",
+ )
+ """File extensions considered for matching standalone patch specifications."""
+
def __post_init__(self):
self.processors = {**DEFAULT_PROCESSORS, **self.processors}
diff --git a/patchtree/context.py b/patchtree/context.py
index c2cf33b..7a66b1a 100644
--- a/patchtree/context.py
+++ b/patchtree/context.py
@@ -1,21 +1,44 @@
from __future__ import annotations
-from typing import TYPE_CHECKING, IO, cast
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, IO, Any, TextIO, cast
+from frontmatter.default_handlers import YAMLHandler
+import yaml
+import frontmatter
+
+from sys import stderr
from argparse import Namespace
from pathlib import Path
from zipfile import ZipInfo, is_zipfile
-from tempfile import TemporaryFile
from os import path
-from sys import stdout, stderr
+from sys import stdout
from subprocess import run
from zipfile import ZipFile
from stat import S_IFDIR, S_IFREG
+from logging import Logger, getLogger as get_logger
+
+from yaml.nodes import MappingNode, ScalarNode
+
+from patchtree.diff import File
+
+from .patchspec import (
+ FileInputSpec,
+ LiteralInputSpec,
+ PatchsetFileInputSpec,
+ ProcessInputSpec,
+ TargetFileInputSpec,
+)
+from .target import Target
if TYPE_CHECKING:
- from .config import Config
+ from .config import Config, Header
ZIP_CREATE_SYSTEM_UNX = 3
+MODE_NONEXISTANT = 0
+MODE_FILE = 0o644 | S_IFREG
+MODE_DIR = 0o755 | S_IFDIR
+
class FS:
"""Target filesystem interface."""
@@ -34,6 +57,12 @@ class FS:
raise NotImplementedError()
+ def get_file(self, file_spec: TargetFileInputSpec) -> File:
+ return File(
+ content=self.get_content(str(file_spec.path)),
+ mode=self.get_mode(str(file_spec.path)),
+ )
+
def get_content(self, file: str) -> bytes | str | None:
"""
Get the content of a file relative to the target.
@@ -93,12 +122,23 @@ class ZipFS(FS):
files: dict[Path, ZipInfo] = {}
"""Map of path -> ZipInfo for all files in the archive."""
+ dirs: set[Path] = set()
+ """
+ List of directories in this archive.
+
+ Some zip files may not include entries for all directories if they already define entries
+ for files or subdirectories within. This set keeps all known directories.
+ """
+
def __init__(self, target):
super(ZipFS, self).__init__(target)
self.zip = ZipFile(str(target))
for info in self.zip.infolist():
- self.files[Path(info.filename)] = info
- # todo: index implicit directories in tree
+ path = Path(info.filename)
+ self.files[path] = info
+ while path.parent != path:
+ self.dirs.add(path.parent)
+ path = path.parent
def get_info(self, path: str) -> ZipInfo | None:
"""
@@ -135,31 +175,11 @@ class ZipFS(FS):
except:
return bytes
- def is_implicit_dir(self, file: str) -> bool:
- """
- Check if there is an implicit directory at ``file``.
-
- Some zip files may not include entries for all directories if they already define entries for files or
- subdirectories within. This function checks if any path that is a subdirectory of ``file`` exists.
-
- :returns: ``True`` if there is a directory at ``file``, else ``False``.
- """
-
- parent = Path(file)
- for child in self.files:
- if parent in child.parents:
- return True
- return False
-
def get_mode(self, file):
- MODE_NONEXISTANT = 0
- MODE_FILE = 0o644 | S_IFREG
- MODE_DIR = 0o755 | S_IFDIR
-
info = self.get_info(file)
if info is None:
- # if self.is_implicit_dir(file):
- # return MODE_DIR
+ if file in self.dirs:
+ return MODE_DIR
return MODE_NONEXISTANT
if info.create_system == ZIP_CREATE_SYSTEM_UNX:
@@ -171,6 +191,74 @@ class ZipFS(FS):
return MODE_FILE
+class PatchspecLoader(yaml.Loader):
+ @dataclass
+ class Marshall:
+ """
+ This is a stupid solution to a stupid problem. PyYAML's load() method for some reason
+ doesn't accept a loader class instance but a class type. It also doesn't allow you to
+ pass user data through kwargs, so all of the context is marshalled through the input
+ stream parameter and unpacked by PatchspecLoader's __init__ method.
+ """
+
+ patch: File
+ input: Path
+ context: Context
+
+ input: Path
+ context: Context
+
+ @staticmethod
+ def tag_file_target(loader: PatchspecLoader, node: ScalarNode):
+ target_root = loader.input.parent.relative_to(loader.context.root)
+ return TargetFileInputSpec(
+ path=target_root.joinpath(Path(loader.construct_scalar(node))),
+ )
+
+ @staticmethod
+ def tag_file_input(loader: PatchspecLoader, node: ScalarNode):
+ input_root = loader.input.parent
+ return PatchsetFileInputSpec(
+ path=input_root.joinpath(Path(loader.construct_scalar(node))),
+ )
+
+ @staticmethod
+ def tag_patchspec(loader: PatchspecLoader, node: MappingNode):
+ data = loader.construct_mapping(node, deep=True)
+ return Target.from_spec(
+ loader.context, loader.input.relative_to(loader.context.root), data
+ )
+
+ def __init__(self, marshall: Marshall):
+ super(PatchspecLoader, self).__init__(marshall.patch.get_str())
+ self.context = marshall.context
+ self.input = marshall.input
+
+ self.add_constructor("!patchspec", self.tag_patchspec)
+ self.add_constructor("!target", self.tag_file_target)
+ self.add_constructor("!input", self.tag_file_input)
+
+ @staticmethod
+ def load(marshall: PatchspecLoader.Marshall) -> Any:
+ return yaml.load(cast(Any, marshall), Loader=PatchspecLoader)
+
+
+class PatchtreeYAMLHandler(YAMLHandler):
+ marshall: PatchspecLoader.Marshall
+
+ output: Target | None = None
+
+ def __init__(self, marshall: PatchspecLoader.Marshall):
+ super(PatchtreeYAMLHandler, self).__init__()
+
+ self.marshall = marshall
+
+ def load(self, fm, **kwargs):
+ # bypass fm.metadata because it is converted into a dict
+ self.output = PatchspecLoader.load(self.marshall)
+ return {}
+
+
class Context:
"""Global app context / state holder."""
@@ -186,45 +274,36 @@ class Context:
The ``root`` member only changes the appearance of paths. All internal logic uses the "real" paths.
"""
- target: Path
- """Path to target."""
+ header: Header
+
+ content: list[Target] = []
+ """Patch targets (content)"""
fs: FS
"""Target file system interface."""
- output: IO
- """Output stream for writing the clean patch."""
-
in_place: bool
"""Whether to apply the changes directly to the target instead of outputting the .patch file."""
config: Config
+ log: Logger
+ """Global log instance reference"""
+
+ is_empty: bool = False
+
def __init__(self, config: Config, options: Namespace):
self.config = config
+ self.log = get_logger(self.__class__.__name__)
self.root = options.root
- self.target = options.target
self.in_place = options.in_place
- self.inputs = self.collect_inputs(options)
-
- self.fs = self.get_fs()
- self.output = self.get_output(options)
-
- if self.in_place:
- self.apply(True)
-
- def close(self):
- """Finish writing the clean patch file and close it."""
-
- # patch must have a trailing newline
- self.output.write("\n")
- self.output.flush()
- if self.in_place:
- self.apply(False)
+ self.inputs = self.collect_inputs(options)
+ self.content = self.collect_targets(self.inputs)
- self.output.close()
+ self.fs = self._get_fs(options.target)
+ self.header = config.header(config, self)
def collect_inputs(self, options: Namespace) -> list[Path]:
"""
@@ -253,26 +332,105 @@ class Context:
inputs.add(path)
return list(inputs)
- def get_dir(self, dir: str) -> list[str]:
- """Get a target directory's content (see :any:`FS.get_dir()`)"""
- return self.fs.get_dir(dir)
-
- def get_content(self, file: str) -> bytes | str | None:
- """Get a target file's content (see :any:`FS.get_content()`)"""
- return self.fs.get_content(file)
-
- def get_mode(self, file: str) -> int:
- """Get a target file's mode (see :any:`FS.get_mode()`)"""
- return self.fs.get_mode(file)
+ def create_target(self, input: Path, meta_inputs: list[Path] = []) -> Target:
+ file = input.relative_to(self.root)
+ target: Target | None = None
+ patch = File(
+ content=input.read_bytes(),
+ mode=input.stat().st_mode,
+ )
- def get_fs(self) -> FS:
+ try:
+ patch.get_str()
+ except:
+ # binary files can't be patchspecs
+ target = Target(self, file)
+
+ marshall = PatchspecLoader.Marshall(
+ patch=patch,
+ input=input,
+ context=self,
+ )
+
+ # if the input is a yaml file, try to load it
+ if target is None and input.suffix in self.config.patchspec_extensions:
+ try:
+ fake_marshall = marshall
+ fake_marshall.input = input.parent.joinpath(input.stem)
+ target = PatchspecLoader.load(fake_marshall)
+ except Exception as e:
+ self.log.error(f"while parsing patchspec for {input}")
+ raise e
+ if not isinstance(target, Target):
+ target = None
+
+ # if the input contains frontmatter, try to load it
+ if target is None:
+ handler = PatchtreeYAMLHandler(marshall)
+ try:
+ post = frontmatter.loads(patch.get_str(), handler=handler)
+ if not isinstance(handler.output, Target):
+ raise Exception()
+ target = handler.output
+ # strip frontmatter from input content
+ patch.content = post.content
+ except:
+ target = None
+
+ if target is None:
+ target = Target(self, file)
+
+ target.file = target.file or str()
+ assert target.file is not None
+
+ for input in (
+ i.path for i in target.get_inputs() if isinstance(i, PatchsetFileInputSpec)
+ ):
+ meta_inputs.append(input)
+
+ return target
+
+ def collect_targets(self, inputs: list[Path]) -> list[Target]:
+ meta_inputs: set[Path] = set()
+ targets: dict[Path, Target] = {}
+
+ for input in inputs:
+ meta = []
+ targets[input] = self.create_target(input, meta)
+ meta_inputs.update(meta)
+
+ missing = meta_inputs - set(inputs)
+ if len(missing) > 0:
+ for input in missing:
+ self.log.error(f"{str(input)} referenced by patchspec but not in inputs")
+ raise Exception("missing files")
+
+ # files referenced as meta inputs shouldn't be treated as verbatim files
+ for key in meta_inputs:
+ if key not in targets:
+ continue
+ del targets[key]
+
+ return sorted(targets.values(), key=lambda target: target.file)
+
+ def get_file(self, spec: ProcessInputSpec) -> File:
+ if isinstance(spec, TargetFileInputSpec):
+ return self.fs.get_file(spec)
+ if isinstance(spec, LiteralInputSpec):
+ return File(content=spec.content, mode=MODE_FILE)
+ if isinstance(spec, PatchsetFileInputSpec):
+ return File(
+ content=spec.path.read_bytes(),
+ mode=spec.path.stat().st_mode,
+ )
+ raise Exception(f"unable to read file: {spec}")
+
+ def _get_fs(self, target: Path) -> FS:
"""
Open the selected target, taking into account the --in-place option.
:returns: Target filesystem interface.
"""
- target = self.target
-
if not target.exists():
raise Exception(f"cannot open `{target}'")
@@ -292,10 +450,10 @@ class Context:
:returns: Output stream.
"""
- if self.in_place:
+ if options.in_place:
if options.out is not None:
- print("warning: --out is ignored when using --in-place", file=stderr)
- return TemporaryFile("w+")
+ self.log.warning("--out is ignored when using --in-place")
+ return TextIO()
if options.out is not None:
if options.out == "-":
@@ -312,11 +470,27 @@ class Context:
:returns: Command argument vector.
"""
- cmd = ["git", "apply", "--allow-empty"]
+ cmd = ["git", "apply"]
+ if self.is_empty:
+ cmd.append("--allow-empty")
if self.config.diff_context == 0:
cmd.append("--unidiff-zero")
return cmd
+ def make_patch(self) -> str:
+ patch = ""
+ for target in self.content:
+ self.log.info(f"writing patch for `{target.file}'")
+ patch += target.write()
+
+ self.is_empty = len(patch) == 0
+
+ patch = self.header.write() + patch
+
+ # patch must have a trailing newline
+ patch += "\n"
+ return patch
+
def apply(self, reverse: bool) -> None:
"""
Apply the patch in ``self.output`` and update the cache or reverse the patch in the cache.
@@ -331,8 +505,7 @@ class Context:
return
cmd.append("--reverse")
else:
- self.output.seek(0)
- patch = self.output.read()
+ patch = self.make_patch()
cache.write_text(patch)
cmd.append(str(cache.absolute()))
diff --git a/patchtree/diff.py b/patchtree/diff.py
index 4d4a998..f17fcfb 100644
--- a/patchtree/diff.py
+++ b/patchtree/diff.py
@@ -1,4 +1,5 @@
from __future__ import annotations
+from sys import stderr
from typing import TYPE_CHECKING
from dataclasses import dataclass
@@ -10,10 +11,10 @@ if TYPE_CHECKING:
@dataclass
class File:
- content: str | bytes | None
+ content: str | bytes | None = None
"""The file's contents, or ``None`` if it does not exist."""
- mode: int
+ mode: int = 0
"""The file's mode as returned by stat(3)'s ``stat.st_mode``."""
def is_binary(self) -> bool:
@@ -22,6 +23,16 @@ class File:
"""
return isinstance(self.content, bytes)
+ def get_str(self) -> str:
+ if self.content is None:
+ return ""
+ if isinstance(self.content, bytes):
+ try:
+ self.content = self.content.decode()
+ except Exception:
+ raise Exception("expected text file instead of binary")
+ return self.content
+
def lines(self) -> list[str]:
"""
Get a list of lines in this file.
@@ -34,8 +45,10 @@ class File:
This function only works for text files. Use :any:`is_binary` to check this safely.
"""
- assert not isinstance(self.content, bytes)
- return (self.content or "").splitlines()
+ return self.get_str().splitlines()
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(mode={self.mode:06o}, content={repr(self.content)})"
class Diff:
@@ -86,12 +99,17 @@ class Diff:
delta += f"new mode {b.mode:06o}\n"
if a.content != b.content:
- # make sure a file doesn't switch from text to binary or vice versa
- assert a.is_binary() == b.is_binary()
+ binary = False
+ lines_a = []
+ lines_b = []
- if not b.is_binary():
+ try:
lines_a = a.lines()
lines_b = b.lines()
+ except Exception:
+ binary = True
+
+ if not binary:
diff = unified_diff(
lines_a, lines_b, fromfile, tofile, lineterm="", n=self.config.diff_context
)
diff --git a/patchtree/patch.py b/patchtree/patch.py
deleted file mode 100644
index 85d056d..0000000
--- a/patchtree/patch.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from __future__ import annotations
-from typing import TYPE_CHECKING
-
-from pathlib import Path
-
-from .diff import Diff, File
-from .process import Process
-
-if TYPE_CHECKING:
- from .context import Context
- from .config import Config
-
-
-class Patch:
- """A single patched file."""
-
- config: Config
-
- patch: Path
- """The patchset input location."""
-
- file: str
- """The name of the patched file in the target."""
-
- processors: list[tuple[type[Process], Process.Args]] = []
- """A list of processors to apply to the input before diffing."""
-
- def __init__(self, config: Config, patch: Path):
- self.patch = patch
- self.config = config
-
- self.processors.clear()
- self.file, *proc_strs = str(patch).split(config.process_delimiter)
- for proc_str in proc_strs:
- proc_name, *argv = proc_str.split(",")
- args = Process.Args(name=proc_name, argv=argv)
- proc_cls = config.processors.get(proc_name, None)
- if proc_cls is None:
- raise Exception(f"unknown processor: `{proc_cls}'")
- for arg in argv:
- key, value, *_ = (*arg.split("=", 1), None)
- args.argd[key] = value
- self.processors.insert(0, (proc_cls, args))
-
- def write(self, context: Context) -> None:
- """
- Apply all processors, compare to the target and write the delta to :any:`Context.output`.
- """
-
- if context.root is not None:
- self.file = str(Path(self.file).relative_to(context.root))
-
- diff = Diff(self.config, self.file)
-
- diff.a = File(
- content=context.get_content(self.file),
- mode=context.get_mode(self.file),
- )
-
- diff.b = File(
- content=None,
- mode=self.patch.stat().st_mode,
- )
- b_content = self.patch.read_bytes()
- try:
- diff.b.content = b_content.decode()
- except:
- diff.b.content = b_content
-
- for cls, args in self.processors:
- processor = cls(context, args)
- diff.b = processor.transform(diff.a, diff.b)
-
- delta = diff.compare()
- context.output.write(delta)
diff --git a/patchtree/patchspec.py b/patchtree/patchspec.py
new file mode 100644
index 0000000..64f20a1
--- /dev/null
+++ b/patchtree/patchspec.py
@@ -0,0 +1,34 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+
+
+@dataclass
+class ProcessInputSpec:
+ pass
+
+
+@dataclass
+class FileInputSpec(ProcessInputSpec):
+ path: Path
+
+
+@dataclass
+class TargetFileInputSpec(FileInputSpec):
+ pass
+
+
+@dataclass
+class PatchsetFileInputSpec(FileInputSpec):
+ pass
+
+
+@dataclass
+class DefaultInputSpec(ProcessInputSpec):
+ pass
+
+
+@dataclass
+class LiteralInputSpec(ProcessInputSpec):
+ content: str
diff --git a/patchtree/process.py b/patchtree/process.py
index bd6b802..a7871a8 100644
--- a/patchtree/process.py
+++ b/patchtree/process.py
@@ -1,61 +1,99 @@
from __future__ import annotations
-from typing import TYPE_CHECKING, Any, Callable
+from typing import TYPE_CHECKING, Any, Callable, Hashable, cast
from tempfile import mkstemp
from jinja2 import Environment
from subprocess import Popen, run
from pathlib import Path
-from os import fdopen, chmod, unlink
-from dataclasses import dataclass, field
-
+from enum import Enum
+from shlex import split
+from dataclasses import dataclass
+
+from .patchspec import (
+ DefaultInputSpec,
+ FileInputSpec,
+ LiteralInputSpec,
+ ProcessInputSpec,
+ TargetFileInputSpec,
+)
from .diff import File
if TYPE_CHECKING:
from .context import Context
+ from .target import Target
class Process:
"""
- Processor base interface.
+ Process base interface.
"""
- context: Context
+ target: Target
"""Patch file context."""
- @dataclass
- class Args:
- """
- Processor filename arguments.
-
- See :ref:`processors`.
- """
-
- name: str
- """The name the processor was called with."""
- argv: list[str] = field(default_factory=list)
- """The arguments passed to the processor"""
- argd: dict[str, str | None] = field(default_factory=dict)
- """The key/value arguments passed to the processor"""
-
- args: Args
- """Arguments passed to this processor."""
+ input_spec: ProcessInputSpec
+ target_spec: ProcessInputSpec
- def __init__(self, context: Context, args: Args):
- self.args = args
- self.context = context
+ def __init__(self, target: Target):
+ self.target = target
- def transform(self, a: File, b: File) -> File:
+ def transform(self, input: File) -> File:
"""
- Transform the input file.
+ Perform the transformation of this processor.
- :param a: Content of file to patch.
- :param b: Content of patch input in patch tree or output of previous processor.
:returns: Processed file.
"""
raise NotImplementedError()
+ @staticmethod
+ def from_spec(target: Target, data: dict[Hashable, Any]) -> Process:
+ context = target.context
+ context.log.warning(repr(data))
+ if "id" not in data:
+ raise Exception("missing key 'id'")
+ id = str(data["id"])
+ del data["id"]
+
+ config = context.config
+ if id not in config.processors:
+ raise Exception(f"no processor for id {id}")
+
+ process_cls = config.processors[id]
+ if process_cls.from_spec == Process.from_spec:
+ raise NotImplementedError(
+ f"{process_cls.__name__} does not implement the `from_spec` method"
+ )
+ process = process_cls.from_spec(target, data)
+
+ if "input" in data:
+ if isinstance(data["input"], ProcessInputSpec):
+ process.input_spec = data["input"]
+ elif isinstance(data["input"], str):
+ process.input_spec = LiteralInputSpec(content=data["input"])
+ else:
+ raise Exception(f"type error for key input {type(data['input'])}")
+ del data["input"]
+
+ if "target" in data:
+ if isinstance(data["target"], ProcessInputSpec):
+ process.target_spec = data["target"]
+ elif isinstance(data["target"], str):
+ process.target_spec = LiteralInputSpec(content=data["target"])
+ else:
+ raise Exception(f"type error for key target {type(data['target'])}")
+ del data["target"]
+
+ assert target.file is not None
+
+ process.input_spec = getattr(process, "input_spec", DefaultInputSpec())
+ process.target_spec = getattr(
+ process, "target_spec", TargetFileInputSpec(path=Path(target.file))
+ )
+
+ return process
-class ProcessJinja2(Process):
+
+class Jinja2Process(Process):
"""
Jinja2 preprocessor.
"""
@@ -66,116 +104,146 @@ class ProcessJinja2(Process):
)
def __init__(self, *args, **kwargs):
- super(ProcessJinja2, self).__init__(*args, **kwargs)
-
- if len(self.args.argv) > 0:
- raise Exception("too many arguments")
+ super(Jinja2Process, self).__init__(*args, **kwargs)
- def transform(self, a, b):
+ def transform(self, input):
template_vars = self.get_template_vars()
- assert b.content is not None
- assert not isinstance(b.content, bytes)
- b.content = self.environment.from_string(b.content).render(**template_vars)
- return b
+ input.content = self.environment.from_string(input.get_str()).render(**template_vars)
+ return input
def get_template_vars(self) -> dict[str, Any]:
"""
Generate template variables.
This method returns an empty dict by default, and is meant to be implemented by subclassing the
- ProcessJinja2 class.
+ Jinja2Process class.
:returns: A dict of variables defined in the template.
"""
return {}
+ @staticmethod
+ def from_spec(target, data):
+ return Jinja2Process(target)
+
-class ProcessCoccinelle(Process):
+class CoccinelleProcess(Process):
"""
Coccinelle transformer.
"""
def __init__(self, *args, **kwargs):
- super(ProcessCoccinelle, self).__init__(*args, **kwargs)
+ super(CoccinelleProcess, self).__init__(*args, **kwargs)
- if len(self.args.argv) > 0:
- raise Exception("too many arguments")
+ def transform(self, input):
+ context = self.target.context
+ content_input = context.get_file(self.target_spec).get_str()
+ content_patch = input.get_str()
- def transform(self, a, b):
- assert not isinstance(a.content, bytes)
- assert not isinstance(b.content, bytes)
- content_a = a.content or ""
- content_b = b.content or ""
+ # empty patch -> return input as-is (coccinelle gives errors in this case)
+ if len(content_patch.strip()) == 0:
+ return input
- if len(content_b.strip()) == 0:
- return a
+ temp_input = Path(mkstemp()[1])
+ temp_output = Path(mkstemp()[1])
+ temp_patch = Path(mkstemp()[1])
- temp_a = Path(mkstemp()[1])
- temp_b = Path(mkstemp()[1])
- temp_sp = Path(mkstemp()[1])
-
- temp_a.write_text(content_a)
- temp_sp.write_text(content_b)
+ temp_input.write_text(content_input)
+ temp_patch.write_text(content_patch)
cmd = (
"spatch",
"--very-quiet",
"--no-show-diff",
"--sp-file",
- str(temp_sp),
- str(temp_a),
+ str(temp_patch),
+ str(temp_input),
"-o",
- str(temp_b),
+ str(temp_output),
)
coccinelle = Popen(cmd)
coccinelle.wait()
- b.content = temp_b.read_text()
+ input.content = temp_output.read_text()
- temp_a.unlink()
- temp_b.unlink()
- temp_sp.unlink()
+ temp_input.unlink()
+ temp_output.unlink()
+ temp_patch.unlink()
- return b
+ return input
+
+ @staticmethod
+ def from_spec(target, data):
+ return CoccinelleProcess(target)
-class ProcessIdentity(Process):
+class TouchProcess(Process):
"""
- Identity transformer.
+ Touch transformer.
"""
- def transform(self, a, b):
- return File(content=a.content, mode=b.mode)
+ mode: int | None = None
+ def transform(self, input):
+ input.content = input.content or ""
+ input.mode = self.mode or input.mode
+ return input
-class ProcessExec(Process):
+ @staticmethod
+ def from_spec(target, data):
+ process = TouchProcess(target)
+
+ if "mode" in data:
+ if not isinstance(data["mode"], int):
+ raise TypeError("invalid type of key 'mode'")
+ process.mode = data["mode"]
+ del data["mode"]
+
+ return process
+
+
+class ExecProcess(Process):
"""
Executable transformer.
"""
+ cmd: list[str] = []
+
def __init__(self, *args, **kwargs):
- super(ProcessExec, self).__init__(*args, **kwargs)
+ super(ExecProcess, self).__init__(*args, **kwargs)
- if len(self.args.argv) > 0:
- raise Exception("too many arguments")
+ def transform(self, input):
+ assert len(self.cmd) > 0
- def transform(self, a, b):
- assert b.content is not None
- assert not isinstance(b.content, bytes)
+ proc = run(self.cmd, text=True, input=input.get_str(), capture_output=True, check=True)
+ input.content = proc.stdout
- fd, exec = mkstemp()
- with fdopen(fd, "wt") as f:
- f.write(b.content)
- chmod(exec, 0o700)
+ return input
- proc = run((str(exec),), text=True, input=a.content, capture_output=True, check=True)
- b.content = proc.stdout
+ @staticmethod
+ def from_spec(target, data):
+ process = ExecProcess(target)
- unlink(exec)
+ if "cmd" not in data:
+ raise Exception("missing property `cmd'")
+ if not isinstance(data["cmd"], str):
+ raise TypeError("invalid type of key `cmd'")
+ process.cmd = split(data["cmd"])
+ del data["cmd"]
+
+ return process
- return b
+# class MergeProcessorSpec(ProcessSpec):
-class ProcessMerge(Process):
+# def _copy(self, spec: ProcessSpec):
+# super(MergeProcessorSpec, self)._copy(spec)
+
+
+# def __repr__(self):
+# return f"{self.__class__.__name__}({self.input}, strategy={self.strategy.name})"
+
+
+class MergeProcess(Process):
"""
Merge transformer.
"""
@@ -190,27 +258,31 @@ class ProcessMerge(Process):
return b
- strategies: dict[str, Callable[[ProcessMerge, File, File], File]] = {
+ strategies: dict[str, Callable[[MergeProcess, File, File], File]] = {
"ignore": merge_ignore,
}
- strategy: Callable[[ProcessMerge, File, File], File]
+ strategy: Callable[[MergeProcess, File, File], File] | None = None
def __init__(self, *args, **kwargs):
- super(ProcessMerge, self).__init__(*args, **kwargs)
+ super(MergeProcess, self).__init__(*args, **kwargs)
- argv = self.args.argv
- if len(argv) < 1:
- raise Exception("not enough arguments")
-
- if len(argv) > 1:
- raise Exception("too many arguments")
+ def transform(self, input):
+ context = self.target.context
+ a = input
+ b = context.get_file(self.target_spec)
+ assert self.strategy is not None
+ return self.strategy(self, a, b)
- strategy = argv[0]
- if strategy not in self.strategies:
- raise Exception(f"unknown merge strategy: `{strategy}'")
+ @staticmethod
+ def from_spec(target, data):
+ process = MergeProcess(target)
- self.strategy = self.strategies[strategy]
+ if "strategy" not in data:
+ raise Exception("missing property `strategy'")
+ if data["strategy"] not in process.strategies:
+ raise Exception(f"unknown strategy {repr(data['strategy'])}")
+ process.strategy = process.strategies[data["strategy"]]
+ del data["strategy"]
- def transform(self, a, b):
- return self.strategy(self, a, b)
+ return process
diff --git a/patchtree/target.py b/patchtree/target.py
new file mode 100644
index 0000000..859ed0f
--- /dev/null
+++ b/patchtree/target.py
@@ -0,0 +1,80 @@
+from __future__ import annotations
+from typing import TYPE_CHECKING, Any, Hashable
+
+from pathlib import Path
+
+from .diff import Diff, File
+from .patchspec import (
+ DefaultInputSpec,
+ FileInputSpec,
+ LiteralInputSpec,
+ ProcessInputSpec,
+ TargetFileInputSpec,
+)
+from .process import Process
+
+if TYPE_CHECKING:
+ from .context import Context
+ from .config import Config
+
+
+class Target:
+ """A single patched file."""
+
+ context: Context
+
+ file: str
+ """The name of the patched file in the target."""
+
+ patch = File()
+
+ processors: list[Process] = []
+
+ inputs: list[ProcessInputSpec] = []
+
+ @staticmethod
+ def from_spec(context: Context, file: Path, data: dict[Hashable, Any]) -> Target:
+ target = Target(context, file)
+
+ if "processors" not in data:
+ data["processors"] = []
+ if not isinstance(data["processors"], list):
+ raise Exception("not a list: 'processors'")
+ for processor in data["processors"]:
+ target.processors.append(Process.from_spec(target, processor))
+
+ return target
+
+ def __init__(self, context: Context, file: Path):
+ self.context = context
+ self.file = str(file)
+
+ def get_processors(self) -> list[Process]:
+ return self.processors
+
+ def get_inputs(self) -> list[ProcessInputSpec]:
+ return self.inputs
+
+ def write(self) -> str:
+ """
+ Apply all processors, compare to the target and write the delta to :any:`Context.output`.
+ """
+
+ config = self.context.config
+ assert self.file is not None
+ diff = Diff(config, self.file)
+
+ diff.a = self.context.get_file(TargetFileInputSpec(path=Path(self.file)))
+ diff.b = self.patch
+
+ processors = self.get_processors()
+ for processor in processors:
+ input = diff.b
+ if not isinstance(processor.input_spec, DefaultInputSpec):
+ input = self.context.get_file(processor.input_spec)
+ diff.b = processor.transform(input)
+
+ return diff.compare()
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}(file={self.file})"
diff --git a/todo.txt b/todo.txt
new file mode 100644
index 0000000..1ff116d
--- /dev/null
+++ b/todo.txt
@@ -0,0 +1,16 @@
+- with frontmatter
+ - input = own file content
+- separate yaml file
+ - input = content of file name with .yaml extension removed (if exists in patchset)
+ OR target file content
+
+- files without .yaml extension and without frontmatter overwrite/replace any files in the
+ target directory (including mode)
+
+- yaml !input or !target to select specific inputs from patchset/target tree (!input
+ implicitly excludes file from being copied verbatim into target tree)
+
+- processor input/(target, steal name from git diff source) keys for specifying
+ primary/secondary processor inputs
+
+- USE FS FOR INPUT FS AS WELL