# Source code for law.target.local

# coding: utf-8

"""
Local target implementations.
"""

# public names exported by this module
__all__ = ["LocalFileSystem", "LocalTarget", "LocalFileTarget", "LocalDirectoryTarget"]


import os
import fnmatch
import shutil
import glob
import random
from contextlib import contextmanager

import six

from law.config import Config
import law.target.luigi_shims as shims
from law.target.file import (
    FileSystem, FileSystemTarget, FileSystemFileTarget, FileSystemDirectoryTarget, get_path,
    get_scheme, add_scheme, remove_scheme,
)
from law.target.formatter import AUTO_FORMATTER, find_formatter
from law.util import is_file_exists_error
from law.logger import get_logger


# module-level logger, used e.g. for debug and warning messages in the localize methods
logger = get_logger(__name__)


class LocalFileSystem(FileSystem, shims.LocalFileSystem):
    """
    File system that operates on local paths. Paths are resolved relative to :py:attr:`base`
    after stripping an optional ``file`` scheme and expanding environment variables and the user
    home directory. The base is either passed explicitly or read from a law config *section*; the
    configured default local fs must use ``os.sep`` as its base.
    """

    # default instance, assigned right after the class definition
    default_instance = None

    @classmethod
    def parse_config(cls, section, config=None, overwrite=False):
        """
        Extends the *config* parsed by the base file system by the ``base`` option read from the
        law config *section*. Existing entries in *config* are only replaced when *overwrite* is
        *True*. Returns the updated config dictionary.
        """
        config = super(LocalFileSystem, cls).parse_config(section, config=config,
            overwrite=overwrite)

        cfg = Config.instance()

        # helper to add a config value, extracted with a config parser method
        def add(option, func):
            if option not in config or overwrite:
                config[option] = func(section, option)

        # default base path
        add("base", cfg.get_expanded)

        return config

    def __init__(self, section=None, base=None, **kwargs):
        """
        Creates the file system either from a law config *section* or from an explicit *base*
        path; passing both raises an exception. When neither is given, the section named by the
        ``default_local_fs`` option of the ``target`` config section is used. Remaining *kwargs*
        are forwarded to the base file system constructor.
        """
        # setting both section and base is ambiguous and not allowed
        if section and base:
            raise Exception(
                "setting both 'section' and 'base' as {} arguments is ambiguous and therefore not "
                "supported, but got {} and {}".format(self.__class__.__name__, section, base),
            )

        # determine the configured default local fs section
        cfg = Config.instance()
        default_section = cfg.get_expanded("target", "default_local_fs")
        self.config_section = None

        # when no base is given, evaluate the config section
        if not base:
            if not section:
                # use the default section when none is set
                section = default_section
            elif section != default_section:
                # check if the section exists
                if not cfg.has_section(section):
                    raise Exception("law config has no section '{}' to read {} options".format(
                        section, self.__class__.__name__))
                # extend non-default sections by options of the default one
                data = dict(cfg.items(default_section, expand_vars=False, expand_user=False))
                cfg.update({section: data}, overwrite_sections=True, overwrite_options=False)
            self.config_section = section

            # parse the config and set fs name and base
            kwargs = self.parse_config(self.config_section, kwargs)
            kwargs.setdefault("name", self.config_section)
            base = kwargs.pop("base", None) or os.sep

            # special case: the default local fs is not allowed to have a base directory other than
            # "/" to ensure that files and directories wrapped by local targets in law or derived
            # projects for convenience are interpreted as such and in particular do not resolve them
            # relative to a base path defined in some config
            if self.config_section == default_section and base != os.sep:
                raise Exception(
                    "the default local fs '{}' must not have a base defined, but got {}".format(
                        default_section, base),
                )

        # set the base
        self.base = os.path.abspath(self._unscheme(str(base)))

        super(LocalFileSystem, self).__init__(**kwargs)

    def _unscheme(self, path):
        """
        Returns *path* as a string with a leading ``file`` scheme removed; other schemes are kept.
        """
        path = str(path)
        return remove_scheme(path) if get_scheme(path) == "file" else path

    def abspath(self, path):
        """
        Returns the absolute local path of *path*. Env variables and ``~`` are expanded, and
        relative paths are joined with :py:attr:`base`.
        """
        path = os.path.expandvars(os.path.expanduser(self._unscheme(path)))

        # join with the base path (absolute paths are kept as they are by os.path.join)
        base = os.path.expandvars(os.path.expanduser(str(self.base)))
        path = os.path.join(base, path)

        return os.path.abspath(path)

    def stat(self, path, **kwargs):
        """
        Returns the ``os.stat`` result of *path*.
        """
        return os.stat(self.abspath(path))

    def exists(self, path, stat=False, **kwargs):
        """
        Returns whether *path* exists. When *stat* is *True*, the stat result is returned instead
        if the path exists, and *None* otherwise.
        """
        exists = os.path.exists(self.abspath(path))
        if not stat:
            return exists
        return self.stat(path, **kwargs) if exists else None

    def isdir(self, path, **kwargs):
        """
        Returns whether *path* is a directory.
        """
        return os.path.isdir(self.abspath(path))

    def isfile(self, path, **kwargs):
        """
        Returns whether *path* is a file.
        """
        return os.path.isfile(self.abspath(path))

    def chmod(self, path, perm, silent=True, **kwargs):
        """
        Sets the permission *perm* on *path*. Nothing happens (and *True* is returned) when
        permissions are disabled or *perm* is *None*. With *silent*, a missing path returns
        *False* instead of raising.
        """
        if not self.has_permissions or perm is None:
            return True

        if silent and not self.exists(path):
            return False

        os.chmod(self.abspath(path), perm)

        return True

    def remove(self, path, recursive=True, silent=True, **kwargs):
        """
        Removes the file or directory at *path*, recursively for directories when *recursive* is
        *True*. Symbolic links are removed as links, even when they point to a directory. With
        *silent*, a missing path returns *False* instead of raising.
        """
        abspath = self.abspath(path)

        if silent and not self.exists(path):
            return False

        # shutil.rmtree and os.rmdir both refuse to operate on symlinks, so links that point to
        # directories are removed like files
        if self.isdir(path) and not os.path.islink(abspath):
            if recursive:
                shutil.rmtree(abspath)
            else:
                os.rmdir(abspath)
        else:
            os.remove(abspath)

        return True

    def mkdir(self, path, perm=None, recursive=True, silent=True, **kwargs):
        """
        Creates the directory *path*, including missing parents when *recursive* is *True*.
        *perm* defaults to :py:attr:`default_dir_perm`. With *silent*, an existing path returns
        *False* and "file exists" errors during creation are ignored.
        """
        if silent and self.exists(path):
            return False

        if perm is None:
            perm = self.default_dir_perm

        # prepare arguments passed to makedirs or mkdir
        args = (self.abspath(path),)
        if perm is not None:
            args += (perm,)

        # the mode passed to os.mkdir or os.makedirs is ignored on some systems, so the strategy
        # here is to disable the process' current umask, create the directories and use chmod again
        orig = os.umask(0) if perm is not None else None
        func = os.makedirs if recursive else os.mkdir
        try:
            try:
                func(*args)
            except Exception as e:
                if not silent or not is_file_exists_error(e):
                    raise
            self.chmod(path, perm)
        finally:
            if orig is not None:
                os.umask(orig)

        return True

    def listdir(self, path, pattern=None, type=None, **kwargs):
        """
        Returns the names of the elements in the directory *path*. They are optionally filtered
        by the fnmatch *pattern* and by *type* (``"f"`` for non-directories, ``"d"`` for
        directories).
        """
        abspath = self.abspath(path)
        elems = os.listdir(abspath)

        # apply pattern filter
        if pattern is not None:
            elems = fnmatch.filter(elems, pattern)

        # apply type filter
        if type == "f":
            elems = [e for e in elems if not self.isdir(os.path.join(path, e))]
        elif type == "d":
            elems = [e for e in elems if self.isdir(os.path.join(path, e))]

        return elems

    def walk(self, path, max_depth=-1, **kwargs):
        """
        Breadth-first walk over *path*, similar to ``os.walk``. Yields tuples
        ``(abs_dir, dirs, files, depth)``; directories deeper than *max_depth* are skipped when it
        is non-negative.
        """
        # mimic os.walk with a max_depth and yield the current depth
        search_dirs = [(str(path), 0)]
        while search_dirs:
            (search_dir, depth) = search_dirs.pop(0)

            # check depth
            if max_depth >= 0 and depth > max_depth:
                continue

            # find dirs and files
            dirs = []
            files = []
            for elem in self.listdir(search_dir):
                if self.isdir(os.path.join(search_dir, elem)):
                    dirs.append(elem)
                else:
                    files.append(elem)

            # yield everything
            yield (self.abspath(search_dir), dirs, files, depth)

            # use dirs to update search dirs
            search_dirs.extend((os.path.join(search_dir, d), depth + 1) for d in dirs)

    def glob(self, pattern, cwd=None, **kwargs):
        """
        Returns the paths matching the glob *pattern*. When *cwd* is given, a relative *pattern* is
        resolved relative to *cwd* and the matches are returned relative to *cwd*. Otherwise
        *pattern* is resolved relative to :py:attr:`base` and absolute paths are returned.
        """
        if cwd is None:
            pattern = self.abspath(pattern)
        else:
            cwd = self.abspath(cwd)
            # do not make the pattern absolute first, since os.path.join would then discard cwd
            pattern = os.path.expandvars(os.path.expanduser(self._unscheme(pattern)))
            pattern = os.path.join(cwd, pattern)

        elems = glob.glob(pattern)

        # cut the cwd if there was any
        if cwd is not None:
            elems = [os.path.relpath(e, cwd) for e in elems]

        return elems

    def _prepare_dst_dir(self, dst, src=None, perm=None, **kwargs):
        """
        Prepares the directory of a target located at *dst* for copying and returns its full
        location as specified below. *src* can be the location of a source file target, which is
        (e.g.) used by a file copy or move operation. When *dst* is already a directory, calling
        this method has no effect and the *dst* path is returned, optionally joined with the
        basename of *src*. When *dst* is a file, the absolute *dst* path is returned. Otherwise,
        when *dst* does not exist yet, it is interpreted as a file path and missing directories
        are created when :py:attr:`create_file_dir` is *True*, using *perm* to set the directory
        permission. The absolute path to *dst* is returned.
        """
        dst, src = str(dst), src and str(src)

        if self.isdir(dst):
            full_dst = os.path.join(dst, os.path.basename(src)) if src else dst

        elif self.isfile(dst):
            full_dst = dst

        else:
            # interpret dst as a file name, create missing dirs
            dst_dir = self.dirname(dst)
            if dst_dir and self.create_file_dir and not self.isdir(dst_dir):
                self.mkdir(dst_dir, perm=perm, recursive=True)
            full_dst = dst

        return full_dst

    def copy(self, src, dst, perm=None, dir_perm=None, **kwargs):
        """
        Copies the file *src* to *dst* via ``shutil.copy2``. Missing destination directories are
        created with *dir_perm*, and *perm* (default :py:attr:`default_file_perm`) is applied to the
        copy. Returns the destination path.
        """
        dst = self._prepare_dst_dir(dst, src=src, perm=dir_perm)

        # copy the file
        shutil.copy2(self.abspath(src), self.abspath(dst))

        # set permissions
        if perm is None:
            perm = self.default_file_perm
        self.chmod(dst, perm)

        return dst

    def move(self, src, dst, perm=None, dir_perm=None, **kwargs):
        """
        Moves *src* to *dst* via ``shutil.move``. Missing destination directories are created with
        *dir_perm*, and *perm* (default :py:attr:`default_file_perm`) is applied to the result.
        Returns the destination path.
        """
        dst = self._prepare_dst_dir(dst, src=src, perm=dir_perm)

        # move the file
        shutil.move(self.abspath(src), self.abspath(dst))

        # set permissions
        if perm is None:
            perm = self.default_file_perm
        self.chmod(dst, perm)

        return dst

    def open(self, path, mode, perm=None, dir_perm=None, **kwargs):
        """
        Opens *path* with *mode* and returns the file object. For non-read modes, missing
        directories are created with *dir_perm*. When permissions are enabled, *perm* (default
        :py:attr:`default_file_perm`) is set on the file.
        """
        abspath = self.abspath(path)

        # reading needs no preparation
        if mode.startswith("r"):
            return open(abspath, mode)

        # the file is written or updated, so prepare the destination directory
        self._prepare_dst_dir(path, perm=dir_perm)

        if perm is None:
            perm = self.default_file_perm

        # open first (this creates the file) and set permissions afterwards; pre-creating the file
        # with a separate open() call would make exclusive-creation modes ("x") always fail
        f = open(abspath, mode)
        if perm is not None and self.has_permissions:
            try:
                self.chmod(path, perm)
            except Exception:
                f.close()
                raise

        return f
# module-level default instance, created at import time from the local fs config section named by
# the "default_local_fs" option of the "target" section; LocalTarget uses it as its default fs
LocalFileSystem.default_instance = LocalFileSystem()
class LocalTarget(FileSystemTarget, shims.LocalTarget):
    """
    Base class of targets on the local file system. When no *path* is given, *is_tmp* must be set
    and a random temporary path below *tmp_dir* (or the configured ``target.tmp_dir``) is
    generated. A string *is_tmp* is used as the file extension of that path. *fs* may be a
    :py:class:`LocalFileSystem` instance or the name of a config section to create one from.
    """

    fs = LocalFileSystem.default_instance

    def __init__(self, path=None, fs=LocalFileSystem.default_instance, is_tmp=False, tmp_dir=None,
            **kwargs):
        if isinstance(fs, six.string_types):
            fs = LocalFileSystem(fs)

        # handle tmp paths manually since luigi uses the env tmp dir
        if not path:
            if not is_tmp:
                raise Exception("when no target path is defined, is_tmp must be set")
            if str(fs.base) != "/":
                raise Exception(
                    "when is_tmp is set, the base of the underlying file system must be '/', but "
                    "found '{}'".format(fs.base),
                )

            # if not set, get the tmp dir from the config and ensure that it exists
            cfg = Config.instance()
            if tmp_dir:
                tmp_dir = get_path(tmp_dir)
            else:
                tmp_dir = os.path.realpath(cfg.get_expanded("target", "tmp_dir"))
            if not fs.exists(tmp_dir):
                perm = cfg.get_expanded_int("target", "tmp_dir_perm")
                fs.mkdir(tmp_dir, perm=perm)

            # is_tmp might be a file extension, normalize it to start with a dot
            ext = ""
            if isinstance(is_tmp, six.string_types):
                if is_tmp[0] != ".":
                    is_tmp = "." + is_tmp
                ext = is_tmp

            # create a random path; the extension is part of the collision check so that two tmp
            # files with the same extension cannot end up on the same path
            while True:
                basename = "luigi-tmp-{:09d}{}".format(random.randint(0, 999999999), ext)
                path = os.path.join(tmp_dir, basename)
                if not fs.exists(path):
                    break
        else:
            # ensure path is not a target and has no scheme
            path = fs._unscheme(get_path(path))

        super(LocalTarget, self).__init__(path=path, is_tmp=is_tmp, fs=fs, **kwargs)

    def __del__(self):
        # when this destructor is called during shutdown, os.path or os.path.exists might be unset
        if getattr(os, "path", None) is None or not callable(os.path.exists):
            return

        super(LocalTarget, self).__del__()

    def _repr_flags(self):
        flags = super(LocalTarget, self)._repr_flags()
        if self.is_tmp:
            flags.append("temporary")
        return flags

    def _parent_args(self):
        # parents share this target's file system instance
        args, kwargs = super(LocalTarget, self)._parent_args()
        kwargs["fs"] = self.fs
        return args, kwargs

    @property
    def abspath(self):
        """
        Absolute local path of the target, without scheme.
        """
        return self.uri(scheme=False)

    def uri(self, scheme=True, return_all=False, **kwargs):
        """
        Returns the absolute path of the target, prefixed with the ``file`` scheme when *scheme*
        is *True*. With *return_all*, a single-element list is returned.
        """
        uri = self.fs.abspath(self.path)
        if scheme:
            uri = add_scheme(uri, "file")
        return [uri] if return_all else uri

    def copy_to_local(self, *args, **kwargs):
        return self.fs._unscheme(self.copy_to(*args, **kwargs))

    def copy_from_local(self, *args, **kwargs):
        return self.fs._unscheme(self.copy_from(*args, **kwargs))

    def move_to_local(self, *args, **kwargs):
        return self.fs._unscheme(self.move_to(*args, **kwargs))

    def move_from_local(self, *args, **kwargs):
        return self.fs._unscheme(self.move_from(*args, **kwargs))

    def load(self, *args, **kwargs):
        """
        Loads the target content through a formatter. The formatter is chosen by ``_formatter``
        or ``formatter`` (default: automatic). Remaining arguments are forwarded to the
        formatter's ``load``.
        """
        # remove kwargs that might be designated for remote files
        kwargs = RemoteFileSystem.split_remote_kwargs(kwargs)[1]

        # invoke formatter
        formatter = kwargs.pop("_formatter", None) or kwargs.pop("formatter", AUTO_FORMATTER)
        return find_formatter(self.abspath, "load", formatter).load(self.abspath, *args, **kwargs)

    def dump(self, *args, **kwargs):
        """
        Dumps content into the target through a formatter. Parent directories are created with
        ``dir_perm``, and ``perm`` is applied to the written target. The formatter is chosen by
        ``_formatter`` or ``formatter`` (default: automatic). Returns the formatter's result.
        """
        # remove kwargs that might be designated for remote files
        kwargs = RemoteFileSystem.split_remote_kwargs(kwargs)[1]

        # also remove permission settings
        perm = kwargs.pop("perm", None)
        dir_perm = kwargs.pop("dir_perm", None)

        # create intermediate directories
        self.parent.touch(perm=dir_perm)

        # invoke the formatter
        formatter = kwargs.pop("_formatter", None) or kwargs.pop("formatter", AUTO_FORMATTER)
        ret = find_formatter(self.abspath, "dump", formatter).dump(self.abspath, *args, **kwargs)

        # chmod; compare against None so that a (valid) permission of 0 is not skipped
        if perm is not None and self.exists():
            self.chmod(perm)

        return ret
class LocalFileTarget(FileSystemFileTarget, LocalTarget):
    """
    File target on the local file system.
    """

    @contextmanager
    def localize(self, mode="r", perm=None, dir_perm=None, tmp_dir=None, **kwargs):
        """
        Context manager that yields a local representation of this file for *mode* ``"r"``,
        ``"w"`` or ``"a"``. With ``is_tmp`` (default: *True* for writing modes), a temporary copy
        is yielded. In read mode it is removed afterwards; in writing modes it is copied back
        with *perm* and *dir_perm* and then removed. Otherwise the target itself is yielded.
        """
        if mode not in ["r", "w", "a"]:
            raise Exception("unknown mode '{}', use 'r', 'w' or 'a'".format(mode))

        logger.debug("localizing {!r} with mode '{}'".format(self, mode))

        # mode is one of "r", "w", "a" at this point
        writing = mode != "r"
        use_tmp = kwargs.pop("is_tmp", writing)

        # no temporary copy: work on the target directly
        if not use_tmp:
            if not writing:
                yield self
                return
            # create the parent dir, yield, then apply permissions
            self.parent.touch(perm=dir_perm)
            yield self
            self.chmod(perm, silent=True)
            return

        # temporary copy keeping the file extension of this target
        tmp = self.__class__(is_tmp=self.ext(n=1) or True, tmp_dir=tmp_dir)

        if not writing:
            # always copy the content for reading
            self.copy_to_local(tmp)
            try:
                yield tmp
            finally:
                tmp.remove(silent=True)
            return

        # only existing content needs to be carried over, and only when appending
        if mode == "a" and self.exists():
            self.copy_to_local(tmp)

        try:
            yield tmp
            # transfer the result back to the actual target
            if not tmp.exists():
                logger.warning("cannot move non-existing localized target to actual "
                    "representation {!r}".format(self))
            else:
                tmp.copy_to_local(self, perm=perm, dir_perm=dir_perm)
        finally:
            tmp.remove()
class LocalDirectoryTarget(FileSystemDirectoryTarget, LocalTarget):
    """
    Directory target on the local file system.
    """

    def _child_args(self, path):
        # children share this target's file system instance
        child_args, child_kwargs = super(LocalDirectoryTarget, self)._child_args(path)
        child_kwargs["fs"] = self.fs
        return child_args, child_kwargs

    @contextmanager
    def localize(self, mode="r", perm=None, dir_perm=None, tmp_dir=None, **kwargs):
        """
        Context manager that yields a local representation of this directory for *mode* ``"r"``,
        ``"w"`` or ``"a"``. With ``is_tmp`` (default: *True* for writing modes), a temporary
        directory is yielded. In read mode it is removed afterwards; in writing modes it replaces
        the current contents (using *perm* and *dir_perm*) and is then removed. Otherwise the
        directory itself is yielded, after creating it in writing modes.
        """
        if mode not in ["r", "w", "a"]:
            raise Exception("unknown mode '{}', use 'r', 'w' or 'a'".format(mode))

        logger.debug("localizing {!r} with mode '{}'".format(self, mode))

        # mode is one of "r", "w", "a" at this point
        writing = mode != "r"
        use_tmp = kwargs.pop("is_tmp", writing)

        # no temporary copy: work on the directory directly
        if not use_tmp:
            if writing:
                # create the parent dir and the directory itself
                self.parent.touch(perm=dir_perm)
                self.touch(perm=perm)
            # "w" and "a" are not differentiated here
            yield self
            return

        tmp = self.__class__(is_tmp=True, tmp_dir=tmp_dir)

        if not writing:
            # copy contents for reading
            self.copy_to_local(tmp)
            try:
                yield tmp
            finally:
                tmp.remove(silent=True)
            return

        # copy in append mode, otherwise ensure that the temporary directory exists
        if mode == "a" and self.exists():
            self.copy_to_local(tmp)
        else:
            tmp.touch()

        try:
            yield tmp
            # TODO: keep track of changed contents in "a" mode and copy only those?
            if not tmp.exists():
                logger.warning("cannot move non-existing localized target to actual "
                    "representation {!r}, leaving original contents unchanged".format(self))
            else:
                # NOTE(review): the current contents are removed before copying back, so a
                # failing copy leaves this directory empty
                self.remove()
                tmp.copy_to_local(self, perm=perm, dir_perm=dir_perm)
        finally:
            tmp.remove()
# attach the concrete file and directory target classes to LocalTarget
LocalTarget.file_class = LocalFileTarget
LocalTarget.directory_class = LocalDirectoryTarget


# trailing imports
# NOTE(review): imported at the end of the module, presumably to avoid a circular import — TODO
# confirm; RemoteFileSystem is used in LocalTarget.load and LocalTarget.dump
from law.target.remote.base import RemoteFileSystem