# coding: utf-8
"""
Custom luigi file system and target objects.
"""
from __future__ import annotations
# public API of this module
__all__ = [
    "FileSystem", "FileSystemTarget", "FileSystemFileTarget", "FileSystemDirectoryTarget",
    "get_path", "get_scheme", "has_scheme", "add_scheme", "remove_scheme", "localize_file_targets",
]
import os
import sys
import re
import pathlib
import contextlib
from abc import abstractmethod, abstractproperty
from functools import partial
from law.config import Config
import law.target.luigi_shims as shims
from law.target.base import Target
from law.util import map_struct, create_random_string, human_bytes, no_value
from law._types import (
Any, Generator, Callable, T, Literal, Iterator, Type, AbstractContextManager, IO,
)
class FileSystem(shims.FileSystem):
    """
    Abstract base class of law file systems.

    Concrete file systems implement the abstract interface below. This base class stores a few
    common settings: an optional *name*, whether permissions are supported (*has_permissions*),
    default file and directory permissions and the *create_file_dir* flag. Remaining keyword
    arguments are forwarded to the luigi file system base class.
    """

    @classmethod
    def parse_config(
        cls,
        section: str,
        config: dict[str, Any] | None = None,
        *,
        overwrite: bool = False,
    ) -> dict[str, Any]:
        """
        Reads the common file system options from the law config *section* into *config*.

        A new dictionary is created when *config* is *None*. Options already present in *config*
        are only replaced when *overwrite* is *True*. Returns the (updated) dictionary.
        """
        cfg = Config.instance()

        if config is None:
            config = {}

        # helper to set an option, extracted with a config parser method, unless it is already
        # present in config and overwriting is disabled
        def add(option: str, func: Callable[[str, str], Any]) -> None:
            if option not in config or overwrite:
                config[option] = func(section, option)

        # read configs
        int_or_none = partial(cfg.get_expanded_int, default=None)
        add("has_permissions", cfg.get_expanded_bool)
        add("default_file_perm", int_or_none)
        add("default_dir_perm", int_or_none)
        add("create_file_dir", cfg.get_expanded_bool)

        return config

    def __init__(
        self,
        name: str | None = None,
        *,
        has_permissions: bool = True,
        default_file_perm: int | None = None,
        default_dir_perm: int | None = None,
        create_file_dir: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.name = name
        self.has_permissions = has_permissions
        self.default_file_perm = default_file_perm
        self.default_dir_perm = default_dir_perm
        self.create_file_dir = create_file_dir

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(name={self.name}, {hex(id(self))})"

    def dirname(self, path: str | pathlib.Path) -> str | None:
        """
        Returns the directory part of *path*, or *None* for the root path "/".
        """
        # compare the string representation so that pathlib.Path("/") is detected as root, too
        path = str(path)
        return None if path == "/" else os.path.dirname(path)

    def basename(self, path: str | pathlib.Path) -> str:
        """
        Returns the last component of *path*, or "/" for the root path itself.
        """
        # compare the string representation so that pathlib.Path("/") is detected as root, too
        path = str(path)
        return "/" if path == "/" else os.path.basename(path)

    def ext(self, path: str | pathlib.Path, n: int = 1) -> str:
        """
        Returns the extension of the basename of *path* without leading dot.

        Leading dots of the basename are ignored, so e.g. ".bashrc" has no extension. *n* selects
        the number of trailing extensions ("a.tar.gz": ``n=1`` -> "gz", ``n=2`` -> "tar.gz"), and
        ``n <= 0`` returns all of them. An empty string is returned when there is no extension.
        """
        # split the path
        parts = self.basename(path).lstrip(".").split(".")

        # empty extension in the trivial case or use the last n parts except for the first one
        return "" if len(parts) == 1 else ".".join(parts[1:][min(-n, 0):])

    def _unscheme(self, path: str | pathlib.Path) -> str:
        # strip a leading "scheme://" prefix
        return remove_scheme(path)

    # abstract interface to be implemented by concrete file systems

    @property
    @abstractmethod
    def default_instance(self) -> FileSystem:
        ...

    @abstractmethod
    def abspath(self, path: str | pathlib.Path) -> str:
        ...

    @abstractmethod
    def stat(self, path: str | pathlib.Path, **kwargs) -> os.stat_result:
        ...

    @abstractmethod
    def exists(
        self,
        path: str | pathlib.Path,
        *,
        stat: bool = False,
        **kwargs,
    ) -> bool | os.stat_result | None:
        # NOTE(review): presumably returns a stat result (or None) instead of a bool when *stat*
        # is True, as used by FileSystemTarget._repr_pairs; confirm with implementations
        ...

    @abstractmethod
    def isdir(self, path: str | pathlib.Path, **kwargs) -> bool:
        ...

    @abstractmethod
    def isfile(self, path: str | pathlib.Path, **kwargs) -> bool:
        ...

    @abstractmethod
    def chmod(self, path: str | pathlib.Path, perm: int, *, silent: bool = True, **kwargs) -> bool:
        ...

    @abstractmethod
    def remove(  # type: ignore[override]
        self,
        path: str | pathlib.Path,
        *,
        recursive: bool = True,
        silent: bool = True,
        **kwargs,
    ) -> bool:
        ...

    @abstractmethod
    def mkdir(  # type: ignore[override]
        self,
        path: str | pathlib.Path,
        *,
        perm: int | None = None,
        recursive: bool = True,
        silent: bool = True,
        **kwargs,
    ) -> bool:
        ...

    @abstractmethod
    def listdir(
        self,
        path: str | pathlib.Path,
        *,
        pattern: str | None = None,
        type: Literal["f", "d"] | None = None,
        **kwargs,
    ) -> list[str]:
        ...

    @abstractmethod
    def walk(
        self,
        path: str | pathlib.Path,
        *,
        max_depth: int = -1,
        **kwargs,
    ) -> Iterator[tuple[str, list[str], list[str], int]]:
        ...

    @abstractmethod
    def glob(
        self,
        pattern: str | pathlib.Path,
        *,
        cwd: str | pathlib.Path | None = None,
        **kwargs,
    ) -> list[str]:
        ...

    @abstractmethod
    def copy(  # type: ignore[override]
        self,
        src: str | pathlib.Path,
        dst: str | pathlib.Path,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def move(  # type: ignore[override]
        self,
        src: str | pathlib.Path,
        dst: str | pathlib.Path,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    @contextlib.contextmanager
    def open(
        self,
        path: str | pathlib.Path,
        mode: str,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> Iterator[IO]:
        ...
class FileSystemTarget(Target, shims.FileSystemTarget):
    """
    Abstract base class of targets located on a :py:class:`FileSystem`.

    The path passed to the constructor is stored in two forms: with its scheme removed
    (:py:attr:`unexpanded_path`), and additionally with "~" and environment variables expanded
    (:py:attr:`path`). Subclasses must define *file_class* and *directory_class* and implement
    the abstract interface below.
    """

    # must be set by subclasses
    file_class: Type[FileSystemFileTarget]
    directory_class: Type[FileSystemDirectoryTarget]

    open: Callable | None = None  # type: ignore[assignment]

    def __init__(self, path: str | pathlib.Path, fs: FileSystem | None = None, **kwargs) -> None:
        if fs is not None:
            self.fs: FileSystem = fs  # type: ignore[misc]

        # _path and _unexpanded_path are set during super init through properties below
        self._path: str
        self._unexpanded_path: str

        super().__init__(path=path, **kwargs)

    def _repr_pairs(self, color: bool = True) -> list[tuple[str, Any]]:
        """
        Returns the (key, value) pairs shown in the representation of this target.

        Besides the pairs of the base class, these contain the file system name, the path
        (expanded or not, depending on the "target.expand_path_repr" config) and, when the
        "target.filesize_repr" config is set, the human-readable size or "-" if missing.
        """
        pairs = super()._repr_pairs()

        # add the fs name
        if self.fs:
            pairs.append(("fs", self.fs.name))

        # add the path
        cfg = Config.instance()
        expand = cfg.get_expanded_bool("target", "expand_path_repr")
        pairs.append(("path", self.path if expand else self.unexpanded_path))

        # optionally add the file size
        if cfg.get_expanded_bool("target", "filesize_repr"):
            stat: os.stat_result = self.exists(stat=True)  # type: ignore[assignment]
            pairs.append(("size", human_bytes(stat.st_size, fmt="{:.1f}{}") if stat else "-"))

        return pairs

    def _parent_args(self) -> tuple[tuple[Any, ...], dict[str, Any]]:
        # hook for subclasses to pass additional arguments to the parent directory target
        return (), {}

    @property
    def unexpanded_path(self) -> str:
        return self._unexpanded_path

    @property
    def path(self) -> str:
        return self._path

    @path.setter
    def path(self, path: str | pathlib.Path) -> None:
        # strip the scheme, then keep both the raw and the "~"/variable-expanded version
        path = self.fs._unscheme(str(path))
        self._unexpanded_path = path
        self._path = os.path.expandvars(os.path.expanduser(self._unexpanded_path))

    @property
    def dirname(self) -> str:
        return self.fs.dirname(self.path)  # type: ignore[return-value]

    @property
    def absdirname(self) -> str:
        return self.fs.dirname(self.abspath)  # type: ignore[return-value]

    @property
    def basename(self) -> str:
        return self.fs.basename(self.path)

    @property
    def unique_basename(self) -> str:
        # basename prefixed with the hex representation of the target hash
        return f"{hex(self.hash)[2:]}_{self.basename}"

    @property
    def parent(self) -> FileSystemDirectoryTarget | None:
        """
        The directory target containing this target, or *None* when the path has no parent.

        The unexpanded dirname is preferred when it resolves to the same location as the expanded
        one, so that variables in the path are propagated to the parent.
        """
        dirname = self.dirname
        if dirname is None:
            return None

        # favor the unexpanded dirname to propagate variables, but only when it points to the same
        # location; it can be None (root path), so guard before expanding it
        unexpanded_dirname = self.fs.dirname(self.unexpanded_path)
        if unexpanded_dirname:
            expanded_dirname = os.path.expandvars(os.path.expanduser(unexpanded_dirname))
            if self.fs.abspath(dirname) == self.fs.abspath(expanded_dirname):
                dirname = unexpanded_dirname

        args, kwargs = self._parent_args()
        return self.directory_class(dirname, *args, **kwargs)

    def sibling(self, *args, **kwargs) -> FileSystemTarget:
        """
        Returns a child of the parent directory, forwarding all arguments to its ``child`` method.
        Raises an exception when the parent cannot be determined.
        """
        parent = self.parent
        if parent is None:
            raise Exception(f"cannot determine parent of {self!r}")

        return parent.child(*args, **kwargs)

    def stat(self, **kwargs) -> os.stat_result:
        return self.fs.stat(self.path, **kwargs)

    def exists(self, **kwargs) -> bool | os.stat_result | None:  # type: ignore[override]
        return self.fs.exists(self.path, **kwargs)

    def remove(self, *, silent: bool = True, **kwargs) -> bool:  # type: ignore[override]
        return self.fs.remove(self.path, silent=silent, **kwargs)

    def chmod(self, perm, *, silent: bool = False, **kwargs) -> bool:
        return self.fs.chmod(self.path, perm, silent=silent, **kwargs)

    def makedirs(self, *args, **kwargs) -> None:
        # overwrites luigi's makedirs method, creating the parent directory if there is one
        parent = self.parent
        if parent is not None:
            parent.touch(*args, **kwargs)

    def _prepare_dir(self, **kwargs) -> None:
        # create the directory itself for directory targets, or the parent directory otherwise
        dir_target = self if isinstance(self, self.directory_class) else self.parent
        if dir_target is not None:
            dir_target.touch(**kwargs)

    # abstract interface to be implemented by concrete targets

    @property
    @abstractmethod
    def fs(self) -> FileSystem:
        ...

    @property
    @abstractmethod
    def abspath(self) -> str:
        ...

    @abstractmethod
    def uri(self, *, return_all: bool = False, scheme: bool = True, **kwargs) -> str | list[str]:
        ...

    @abstractmethod
    def touch(self, *, perm: int | None = None, dir_perm: int | None = None, **kwargs) -> bool:
        ...

    @abstractmethod
    def copy_to(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def copy_from(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def move_to(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def move_from(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def copy_to_local(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def copy_from_local(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def move_to_local(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    def move_from_local(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        ...

    @abstractmethod
    @contextlib.contextmanager
    def localize(
        self,
        mode: str = "r",
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        tmp_dir: str | pathlib.Path | None = None,
        **kwargs,
    ) -> Iterator[FileSystemTarget]:
        ...

    @abstractmethod
    def load(self, *args, **kwargs) -> Any:
        ...

    @abstractmethod
    def dump(self, *args, **kwargs) -> Any:
        ...
class FileSystemFileTarget(FileSystemTarget):
    """
    Base class of file targets located on a :py:class:`FileSystem`.

    Copy and move operations create the destination directory (via ``_prepare_dir``) when the
    other side is a target. They forward *perm* and *dir_perm* to the file system operation.
    """

    type: str = "f"

    def ext(self, n: int = 1) -> str:
        return self.fs.ext(self.path, n=n)

    def open(self, mode: str, **kwargs) -> AbstractContextManager[IO]:
        return self.fs.open(self.path, mode, **kwargs)

    def touch(self, **kwargs) -> bool:
        # create the file via open without content
        with self.open("w", **kwargs) as f:
            f.write("")
        return True

    def _file_perm(self, perm: int | None) -> int | None:
        # explicit permissions (including 0) win over the file system default
        return self.fs.default_file_perm if perm is None else perm

    def copy_to(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        if isinstance(dst, FileSystemTarget):
            dst._prepare_dir(perm=dir_perm, **kwargs)

        # TODO: complain when dst not local? forward to copy_from request depending on protocol?
        # dir_perm is forwarded so that it is also respected for plain path destinations
        return self.fs.copy(self.path, get_path(dst), perm=perm, dir_perm=dir_perm, **kwargs)

    def copy_from(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        self._prepare_dir(perm=dir_perm, **kwargs)

        if isinstance(src, FileSystemFileTarget):
            return src.copy_to(
                self.abspath,
                perm=self._file_perm(perm),
                dir_perm=dir_perm,
                **kwargs,
            )

        # TODO: complain when src not local? forward to copy_to request depending on protocol?
        # when src is a plain string, let the fs handle it
        return self.fs.copy(get_path(src), self.path, perm=perm, dir_perm=dir_perm, **kwargs)

    def move_to(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        if isinstance(dst, FileSystemTarget):
            dst._prepare_dir(perm=dir_perm, **kwargs)

        # TODO: complain when dst not local? forward to copy_from request depending on protocol?
        # dir_perm is forwarded so that it is also respected for plain path destinations
        return self.fs.move(self.path, get_path(dst), perm=perm, dir_perm=dir_perm, **kwargs)

    def move_from(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        self._prepare_dir(perm=dir_perm, **kwargs)

        if isinstance(src, FileSystemFileTarget):
            return src.move_to(
                self.abspath,
                perm=self._file_perm(perm),
                dir_perm=dir_perm,
                **kwargs,
            )

        # when src is a plain string, let the fs handle it
        # TODO: complain when src not local? forward to copy_to request depending on protocol?
        return self.fs.move(get_path(src), self.path, perm=perm, dir_perm=dir_perm, **kwargs)
class FileSystemDirectoryTarget(FileSystemTarget):
    """
    Base class of directory targets located on a :py:class:`FileSystem`.

    Recursive copy and move operations walk the first level of the source directory and delegate
    each entry to the corresponding child target.
    """

    type = "d"

    open = None

    def _child_args(
        self,
        path: str | pathlib.Path,
        type: str,
    ) -> tuple[tuple[Any, ...], dict[str, Any]]:
        # hook for subclasses to pass additional arguments to child targets
        return (), {}

    def child(
        self,
        path: str | pathlib.Path,
        type: Literal["f", "d"] | None = None,
        *,
        mktemp_pattern: str | None = None,
        **kwargs,
    ) -> FileSystemTarget:
        """
        Returns a target for *path* relative to this directory.

        *type* "f" creates a :py:attr:`file_class` instance and "d" an instance of this class. When
        *None*, the type is guessed from the existing path, and an exception is raised if it does
        not exist. When *mktemp_pattern* is set, each run of at least three "X" characters in
        *path* is replaced by random characters of the same length. *kwargs* update the keyword
        arguments returned by :py:meth:`_child_args`.
        """
        if type not in (None, "f", "d"):
            raise ValueError("invalid child type, use 'f' or 'd'")

        # apply mktemp's feature to replace at least three consecutive 'X' with random characters
        path = get_path(path)
        if mktemp_pattern and "XXX" in path:
            repl = lambda m: create_random_string(l=len(m.group(1)))
            path = re.sub("(X{3,})", repl, path)

        unexpanded_path = os.path.join(self.unexpanded_path, path)
        path = os.path.join(self.path, path)
        if type == "f":
            cls = self.file_class
        elif type == "d":
            cls = self.__class__  # type: ignore[assignment]
        elif not self.fs.exists(path):
            raise Exception(f"cannot guess type of non-existing path '{path}'")
        elif self.fs.isdir(path):
            cls = self.__class__  # type: ignore[assignment]
            type = "d"
        else:
            cls = self.file_class
            type = "f"

        args, _kwargs = self._child_args(path, type)
        _kwargs.update(kwargs)

        # the unexpanded path is used so that variables are propagated to the child
        return cls(unexpanded_path, *args, **_kwargs)

    def listdir(self, **kwargs) -> list[str]:
        return self.fs.listdir(self.path, **kwargs)

    def glob(self, pattern: str | pathlib.Path, **kwargs) -> list[str]:
        return self.fs.glob(pattern, cwd=self.path, **kwargs)

    def walk(self, **kwargs) -> Iterator[tuple[str, list[str], list[str], int]]:
        return self.fs.walk(self.path, **kwargs)

    def touch(self, **kwargs) -> bool:  # type: ignore[override]
        # create the directory, silently by default
        kwargs.setdefault("silent", True)
        return self.fs.mkdir(self.path, **kwargs)

    @staticmethod
    def _iter_top_level(
        walker: Iterator[tuple[str, list[str], list[str], int]],
    ) -> Iterator[tuple[str, str]]:
        # yields (basename, type flag) pairs, directories before files, of a walk() iterator
        for _, dirs, files, _ in walker:
            for basename in dirs:
                yield basename, "d"
            for basename in files:
                yield basename, "f"

    def copy_to(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        # create the target dir
        _dst = get_path(dst)
        if isinstance(dst, FileSystemDirectoryTarget):
            dst.touch(perm=dir_perm, **kwargs)
        else:
            # TODO: complain when dst not local? forward to copy_from request depending on protocol?
            self.fs.mkdir(_dst, perm=dir_perm, **kwargs)

        # walk and operate recursively through directories and files
        for basename, type_flag in self._iter_top_level(self.walk(max_depth=0, **kwargs)):
            t = self.child(basename, type=type_flag)  # type: ignore[arg-type]
            t.copy_to(os.path.join(_dst, basename), perm=perm, dir_perm=dir_perm, **kwargs)

        return _dst

    def copy_from(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        # when src is a directory target itself, forward to its copy_to implementation as it might
        # be more performant to use its own directory walking
        if isinstance(src, FileSystemDirectoryTarget):
            return src.copy_to(self, perm=perm, dir_perm=dir_perm, **kwargs)

        # create the target dir
        self.touch(perm=dir_perm, **kwargs)

        # when src is a plain string, let the fs handle it and operate recursively
        # TODO: complain when src not local? forward to copy_from request depending on protocol?
        _src = get_path(src)
        for basename, type_flag in self._iter_top_level(self.fs.walk(_src, max_depth=0, **kwargs)):
            t = self.child(basename, type=type_flag)  # type: ignore[arg-type]
            t.copy_from(os.path.join(_src, basename), perm=perm, dir_perm=dir_perm, **kwargs)

        return self.abspath

    def move_to(
        self,
        dst: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        # create the target dir
        _dst = get_path(dst)
        if isinstance(dst, FileSystemDirectoryTarget):
            dst.touch(perm=dir_perm, **kwargs)
        else:
            # TODO: complain when dst not local? forward to copy_from request depending on protocol?
            self.fs.mkdir(_dst, perm=dir_perm, **kwargs)

        # walk and operate recursively through directories and files
        for basename, type_flag in self._iter_top_level(self.walk(max_depth=0, **kwargs)):
            t = self.child(basename, type=type_flag)  # type: ignore[arg-type]
            t.move_to(os.path.join(_dst, basename), perm=perm, dir_perm=dir_perm, **kwargs)

        # finally remove
        self.remove()

        return _dst

    def move_from(
        self,
        src: str | pathlib.Path | FileSystemTarget,
        *,
        perm: int | None = None,
        dir_perm: int | None = None,
        **kwargs,
    ) -> str:
        # when src is a directory target itself, forward to its move_to implementation as it might
        # be more performant to use its own directory walking
        if isinstance(src, FileSystemDirectoryTarget):
            return src.move_to(self, perm=perm, dir_perm=dir_perm, **kwargs)

        # create the target dir
        self.touch(perm=dir_perm, **kwargs)

        # when src is a plain string, let the fs handle it and move entries recursively (rather
        # than copying them), mirroring move_to
        # TODO: complain when src not local? forward to copy_from request depending on protocol?
        _src = get_path(src)
        for basename, type_flag in self._iter_top_level(self.fs.walk(_src, max_depth=0, **kwargs)):
            t = self.child(basename, type=type_flag)  # type: ignore[arg-type]
            t.move_from(os.path.join(_src, basename), perm=perm, dir_perm=dir_perm, **kwargs)

        # finally remove the (now emptied) source
        self.fs.remove(_src)

        return self.abspath
def get_path(target: T) -> str:
    """
    Returns the path of *target* as a string.

    For :py:class:`FileSystemTarget` instances, their ``abspath`` is used. Objects providing a
    ``path`` attribute are converted through that attribute. Strings and :py:class:`pathlib.Path`
    objects are converted directly. A :py:class:`TypeError` is raised for all other objects.
    """
    # file targets; compare with the sentinel by identity so that custom __eq__/__ne__
    # implementations of attribute values cannot interfere
    if isinstance(target, FileSystemTarget):
        path = getattr(target, "abspath", no_value)
        if path is not no_value:
            return str(path)

    # objects that have a "path" attribute
    path = getattr(target, "path", no_value)
    if path is not no_value:
        return str(path)

    # strings and paths
    if isinstance(target, (str, pathlib.Path)):
        return str(target)

    raise TypeError(f"cannot get path from {target!r}")
def get_scheme(uri: str | pathlib.Path) -> str | None:
    """
    Returns the scheme of *uri*, or *None* if it does not start with one.

    Examples: ``"ftp://path/to/file"`` -> ``"ftp"``, ``"/path/to/file"`` -> ``None``.
    """
    match = re.match(r"^(\w+)\:\/\/.*$", str(uri))
    if match is None:
        return None
    return match.group(1)
def has_scheme(uri: str | pathlib.Path) -> bool:
    """
    Returns *True* when *uri* starts with a scheme such as ``"ftp://"``, and *False* otherwise.
    """
    scheme = get_scheme(uri)
    return scheme is not None
def add_scheme(path: str | pathlib.Path, scheme: str) -> str:
    """
    Prefixes *path* with ``"<scheme>://"`` and returns it as a string.

    Trailing ":" and "/" characters of *scheme* are ignored. When *path* already contains a
    scheme, it is returned unchanged (as a string).
    """
    path_str = str(path)
    if not has_scheme(path_str):
        path_str = f"{scheme.rstrip(':/')}://{path_str}"
    return path_str
def remove_scheme(uri: str | pathlib.Path) -> str:
    """
    Returns *uri* as a string with a leading ``"<scheme>://"`` prefix stripped.

    Examples: ``"ftp://path/to/file"`` -> ``"path/to/file"``, ``"/path/to/file"`` ->
    ``"/path/to/file"``. Only a single prefix at the very start is removed.
    """
    text = str(uri)
    prefix = re.match(r"^(\w+\:\/\/)", text)
    return text[prefix.end():] if prefix else text
@contextlib.contextmanager
def localize_file_targets(struct, *args, **kwargs) -> Generator[Any, None, None]:
    """
    Takes an arbitrary *struct* of targets, opens the contexts returned by their
    :py:meth:`FileSystemFileTarget.localize` implementations and yields their localized
    representations in the same structure as passed in *struct*. When the context is closed, the
    contexts of all localized targets are closed in reverse order of opening.

    Objects without a callable ``localize`` attribute are yielded unchanged. *args* and *kwargs*
    are forwarded to every ``localize`` call.

    If entering one of the contexts fails, the contexts entered so far are closed with that
    error before it propagates. If the body of the ``with`` statement raises (any
    :py:class:`BaseException`), the error is handed to all contexts and re-raised. Errors raised
    while closing the contexts are then ignored. Otherwise, the first error raised while closing
    a context is re-raised after all contexts were closed.
    """
    managers: list[Any] = []

    def enter(target):
        if callable(getattr(target, "localize", None)):
            manager = target.localize(*args, **kwargs)
            localized = manager.__enter__()
            # register only after a successful enter so that only opened contexts are closed
            managers.append(manager)
            return localized
        return target

    def exit_all(exc_info) -> list[Exception]:
        # close contexts in reverse order, like nested with statements, collecting their errors
        errors = []
        for manager in reversed(managers):
            try:
                manager.__exit__(*exc_info)
            except Exception as e:
                errors.append(e)
        return errors

    # localize all targets, maintain the structure; on failure, close what was already opened
    try:
        localized_targets = map_struct(enter, struct)
    except BaseException:
        exit_all(sys.exc_info())
        raise

    # prepare exception info
    exc_info: tuple[Any, Any, Any] = (None, None, None)
    try:
        yield localized_targets
    except BaseException:
        # any error in the body (including SystemExit) must be reported to the contexts so that
        # they do not treat the localization as successful
        exc_info = sys.exc_info()
        raise
    finally:
        exit_errors = exit_all(exc_info)

        # when there was no exception during the actual yield and
        # an exception occurred in one of the exit methods, raise the first one
        if exc_info[0] is None and exit_errors:
            raise exit_errors[0]