Source code for law.target.file

# coding: utf-8

"""
Custom luigi file system and target objects.
"""

__all__ = [
    "FileSystem", "FileSystemTarget", "FileSystemFileTarget", "FileSystemDirectoryTarget",
    "get_path", "get_scheme", "has_scheme", "add_scheme", "remove_scheme", "localize_file_targets",
]


import os
import sys
import re
from abc import abstractmethod, abstractproperty
from functools import partial
from contextlib import contextmanager

from law.config import Config
import law.target.luigi_shims as shims
from law.target.base import Target
from law.util import map_struct, create_random_string, human_bytes, no_value


[docs]class FileSystem(shims.FileSystem): @classmethod def parse_config(cls, section, config=None, overwrite=False): # reads a law config section and returns parsed file system configs cfg = Config.instance() if config is None: config = {} # helper to add a config value if it exists, extracted with a config parser method def add(option, func): if option not in config or overwrite: config[option] = func(section, option) # read configs int_or_none = partial(cfg.get_expanded_int, default=None) add("has_permissions", cfg.get_expanded_bool) add("default_file_perm", int_or_none) add("default_dir_perm", int_or_none) add("create_file_dir", cfg.get_expanded_bool) return config def __init__(self, name=None, has_permissions=True, default_file_perm=None, default_dir_perm=None, create_file_dir=True, **kwargs): super(FileSystem, self).__init__(**kwargs) self.name = name self.has_permissions = has_permissions self.default_file_perm = default_file_perm self.default_dir_perm = default_dir_perm self.create_file_dir = create_file_dir def __repr__(self): return "{}(name={}, {})".format(self.__class__.__name__, self.name, hex(id(self))) def dirname(self, path): return os.path.dirname(str(path)) if path != "/" else None def basename(self, path): return os.path.basename(str(path)) if path != "/" else "/" def ext(self, path, n=1): # split the path parts = self.basename(path).lstrip(".").split(".") # empty extension in the trivial case or use the last n parts except for the first one return "" if len(parts) == 1 else ".".join(parts[1:][min(-n, 0):]) def _unscheme(self, path): return remove_scheme(path) @abstractproperty def default_instance(self): return @abstractmethod def abspath(self, path): return @abstractmethod def stat(self, path, **kwargs): return
[docs] @abstractmethod def exists(self, path, stat=False, **kwargs): return
[docs] @abstractmethod def isdir(self, path, **kwargs): return
@abstractmethod def isfile(self, path, **kwargs): return @abstractmethod def chmod(self, path, perm, silent=True, **kwargs): return
[docs] @abstractmethod def remove(self, path, recursive=True, silent=True, **kwargs): return
[docs] @abstractmethod def mkdir(self, path, perm=None, recursive=True, silent=True, **kwargs): return
[docs] @abstractmethod def listdir(self, path, pattern=None, type=None, **kwargs): return
@abstractmethod def walk(self, path, max_depth=-1, **kwargs): return @abstractmethod def glob(self, pattern, cwd=None, **kwargs): return
[docs] @abstractmethod def copy(self, src, dst, perm=None, dir_perm=None, **kwargs): return
[docs] @abstractmethod def move(self, src, dst, perm=None, dir_perm=None, **kwargs): return
@abstractmethod @contextmanager def open(self, path, mode, perm=None, dir_perm=None, **kwargs): return
[docs]class FileSystemTarget(Target, shims.FileSystemTarget): file_class = None directory_class = None def __init__(self, path, fs=None, **kwargs): if fs: self.fs = fs self._path = None self._unexpanded_path = None super(FileSystemTarget, self).__init__(path=path, **kwargs) def _repr_pairs(self, color=True): pairs = super(FileSystemTarget, self)._repr_pairs() # add the fs name if self.fs: pairs.append(("fs", self.fs.name)) # add the path cfg = Config.instance() expand = cfg.get_expanded_bool("target", "expand_path_repr") pairs.append(("path", self.path if expand else self.unexpanded_path)) # optionally add the file size if cfg.get_expanded_bool("target", "filesize_repr"): stat = self.exists(stat=True) pairs.append(("size", human_bytes(stat.st_size, fmt="{:.1f}{}") if stat else "-")) return pairs def _parent_args(self): return (), {} @property def unexpanded_path(self): return self._unexpanded_path @property def path(self): return self._path @path.setter def path(self, path): path = self.fs._unscheme(str(path)) self._unexpanded_path = path self._path = os.path.expandvars(os.path.expanduser(self._unexpanded_path)) @property def dirname(self): return self.fs.dirname(self.path) @property def abs_dirname(self): return self.fs.dirname(self.abspath) @property def basename(self): return self.fs.basename(self.path) @property def unique_basename(self): return "{}_{}".format(hex(self.hash)[2:], self.basename) @property def parent(self): # get the dirname, but favor the unexpanded one to propagate variables dirname = self.dirname unexpanded_dirname = self.fs.dirname(self.unexpanded_path) expanded_dirname = os.path.expandvars(os.path.expanduser(unexpanded_dirname)) if unexpanded_dirname and self.fs.abspath(dirname) == self.fs.abspath(expanded_dirname): dirname = unexpanded_dirname args, kwargs = self._parent_args() return self.directory_class(dirname, *args, **kwargs) if dirname is not None else None def sibling(self, *args, **kwargs): parent = self.parent if not parent: raise Exception("cannot determine parent of {!r}".format(self)) return parent.child(*args, **kwargs) def stat(self, **kwargs): return self.fs.stat(self.path, **kwargs)
[docs] def exists(self, **kwargs): return self.fs.exists(self.path, **kwargs)
[docs] def remove(self, silent=True, **kwargs): self.fs.remove(self.path, silent=silent, **kwargs)
def chmod(self, perm, silent=False, **kwargs): self.fs.chmod(self.path, perm, silent=silent, **kwargs) @abstractproperty def fs(self): return @abstractproperty def abspath(self): return @abstractmethod def uri(self, return_all=False, scheme=True, **kwargs): return @abstractmethod def touch(self, perm=None, dir_perm=None, **kwargs): return @abstractmethod def copy_to(self, dst, perm=None, dir_perm=None, **kwargs): return @abstractmethod def copy_from(self, src, perm=None, dir_perm=None, **kwargs): return @abstractmethod def move_to(self, dst, perm=None, dir_perm=None, **kwargs): return @abstractmethod def move_from(self, src, perm=None, dir_perm=None, **kwargs): return @abstractmethod def copy_to_local(self, *args, **kwargs): return @abstractmethod def copy_from_local(self, *args, **kwargs): return @abstractmethod def move_to_local(self, *args, **kwargs): return @abstractmethod def move_from_local(self, *args, **kwargs): return @abstractmethod @contextmanager def localize(self, mode="r", perm=None, dir_perm=None, tmp_dir=None, **kwargs): return @abstractmethod def load(self, *args, **kwargs): return @abstractmethod def dump(self, *args, **kwargs): return
[docs]class FileSystemFileTarget(FileSystemTarget): type = "f" def ext(self, n=1): return self.fs.ext(self.path, n=n)
[docs] def open(self, mode, **kwargs): return self.fs.open(self.path, mode, **kwargs)
def touch(self, **kwargs): # create the file via open without content with self.open("w", **kwargs) as f: f.write("") def copy_to(self, dst, perm=None, dir_perm=None, **kwargs): # TODO: complain when dst not local? forward to copy_from request depending on protocol? return self.fs.copy(self.path, get_path(dst), perm=perm, dir_perm=dir_perm, **kwargs) def copy_from(self, src, perm=None, dir_perm=None, **kwargs): if isinstance(src, FileSystemFileTarget): return src.copy_to(self.abspath, perm=perm, dir_perm=dir_perm, **kwargs) # when src is a plain string, let the fs handle it # TODO: complain when src not local? forward to copy_to request depending on protocol? return self.fs.copy(get_path(src), self.path, perm=perm, dir_perm=dir_perm, **kwargs) def move_to(self, dst, perm=None, dir_perm=None, **kwargs): # TODO: complain when dst not local? forward to copy_from request depending on protocol? return self.fs.move(self.path, get_path(dst), perm=perm, dir_perm=dir_perm, **kwargs) def move_from(self, src, perm=None, dir_perm=None, **kwargs): if isinstance(src, FileSystemFileTarget): return src.move_to(self.abspath, perm=perm, dir_perm=dir_perm, **kwargs) # when src is a plain string, let the fs handle it # TODO: complain when src not local? forward to copy_to request depending on protocol? return self.fs.move(get_path(src), self.path, perm=perm, dir_perm=dir_perm, **kwargs)
[docs]class FileSystemDirectoryTarget(FileSystemTarget): type = "d" open = None def _child_args(self, path): return (), {} def child(self, path, type=None, mktemp_pattern=False, **kwargs): if type not in (None, "f", "d"): raise ValueError("invalid child type, use 'f' or 'd'") # apply mktemp's feature to replace at least three consecutive 'X' with random characters path = get_path(path) if mktemp_pattern and "XXX" in path: repl = lambda m: create_random_string(l=len(m.group(1))) path = re.sub("(X{3,})", repl, path) unexpanded_path = os.path.join(self.unexpanded_path, path) path = os.path.join(self.path, path) if type == "f": cls = self.file_class elif type == "d": cls = self.__class__ elif not self.fs.exists(path): raise Exception("cannot guess type of non-existing path '{}'".format(path)) elif self.fs.isdir(path): cls = self.__class__ else: cls = self.file_class args, _kwargs = self._child_args(path) _kwargs.update(kwargs) return cls(unexpanded_path, *args, **_kwargs) def listdir(self, **kwargs): return self.fs.listdir(self.path, **kwargs) def glob(self, pattern, **kwargs): return self.fs.glob(pattern, cwd=self.path, **kwargs) def walk(self, **kwargs): return self.fs.walk(self.path, **kwargs) def touch(self, **kwargs): kwargs.setdefault("silent", True) self.fs.mkdir(self.path, **kwargs) def copy_to(self, dst, perm=None, dir_perm=None, **kwargs): # create the target dir _dst = get_path(dst) if isinstance(dst, FileSystemDirectoryTarget): dst.touch(perm=dir_perm, **kwargs) else: # TODO: complain when dst not local? forward to copy_from request depending on protocol? self.fs.mkdir(_dst, perm=dir_perm, **kwargs) # walk and operate recursively for path, dirs, files, _ in self.walk(max_depth=0, **kwargs): # recurse through directories and files for basenames, type_flag in [(dirs, "d"), [files, "f"]]: for basename in basenames: t = self.child(basename, type=type_flag) t.copy_to(os.path.join(_dst, basename), perm=perm, dir_perm=dir_perm, **kwargs) return _dst def copy_from(self, src, perm=None, dir_perm=None, **kwargs): # when src is a directory target itself, forward to its copy_to implementation as it might # be more performant to use its own directory walking if isinstance(src, FileSystemDirectoryTarget): return src.copy_to(self, perm=perm, dir_perm=dir_perm, **kwargs) # create the target dir self.touch(perm=dir_perm, **kwargs) # when src is a plain string, let the fs handle it # walk and operate recursively # TODO: complain when src not local? forward to copy_from request depending on protocol? _src = get_path(src) for path, dirs, files, _ in self.fs.walk(_src, max_depth=0, **kwargs): # recurse through directories and files for basenames, type_flag in [(dirs, "d"), [files, "f"]]: for basename in basenames: t = self.child(basename, type=type_flag) t.copy_from(os.path.join(_src, basename), perm=perm, dir_perm=dir_perm, **kwargs) return self.abspath def move_to(self, dst, perm=None, dir_perm=None, **kwargs): # create the target dir _dst = get_path(dst) if isinstance(dst, FileSystemDirectoryTarget): dst.touch(perm=dir_perm, **kwargs) else: # TODO: complain when dst not local? forward to copy_from request depending on protocol? self.fs.mkdir(_dst, perm=dir_perm, **kwargs) # walk and operate recursively for path, dirs, files, _ in self.walk(max_depth=0, **kwargs): # recurse through directories and files for basenames, type_flag in [(dirs, "d"), [files, "f"]]: for basename in basenames: t = self.child(basename, type=type_flag) t.move_to(os.path.join(_dst, basename), perm=perm, dir_perm=dir_perm, **kwargs) # finally remove self.remove() return _dst def move_from(self, src, perm=None, dir_perm=None, **kwargs): # when src is a directory target itself, forward to its move_to implementation as it might # be more performant to use its own directory walking if isinstance(src, FileSystemDirectoryTarget): return src.move_to(self, perm=perm, dir_perm=dir_perm, **kwargs) # create the target dir self.touch(perm=dir_perm, **kwargs) # when src is a plain string, let the fs handle it # walk and operate recursively # TODO: complain when src not local? forward to copy_from request depending on protocol? _src = get_path(src) for path, dirs, files, _ in self.fs.walk(_src, max_depth=0, **kwargs): # recurse through directories and files for basenames, type_flag in [(dirs, "d"), [files, "f"]]: for basename in basenames: t = self.child(basename, type=type_flag) t.copy_from(os.path.join(_src, basename), perm=perm, dir_perm=dir_perm, **kwargs) # finally remove self.fs.remove(_src) return self.abspath
FileSystemTarget.file_class = FileSystemFileTarget FileSystemTarget.directory_class = FileSystemDirectoryTarget
[docs]def get_path(target): if isinstance(target, FileSystemTarget): path = getattr(target, "abspath", no_value) if path != no_value: return path path = getattr(target, "path", no_value) if path != no_value: return path if target: return str(target) return target
[docs]def get_scheme(uri): # ftp://path/to/file -> ftp # /path/to/file -> None m = re.match(r"^(\w+)\:\/\/.*$", str(uri)) return m.group(1) if m else None
[docs]def has_scheme(uri): return get_scheme(uri) is not None
[docs]def add_scheme(path, scheme): # adds a scheme to a path, if it does not already contain one path = str(path) return "{}://{}".format(scheme.rstrip(":/"), path) if not has_scheme(path) else path
[docs]def remove_scheme(uri): # ftp://path/to/file -> /path/to/file # /path/to/file -> /path/to/file return re.sub(r"^(\w+\:\/\/)", "", str(uri))
[docs]@contextmanager def localize_file_targets(struct, *args, **kwargs): """ Takes an arbitrary *struct* of targets, opens the contexts returned by their :py:meth:`FileSystemFileTarget.localize` implementations and yields their localized representations in the same structure as passed in *struct*. When the context is closed, the contexts of all localized targets are closed. """ managers = [] def enter(target): if callable(getattr(target, "localize", None)): manager = target.localize(*args, **kwargs) managers.append(manager) return manager.__enter__() return target # localize all targets, maintain the structure localized_targets = map_struct(enter, struct) # prepare exception info exc = None exc_info = (None, None, None) try: yield localized_targets except (Exception, KeyboardInterrupt) as e: exc = e exc_info = sys.exc_info() raise finally: exit_exc = [] for manager in managers: try: manager.__exit__(*exc_info) except Exception as e: exit_exc.append(e) # when there was no exception during the actual yield and # an exception occured in one of the exit methods, raise the first one if not exc and exit_exc: raise exit_exc[0]