# coding: utf-8
"""
Remote filesystem and targets, using a configurable remote file interface for atomic operations.
"""
__all__ = ["RemoteFileSystem", "RemoteTarget", "RemoteFileTarget", "RemoteDirectoryTarget"]
import os
import time
import fnmatch
from contextlib import contextmanager
import six
from law.config import Config
from law.target.file import (
FileSystem, FileSystemTarget, FileSystemFileTarget, FileSystemDirectoryTarget, get_path,
get_scheme, add_scheme, remove_scheme,
)
from law.target.local import LocalFileSystem, LocalFileTarget, LocalDirectoryTarget
from law.target.remote.cache import RemoteCache
from law.util import make_list, merge_dicts
from law.logger import get_logger
# module-level logger for this file
logger = get_logger(__name__)

# default local file system instance, used as the class-level fallback for
# RemoteFileSystem.local_fs (overwritable per instance via the local_fs init kwarg)
_local_fs = LocalFileSystem.default_instance
class RemoteFileSystem(FileSystem):
    """
    Base class for remote file systems that use a configurable remote file interface
    (:py:attr:`file_interface`) for atomic operations. Paths with a ``"file"`` scheme are
    transparently forwarded to the local file system stored in :py:attr:`local_fs`. Remote
    files can optionally be cached locally through a :py:class:`RemoteCache` instance.
    """

    # default instance of this file system, to be set by subclasses
    default_instance = None

    # the remote file interface class used by this file system, to be set by subclasses
    file_interface_cls = None

    # local file system used for all operations on local paths,
    # overwritable on instance level through the local_fs init kwarg
    local_fs = _local_fs

    # bookkeeping of config sections that were already extended with default section values
    _updated_sections = set()

    @classmethod
    def parse_config(cls, section, config=None, overwrite=False):
        """
        Reads remote file system options from the law config *section* and adds them to
        *config* (a dictionary, created when *None*). Existing values are only replaced when
        *overwrite* is *True*. The updated *config* is returned.
        """
        config = super(RemoteFileSystem, cls).parse_config(section, config=config,
            overwrite=overwrite)

        cfg = Config.instance()

        # helper to add a config value if it exists, extracted with a config parser method
        def add(option, func):
            if option not in config or overwrite:
                config[option] = func(section, option)

        # default setting for validation for existence after copy
        add("validate_copy", cfg.get_expanded_bool)

        # default setting for using the cache
        add("use_cache", cfg.get_expanded_bool)

        # cache options
        if cfg.options(section, prefix="cache_"):
            RemoteCache.parse_config(section, config.setdefault("cache_config", {}),
                overwrite=overwrite)

        return config

    @classmethod
    def _update_section_defaults(cls, default_section, section):
        # extend *section* in the law config with missing options from *default_section*;
        # do not update the section when it is the default one or it was already updated
        if section == default_section or section in cls._updated_sections:
            return
        cls._updated_sections.add(section)

        # get the defaults
        cfg = Config.instance()
        defaults = dict(cfg.items(default_section, expand_vars=False, expand_user=False))

        # update, keeping options that are already set in the section
        cfg.update({section: defaults}, overwrite_sections=True, overwrite_options=False)

    @classmethod
    def split_remote_kwargs(cls, kwargs, include=None, skip=None):
        """
        Takes keyword arguments *kwargs*, splits them into two separate dictionaries depending
        on their content, and returns them in a tuple. The first one will contain arguments
        related to potential remote file operations (e.g. ``"cache"`` or ``"retries"``), while
        the second one will contain all remaining arguments. This function is used internally
        to decide which arguments to pass to target formatters. *include* (*skip*) can be a
        list of argument keys that are considered as well (ignored).
        """
        include = make_list(include) if include else []
        skip = make_list(skip) if skip else []
        transfer_kwargs = {
            name: kwargs.pop(name)
            for name in ["cache", "prefer_cache", "retries", "retry_delay"] + include
            if name in kwargs and name not in skip
        }
        return transfer_kwargs, kwargs

    def __init__(self, file_interface, validate_copy=False, use_cache=False, cache_config=None,
            local_fs=None, **kwargs):
        """
        Constructor. *file_interface* is the remote file interface instance to use.
        *validate_copy* and *use_cache* define the default behavior of copy operations,
        *cache_config* is forwarded to the :py:class:`RemoteCache` constructor when it
        contains a cache root, and *local_fs* optionally overwrites the class-level local
        file system on instance level. All other *kwargs* are forwarded to the
        :py:class:`FileSystem` constructor.
        """
        super(RemoteFileSystem, self).__init__(**kwargs)

        # store the file interface
        self.file_interface = file_interface

        # store other configs
        self.validate_copy = validate_copy
        self.use_cache = use_cache

        # set the cache when a cache root is set in the cache_config
        if cache_config and cache_config.get("root"):
            self.cache = RemoteCache(self, **cache_config)
        else:
            self.cache = None

        # when passed, store a custom local fs on instance level,
        # otherwise, the class level member is used
        if local_fs:
            self.local_fs = local_fs

    def __del__(self):
        # cleanup the cache
        if getattr(self, "cache", None):
            del self.cache
            self.cache = None

    def __repr__(self):
        return "{}({}, name={}, base={}, {})".format(self.__class__.__name__,
            self.file_interface.__class__.__name__, self.name, self.base[0], hex(id(self)))

    def _init_configs(self, section, default_fs_option, default_section, init_kwargs):
        """
        Determines the law config *section* to use (falling back to the target option
        *default_fs_option* when empty), optionally extends it with defaults from
        *default_section*, and returns a 3-tuple (*section*, *fs_config*, *fi_config*) with
        configs for the file system itself and the remote file interface, both updated with
        *init_kwargs*.
        """
        cfg = Config.instance()

        # get the proper section
        if not section:
            section = cfg.get_expanded("target", default_fs_option)

        # try to read it and fill configs to pass to the file system and the remote file
        # interface
        fs_config = {}
        fi_config = {}
        if isinstance(section, six.string_types):
            # when set, the section must exist
            if not cfg.has_section(section):
                raise Exception("law config has no section '{}' to read {} options".format(
                    section, self.__class__.__name__))

            # extend options of sections other than the default one with its values
            self._update_section_defaults(default_section, section)

            # read the configs from the section for both the file system and remote interface
            fs_config = self.parse_config(section)
            fi_config = self.file_interface_cls.parse_config(section)

        # update both configs with init kwargs
        fs_config = merge_dicts(fs_config, init_kwargs, deep=True)
        fi_config = merge_dicts(fi_config, init_kwargs, deep=True)

        return section, fs_config, fi_config

    @property
    def base(self):
        # base uri(s), defined by the file interface
        return self.file_interface.base

    def is_local(self, path):
        # a path refers to the local file system when it carries a "file" scheme
        return get_scheme(path) == "file"

    def abspath(self, path):
        # due to the dynamic definition of remote bases, path is supposed to be already
        # absolute, so just handle leading and trailing slashes when there is no scheme
        path = str(path)
        return ("/" + path.strip("/")) if not get_scheme(path) else path

    def uri(self, path, **kwargs):
        # full uri of the (absolute) path, built by the file interface
        return self.file_interface.uri(self.abspath(path), **kwargs)

    def dirname(self, path):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.dirname(path)

        return super(RemoteFileSystem, self).dirname(self.abspath(path))

    def basename(self, path):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.basename(path)

        return super(RemoteFileSystem, self).basename(self.abspath(path))

    def stat(self, path, **kwargs):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.stat(path)

        return self.file_interface.stat(self.abspath(path), **kwargs)

    def exists(self, path, stat=False, **kwargs):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.exists(path, stat=stat)

        return self.file_interface.exists(self.abspath(path), stat=stat, **kwargs)

    def isdir(self, path, rstat=None, **kwargs):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.isdir(path)

        # fixed: make the path absolute, consistent with all other remote lookups
        return self.file_interface.isdir(self.abspath(path), stat=rstat, **kwargs)

    def isfile(self, path, rstat=None, **kwargs):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.isfile(path)

        # fixed: make the path absolute, consistent with all other remote lookups
        return self.file_interface.isfile(self.abspath(path), stat=rstat, **kwargs)

    def chmod(self, path, perm, **kwargs):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.chmod(path, perm)

        # no-op when this file system does not handle permissions
        if not self.has_permissions:
            return True

        return self.file_interface.chmod(self.abspath(path), perm, **kwargs)

    def remove(self, path, **kwargs):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.remove(path)

        # protection against removing the base directory of the remote file system
        path = self.abspath(path)
        if path == "/":
            logger.warning("refused request to remove base directory of {!r}".format(self))
            return

        return self.file_interface.remove(path, **kwargs)

    def mkdir(self, path, perm=None, recursive=True, **kwargs):
        # forward to local_fs
        if self.is_local(path):
            return self.local_fs.mkdir(path, perm=perm, recursive=recursive)

        if perm is None:
            perm = self.default_dir_perm or 0o0770

        func = self.file_interface.mkdir_rec if recursive else self.file_interface.mkdir
        return func(self.abspath(path), perm, **kwargs)

    def listdir(self, path, pattern=None, type=None, **kwargs):
        """
        Lists the contents of the directory *path*, optionally filtered by a fnmatch *pattern*
        and/or a *type* (``"f"`` for files, ``"d"`` for directories), and returns the entries.
        """
        # forward to local_fs
        path = str(path)
        if self.is_local(path):
            return self.local_fs.listdir(path, pattern=pattern, type=type)

        elems = self.file_interface.listdir(self.abspath(path), **kwargs)

        # apply pattern filter
        if pattern is not None:
            elems = fnmatch.filter(elems, pattern)

        # apply type filter
        # fixed: in the "d" branch, kwargs were previously passed to os.path.join (which only
        # accepts positional components) instead of isdir
        if type == "f":
            elems = [e for e in elems if not self.isdir(os.path.join(path, e), **kwargs)]
        elif type == "d":
            elems = [e for e in elems if self.isdir(os.path.join(path, e), **kwargs)]

        return elems

    def walk(self, path, max_depth=-1, **kwargs):
        """
        Walks the directory *path* up to *max_depth* levels (unlimited when negative) and
        yields 4-tuples (*search_dir*, *dirs*, *files*, *depth*), similar to :py:func:`os.walk`
        but with an additional depth value.
        """
        # forward to local_fs
        if self.is_local(path):
            for obj in self.local_fs.walk(path, max_depth=max_depth):
                yield obj
            return

        # mimic os.walk with a max_depth and yield the current depth
        search_dirs = [(self.abspath(path), 0)]
        while search_dirs:
            (search_dir, depth) = search_dirs.pop(0)

            # check depth
            if max_depth >= 0 and depth > max_depth:
                continue

            # find dirs and files
            dirs = []
            files = []
            for elem in self.listdir(search_dir, **kwargs):
                if self.isdir(os.path.join(search_dir, elem), **kwargs):
                    dirs.append(elem)
                else:
                    files.append(elem)

            # yield everything
            yield (search_dir, dirs, files, depth)

            # use dirs to update search dirs
            search_dirs.extend((os.path.join(search_dir, d), depth + 1) for d in dirs)

    def glob(self, pattern, cwd=None, **kwargs):
        """
        Globs remote paths matching *pattern*, optionally relative to *cwd*, and returns the
        matching elements. When *cwd* is set, the returned paths are relative to it.
        """
        # forward to local_fs
        pattern = str(pattern)
        if self.is_local(pattern):
            return self.local_fs.glob(pattern, cwd=cwd)

        # helper to check if a string represents a pattern
        def is_pattern(s):
            return "*" in s or "?" in s

        # prepare pattern
        if cwd is not None:
            pattern = os.path.join(cwd, pattern)

        # split the pattern to determine the search path, i.e. the leading part that does not
        # contain any glob chars, e.g. "foo/bar/test*/baz*" -> "foo/bar"
        search_dir = []
        patterns = []
        for part in pattern.split("/"):
            if not patterns and not is_pattern(part):
                search_dir.append(part)
            else:
                patterns.append(part)
        search_dir = self.abspath("/".join(search_dir))

        # walk through the search path and use fnmatch for comparison
        elems = []
        max_depth = len(patterns) - 1
        for root, dirs, files, depth in self.walk(search_dir, max_depth=max_depth, **kwargs):
            # get the current pattern
            pattern = patterns[depth]

            # when we are still below the max depth, filter dirs
            # otherwise, filter files and dirs to select
            if depth < max_depth:
                dirs[:] = fnmatch.filter(dirs, pattern)
            else:
                elems += [os.path.join(root, e) for e in fnmatch.filter(dirs + files, pattern)]

        # cut the cwd if there was any
        if cwd is not None:
            elems = [os.path.relpath(e, cwd) for e in elems]

        return elems

    # atomic copy
    def _atomic_copy(self, src, dst, perm=None, validate=None, **kwargs):
        """
        Copies *src* to *dst* through the file interface, optionally validating the existence
        of *dst* afterwards (*validate*, defaulting to :py:attr:`validate_copy`) and setting
        its permission *perm*. Returns the uri of the copied file.
        """
        if validate is None:
            validate = self.validate_copy

        src = self.abspath(src)
        dst = self.abspath(dst)

        # actual copy
        src_uri, dst_uri = self.file_interface.filecopy(src, dst, **kwargs)

        # copy validation
        dst_fs = self.local_fs if self.is_local(dst_uri) else self
        if validate:
            if not dst_fs.exists(dst):
                raise Exception("validation failed after copying {} to {}".format(src_uri,
                    dst_uri))

        # handle permissions
        if perm is None:
            perm = dst_fs.default_file_perm
        dst_fs.chmod(dst, perm)

        return dst_uri

    # generic copy with caching ability (local paths must have a "file://" scheme)
    def _cached_copy(self, src, dst, perm=None, cache=None, prefer_cache=False, validate=None,
            **kwargs):
        """
        When this method is called, both *src* and *dst* should refer to files. Depending on
        the copy direction and *cache* / *prefer_cache* settings, the file system cache is
        used to avoid redundant transfers. Returns the final destination path / uri.
        """
        # resolve the effective cache flag
        if self.cache is None:
            cache = False
        elif cache is None:
            cache = self.use_cache
        else:
            cache = bool(cache)

        # ensure absolute paths
        src = self.abspath(src)
        dst = self.abspath(dst) if dst else None

        # determine the copy mode for code readability
        # (remote-remote: "rr", remote-local: "rl", remote-cache: "rc", ...)
        src_local = self.is_local(src)
        dst_local = dst and self.is_local(dst)
        mode = "rl"[src_local] + ("rl"[dst_local] if dst is not None else "c")

        # disable caching when the mode is local-local, local-cache or remote-remote
        if mode in ("ll", "lc", "rr"):
            cache = False

        # dst can be None, but in this case, caching should be enabled
        if dst is None and not cache:
            raise Exception("copy destination must not be empty when caching is disabled")

        if not cache:
            # simply copy and return the dst path
            return self._atomic_copy(src, dst, perm=perm, validate=validate, **kwargs)

        kwargs_no_retries = kwargs.copy()
        kwargs_no_retries["retries"] = 0

        # handle 3 cases: lr, rl, rc
        if mode == "lr":
            # strategy: copy to remote, copy to cache, sync stats

            # copy to remote, no need to validate as we compute the stat anyway
            dst_uri = self._atomic_copy(src, dst, perm=perm, validate=False, **kwargs)
            rstat = self.stat(dst, **kwargs_no_retries)

            # remove the cache entry
            if dst in self.cache:
                logger.debug("removing destination file {} from cache".format(dst))
                self.cache.remove(dst)

            # allocate cache space and copy to cache
            lstat = self.local_fs.stat(src)
            self.cache.allocate(lstat.st_size)
            cdst_uri = add_scheme(self.cache.cache_path(dst), "file")
            with self.cache.lock(dst):
                logger.debug("loading source file {} to cache".format(src))
                self._atomic_copy(src, cdst_uri, validate=False)
                self.cache.touch(dst, (int(time.time()), rstat.st_mtime))

            return dst_uri

        else:  # rl, rc
            # strategy: copy to cache when not up to date, sync stats, opt. copy to local

            # build the uri to the cache path of the src file
            csrc_uri = add_scheme(self.cache.cache_path(src), "file")

            # if the file is cached and prefer_cache is true,
            # return the cache path, no questions asked
            # otherwise, check if the file is there and up to date
            if not prefer_cache or src not in self.cache:
                with self.cache.lock(src):
                    # in cache and outdated?
                    rstat = self.stat(src, **kwargs_no_retries)
                    if src in self.cache and not self.cache.check_mtime(src, rstat.st_mtime):
                        logger.debug("source file {} is outdated in cache, removing".format(
                            src))
                        self.cache.remove(src, lock=False)
                    # in cache at all?
                    if src not in self.cache:
                        self.cache.allocate(rstat.st_size)
                        self._atomic_copy(src, csrc_uri, validate=validate, **kwargs)
                        logger.debug("loading source file {} to cache".format(src))
                        self.cache.touch(src, (int(time.time()), rstat.st_mtime))

            if mode == "rl":
                # simply use the local_fs for copying
                self.local_fs.copy(csrc_uri, dst, perm=perm)
                return dst

            # mode is rc
            return csrc_uri

    def _prepare_dst_dir(self, dst, src=None, perm=None, **kwargs):
        """
        Prepares the directory of a target located at *dst* for copying and returns its full
        location as specified below. *src* can be the location of a source file target, which
        is (e.g.) used by a file copy or move operation. When *dst* is already a directory,
        calling this method has no effect and the *dst* path is returned, optionally joined
        with the basename of *src*. When *dst* is a file, *dst* path is returned unchanged.
        Otherwise, when *dst* does not exist yet, it is interpreted as a file path and missing
        directories are created when :py:attr:`create_file_dir` is *True*, using *perm* to set
        the directory permission. *dst* is returned.
        """
        rstat = self.exists(dst, stat=True)

        if rstat:
            if self.file_interface.isdir(dst, stat=rstat) and src:
                full_dst = os.path.join(dst, os.path.basename(src))
            else:
                full_dst = dst
        else:
            # interpret dst as a file name, create missing dirs
            dst_dir = self.dirname(dst)
            if dst_dir and self.create_file_dir and not self.isdir(dst_dir):
                self.mkdir(dst_dir, perm=perm, recursive=True, **kwargs)
            full_dst = dst

        return full_dst

    def copy(self, src, dst, perm=None, dir_perm=None, **kwargs):
        """
        Copies *src* to *dst* (which might be an existing directory, in which case the
        basename of *src* is appended) and returns the final destination. *perm* and
        *dir_perm* are permissions for the copied file and created directories.
        """
        # dst might be an existing directory
        if dst:
            dst_fs = self.local_fs if self.is_local(dst) else self
            # fixed: use the prepared path, which appends the src basename when dst is an
            # existing directory (the return value was previously discarded)
            dst = dst_fs._prepare_dst_dir(dst, src=src, perm=dir_perm, **kwargs)

        # copy the file
        return self._cached_copy(src, dst, perm=perm, **kwargs)

    def move(self, src, dst, perm=None, dir_perm=None, **kwargs):
        """
        Moves *src* to *dst* by copying (caching disabled, copy validated) and removing the
        source afterwards. Returns the final destination.
        """
        if not dst:
            raise Exception("move requires dst to be set")

        # copy the file
        kwargs["cache"] = False
        kwargs.setdefault("validate", True)
        dst = self.copy(src, dst, perm=perm, dir_perm=dir_perm, **kwargs)

        # remove the src
        src_fs = self.local_fs if self.is_local(src) else self
        src_fs.remove(src, **kwargs)

        return dst

    def open(self, path, mode, perm=None, dir_perm=None, cache=None, **kwargs):
        """
        Opens the remote file at *path* through a local representation and returns a
        :py:class:`RemoteFileProxy` wrapping either an open file object or, when the internal
        *_yield_path* kwarg is set, the plain local path. In read mode the file is fetched
        (optionally via the cache); in write/update mode a temporary local file is used and
        copied back on success.
        """
        # resolve the effective cache flag
        if self.cache is None:
            cache = False
        elif cache is None:
            cache = self.use_cache
        else:
            cache = bool(cache)

        yield_path = kwargs.pop("_yield_path", False)
        path = self.abspath(path)
        tmp = None
        read_mode = mode.startswith("r")

        if read_mode:
            if cache:
                lpath = self._cached_copy(path, None, cache=cache, **kwargs)
            else:
                # NOTE(review): n=0 presumably selects the full extension chain for the tmp
                # file suffix - confirm against FileSystem.ext
                tmp = LocalFileTarget(is_tmp=self.ext(path, n=0) or True)
                lpath = self.copy(path, tmp.uri(), cache=cache, **kwargs)
            lpath = remove_scheme(lpath)

            def cleanup():
                # only remove the tmp file of uncached reads
                if not cache and tmp and tmp.exists():
                    tmp.remove()

            f = lpath if yield_path else open(lpath, mode)
            return RemoteFileProxy(f, success_fn=cleanup, failure_fn=cleanup)

        else:  # write or update
            tmp = LocalFileTarget(is_tmp=self.ext(path, n=0) or True)
            lpath = tmp.path

            def cleanup():
                tmp.remove(silent=True)

            def copy_and_cleanup():
                # copy back to the remote location, then remove the tmp file
                exists = True
                try:
                    exists = tmp.exists()
                    if exists:
                        self.copy(tmp.uri(), path, perm=perm, dir_perm=dir_perm, cache=cache,
                            **kwargs)
                finally:
                    if exists:
                        tmp.remove(silent=True)

            f = lpath if yield_path else open(lpath, mode)
            return RemoteFileProxy(f, success_fn=copy_and_cleanup, failure_fn=cleanup)
class RemoteTarget(FileSystemTarget):
    """
    Base class for targets residing on a :py:class:`RemoteFileSystem`, adding convenience
    methods for transfers between the remote location and the local file system.
    """

    # the file system instance is set per target in the constructor
    fs = None

    def __init__(self, path, fs, **kwargs):
        # only remote file systems are accepted
        if not isinstance(fs, RemoteFileSystem):
            raise TypeError("fs must be a {} instance, is {}".format(RemoteFileSystem, fs))

        self.fs = fs

        super(RemoteTarget, self).__init__(path, **kwargs)

    def _parent_args(self):
        # parent directory targets are created with the same file system instance
        args, kwargs = super(RemoteTarget, self)._parent_args()
        args += (self.fs,)
        return args, kwargs

    @property
    def path(self):
        return self._path

    @path.setter
    def path(self, path):
        # reject paths that would escape the file system root
        normalized = os.path.normpath(str(path))
        if normalized.startswith(".."):
            raise ValueError("path {} forbidden, surpasses file system root".format(path))

        super(RemoteTarget, self.__class__).path.fset(self, self.fs.abspath(path))

    @property
    def abspath(self):
        return self.uri()

    def uri(self, **kwargs):
        return self.fs.uri(self.path, **kwargs)

    def copy_to_local(self, dst=None, **kwargs):
        # normalize dst to a local "file://" uri when given, then copy and strip the scheme
        if dst:
            dst = add_scheme(self.fs.local_fs.abspath(get_path(dst)), "file")
        return remove_scheme(self.copy_to(dst, **kwargs))

    def copy_from_local(self, src=None, **kwargs):
        # normalize src to a local "file://" uri, then copy
        local_src = add_scheme(self.fs.local_fs.abspath(get_path(src)), "file")
        return self.copy_from(local_src, **kwargs)

    def move_to_local(self, dst=None, **kwargs):
        # normalize dst to a local "file://" uri when given, then move and strip the scheme
        if dst:
            dst = add_scheme(self.fs.local_fs.abspath(get_path(dst)), "file")
        return remove_scheme(self.move_to(dst, **kwargs))

    def move_from_local(self, src=None, **kwargs):
        # normalize src to a local "file://" uri, then move
        local_src = add_scheme(self.fs.local_fs.abspath(get_path(src)), "file")
        return self.move_from(local_src, **kwargs)

    def load(self, *args, **kwargs):
        # split kwargs that might be designated for remote files
        remote_kwargs, kwargs = self.fs.split_remote_kwargs(kwargs)

        # forward to the localized representation
        with self.localize(mode="r", **remote_kwargs) as loc:
            return loc.load(*args, **kwargs)

    def dump(self, *args, **kwargs):
        # split kwargs that might be designated for remote files
        remote_kwargs, kwargs = self.fs.split_remote_kwargs(kwargs)

        # forward to the localized representation
        # NOTE(review): mode "a" downloads any existing remote file before dumping - confirm
        # that append semantics rather than plain overwrite ("w") are intended here
        with self.localize(mode="a", **remote_kwargs) as loc:
            return loc.dump(*args, **kwargs)
class RemoteFileTarget(FileSystemFileTarget, RemoteTarget):
    """
    Remote target representing a single file.
    """

    @property
    def cache_path(self):
        # local path of this file within the file system cache, or None when caching is off
        if not self.fs.cache:
            return None

        return self.fs.cache.cache_path(self.path)

    @contextmanager
    def localize(self, mode="r", perm=None, dir_perm=None, tmp_dir=None, **kwargs):
        """
        Context manager that yields a local representation of this file. In mode ``"r"`` the
        remote file is made available locally for reading. In modes ``"w"`` and ``"a"`` a
        temporary local target is yielded and copied back to the remote location afterwards
        (in ``"a"`` mode, pre-filled with the existing remote content).
        """
        if mode not in ("r", "w", "a"):
            raise Exception("unknown mode '{}', use 'r', 'w' or 'a'".format(mode))

        logger.debug("localizing {!r} with mode '{}'".format(self, mode))

        if mode == "r":
            with self.fs.open(self.path, "r", _yield_path=True, perm=perm, **kwargs) as lpath:
                yield LocalFileTarget(lpath)
            return

        # mode "w" or "a"
        tmp = LocalFileTarget(is_tmp=self.ext(n=1) or True, tmp_dir=tmp_dir)

        # copy to local in append mode
        if mode == "a" and self.exists():
            self.copy_to_local(tmp, **kwargs)

        try:
            yield tmp

            if tmp.exists():
                self.copy_from_local(tmp, perm=perm, dir_perm=dir_perm, **kwargs)
            else:
                logger.warning(
                    "cannot move non-existing localized target to actual "
                    "representation {!r}".format(self),
                )
        finally:
            tmp.remove()
class RemoteDirectoryTarget(FileSystemDirectoryTarget, RemoteTarget):
    """
    Remote target representing a directory.
    """

    def _child_args(self, path):
        # child targets are created with the same file system instance
        args, kwargs = super(RemoteDirectoryTarget, self)._child_args(path)
        args += (self.fs,)
        return args, kwargs

    @contextmanager
    def localize(self, mode="r", perm=None, dir_perm=None, tmp_dir=None, **kwargs):
        """
        Context manager that yields a temporary local copy of this directory. In mode ``"r"``
        the remote contents are downloaded first and removed again afterwards. In modes
        ``"w"`` and ``"a"`` the local copy replaces the remote contents on success.
        """
        if mode not in ("r", "w", "a"):
            raise Exception("unknown mode '{}', use 'r', 'w' or 'a'".format(mode))

        logger.debug("localizing {!r} with mode '{}'".format(self, mode))

        # a temporary local directory is used in all modes
        tmp = LocalDirectoryTarget(is_tmp=True, tmp_dir=tmp_dir)

        if mode == "r":
            # copy contents, yield the copy and remove it again afterwards
            self.copy_to_local(tmp, **kwargs)
            try:
                yield tmp
            finally:
                tmp.remove(silent=True)
            return

        # mode "w" or "a"
        # copy in append mode, otherwise ensure that it exists
        if mode == "a" and self.exists():
            self.copy_to_local(tmp)
        else:
            tmp.touch()

        try:
            yield tmp

            # move back again, first removing current content
            # TODO: keep track of changed contents in "a" mode and copy only those?
            if tmp.exists():
                self.remove(**kwargs)
                self.copy_from_local(tmp, perm=perm, dir_perm=dir_perm, **kwargs)
            else:
                logger.warning(
                    "cannot move non-existing localized target to actual "
                    "representation {!r}, leaving original contents unchanged".format(self),
                )
        finally:
            tmp.remove()
# register the concrete file and directory target classes on the base remote target class,
# used (e.g.) when child or parent targets are created dynamically
RemoteTarget.file_class = RemoteFileTarget
RemoteTarget.directory_class = RemoteDirectoryTarget
class RemoteFileProxy(object):
    """
    Wrapper around an open file object or a plain path string that triggers optional
    callbacks when the file is closed (*close_fn*) or its context exits, either successfully
    (*success_fn*) or with an exception (*failure_fn*). Used to perform copy and cleanup
    steps after remote file operations.
    """

    def __init__(self, f, close_fn=None, success_fn=None, failure_fn=None):
        super(RemoteFileProxy, self).__init__()

        self.f = f
        # strings are treated as plain paths rather than file objects
        self.is_file = not isinstance(f, six.string_types)

        # optional callbacks
        self.close_fn = close_fn
        self.success_fn = success_fn
        self.failure_fn = failure_fn

    def __call__(self):
        # return the wrapped object
        return self.f

    def __getattr__(self, attr):
        # transparently expose attributes of the wrapped object
        return getattr(self.f, attr)

    def __enter__(self):
        if self.is_file:
            return self.f.__enter__()
        return self.f

    def __exit__(self, exc_type, exc_value, traceback):
        # when an exception was raised, the context was not successful
        success = exc_type is None

        # invoke the exit of the wrapped object when it defines one;
        # a return value of True overwrites the success flag
        exit_fn = getattr(self.f, "__exit__", None)
        if exit_fn is not None:
            if exit_fn(exc_type, exc_value, traceback) is True:
                success = True

        # trigger the matching callback
        fn = self.success_fn if success else self.failure_fn
        if callable(fn):
            fn()

        return success

    def close(self, *args, **kwargs):
        # close the wrapped object first, then run the close callback
        ret = self.f.close(*args, **kwargs)

        if callable(self.close_fn):
            self.close_fn()

        return ret