# Source code for law.target.remote.interface

# coding: utf-8

"""
Interface for communicating with a remote file service.

Provides the abstract :py:class:`RemoteFileInterface`, which manages base paths and the retrying
of remote operations, and :py:class:`RetryException`, which decorated operations raise to request
another attempt.
"""

# public API of this module
__all__ = ["RemoteFileInterface"]


import os
import sys
import time
import abc
import functools
import random as _random

import six

from law.config import Config
from law.target.file import remove_scheme
from law.util import make_list, is_lazy_iterable, brace_expand, parse_duration
from law.logger import get_logger


# module-level logger, used for retry debug messages and deprecation warnings below
logger = get_logger(__name__)


class RetryException(Exception):
    """
    Exception signaling that an operation failed but may be retried.

    The type, value and traceback of the underlying error are kept so that the original exception
    can be raised again later via :py:meth:`reraise`. They are taken from *exc* (a 3-tuple as
    returned by :py:func:`sys.exc_info`) when given, and from the exception currently being
    handled otherwise. When *msg* is empty, the string representation of the stored exception
    value is used as the message.
    """

    def __init__(self, msg="", exc=None):
        info = exc if exc else sys.exc_info()
        self.exc_type, self.exc_value, self.exc_traceback = info

        message = msg if msg else str(self.exc_value)
        super(RetryException, self).__init__(message)

    def reraise(self):
        """
        Raises the stored exception again, preserving its original traceback.
        """
        exc_type, exc_value, exc_tb = self.exc_type, self.exc_value, self.exc_traceback
        return six.reraise(exc_type, exc_value, exc_tb)


class RemoteFileInterface(six.with_metaclass(abc.ABCMeta, object)):
    """
    Abstract base class for interfaces that communicate with a remote file service.

    An interface holds one or more default base paths (:py:attr:`base`) and optional base paths
    per operation name (:py:attr:`bases`), plus default retry settings. Implementations decorate
    their operations with :py:meth:`retry` and raise a :py:class:`RetryException` to request
    another attempt. All methods marked abstract below must be implemented by subclasses.
    """

    @classmethod
    def parse_config(cls, section, config=None, overwrite=False):
        """
        Reads the options "base", "base_*", "retries", "retry_delay" and "random_base" from the
        config *section* and stores them in *config* (a new dict when *None*) under the keys
        "base", "bases", "retries", "retry_delay" and "random_base". Keys already present in
        *config* are only replaced when *overwrite* is *True*. Returns *config*.
        """
        cfg = Config.instance()

        if config is None:
            config = {}

        # helper to store a value extracted with a config parser method, unless the option is
        # already present in config (and overwriting is disabled)
        def add(option, func):
            if option not in config or overwrite:
                config[option] = func(section, option)

        def get_expanded_list(section, option):
            # get config value, run brace expansion taking into account csv splitting
            value = cfg.get_expanded(section, option, None)
            return value and [v.strip() for v in brace_expand(value.strip(), split_csv=True)]

        def get_time(section, option):
            # durations are interpreted and returned in seconds
            value = cfg.get_expanded(section, option)
            return parse_duration(value, input_unit="s", unit="s")

        # base path(s)
        add("base", get_expanded_list)

        # base path(s) per operation, keyed by the option name without its "base_" prefix
        options = cfg.options(section, prefix="base_")
        add("bases", lambda *_: {
            option[5:]: get_expanded_list(section, option)
            for option in options
            if not cfg.is_missing_or_none(section, option)
        })

        # default number of retries
        add("retries", cfg.get_expanded_int)

        # default delay between retries
        add("retry_delay", get_time)

        # default setting for the random base selection
        add("random_base", cfg.get_expanded_bool)

        return config

    @classmethod
    def retry(cls, func=None, uri_cmd=None, uri_base_name=None):
        """
        Decorator for interface methods that should be retried on failure. Usable both as
        ``@retry`` and as ``@retry(uri_base_name=...)``.

        The decorated method requests another attempt by raising a :py:class:`RetryException`.
        It is then called again after ``retry_delay`` seconds until ``retries`` additional
        attempts are used up. *retries*, *retry_delay* and *random_base* may be passed as keyword
        arguments to the decorated method and default to the respective instance attributes.

        When *uri_base_name* is set and the call did not pass a truthy *base* keyword argument,
        each attempt is given a base obtained via :py:meth:`get_base`, skipping bases already
        tried as long as untried ones remain.

        Once no attempts are left, or a different error occurs, the error is re-raised with its
        message extended by details on the failed call. If the exception type cannot be rebuilt
        from such a message, the original exception is re-raised unchanged. Exceptions that do not
        derive from :py:class:`Exception` (e.g. *KeyboardInterrupt*, *SystemExit*) propagate
        untouched.

        *uri_cmd* is deprecated in favor of *uri_base_name* but still takes precedence.
        """
        # cmd will be deprecated and fully renamed to base_name, but has priority for now
        if uri_cmd:
            logger.warning_once(
                "deprecate_RemoteFileInterface_retry_uri_cmd",
                "the argument 'uri_cmd' in {}.retry is deprecated, ".format(cls.__name__) +
                "please use 'uri_base_name' instead",
            )
            uri_base_name = uri_cmd

        def decorator(func):
            @functools.wraps(func)
            def wrapper(self, *args, **kwargs):
                # function name for logs
                func_name = func.__name__

                # get retry configs with defaults from the interface itself
                retries = kwargs.pop("retries", self.retries)
                delay = kwargs.pop("retry_delay", self.retry_delay)
                random_base = kwargs.pop("random_base", self.random_base)

                attempt = 0
                try:
                    base_set = bool(kwargs.get("base"))
                    skip_indices = []
                    while True:
                        # when no base was set initially and a uri_base_name is given, get a
                        # (random) uri base under consideration of bases (given by their indices)
                        # to skip
                        if not base_set and uri_base_name:
                            base, idx = self.get_base(base_name=uri_base_name,
                                random=random_base, skip_indices=skip_indices,
                                return_index=True)
                            kwargs["base"] = base
                            skip_indices.append(idx)

                        try:
                            return func(self, *args, **kwargs)
                        except RetryException as e:
                            attempt += 1

                            # raise to the outer try-except block when there are no attempts left
                            if attempt > retries:
                                e.reraise()

                            # log and sleep
                            logger.debug("{}.{}(args: {}, kwargs: {}) failed: {}, retry".format(
                                self.__class__.__name__, func_name, args, kwargs, e))
                            time.sleep(delay)
                except Exception:
                    # at this point, no more retry attempts are available (or the error was not
                    # retriable), so update the exception to reflect that, then reraise; only
                    # Exception subclasses are handled so that e.g. the exit code carried by a
                    # SystemExit is not replaced by a message string
                    e_type, e, traceback = sys.exc_info()
                    msg = str(e)
                    msg += "\nfunction: {}.{}".format(self.__class__.__name__, func_name)
                    msg += "\nattempts: {}".format(attempt)
                    msg += "\nargs : {}".format(args)
                    msg += "\nkwargs : {}".format(kwargs)
                    msg += "\nerror : {}: '{}'".format(e_type.__name__, e)

                    try:
                        new_e = e_type(msg, *e.args[1:])
                    except Exception:
                        # the exception type cannot be constructed from a message (e.g. custom
                        # constructor signature), so reraise the original error unchanged rather
                        # than masking it with an unrelated TypeError
                        new_e = e

                    six.reraise(e_type, new_e, traceback)

            return wrapper

        return decorator(func) if func else decorator

    def __init__(self, base=None, bases=None, retries=0, retry_delay=0, random_base=True,
            **kwargs):
        """
        Stores the default base path(s) *base* and the per-operation base paths *bases* (a mapping
        of operation name to base path(s)), both with environment variables expanded, as well as
        the default retry settings *retries*, *retry_delay* and *random_base*. Additional *kwargs*
        are accepted but not used here. Raises an exception when no base path is given.
        """
        super(RemoteFileInterface, self).__init__()

        # convert base(s) to list for random selection
        base = make_list(base or [])
        bases = {k: make_list(b) for k, b in six.iteritems(bases)} if bases else {}

        # at least one base is expected
        if not base:
            raise Exception("{} expected at least one base path, received none".format(
                self.__class__.__name__))

        # expand variables in base and bases
        expand = lambda p: os.path.expandvars(str(p))
        self.base = list(map(expand, base))
        self.bases = {k: list(map(expand, b)) for k, b in six.iteritems(bases)}

        # store other attributes
        self.retries = retries
        self.retry_delay = retry_delay
        self.random_base = random_base

    def sanitize_path(self, p):
        """
        Converts a path *p* into the string that :py:meth:`uri` joins to a base. Returns
        ``str(p)`` in this base implementation.
        """
        return str(p)

    def get_base(self, cmd=None, base_name=None, random=None, skip_indices=None,
            return_index=False, return_all=False):
        """
        Returns a base path for the operation(s) named by *base_name* (a name or a sequence of
        names). The bases stored in :py:attr:`bases` for the first of those names that has an
        entry are used, falling back to the default :py:attr:`base` list otherwise. Raises an
        exception when the chosen list of bases is empty.

        Bases whose indices are contained in *skip_indices* are excluded, unless that would
        exclude all of them. When *return_all* is *True*, a list of the remaining bases is
        returned. Otherwise, the first remaining base is selected, or a random one when *random*
        is *True* (defaulting to :py:attr:`random_base`). When *return_index* is *True*, a 2-tuple
        (base, index) is returned, where index is the position of the selected base in the
        unfiltered list of bases.

        *cmd* is deprecated in favor of *base_name* but still takes precedence.
        """
        if random is None:
            random = self.random_base

        # cmd will be deprecated and fully renamed to base_name, but has priority for now
        if cmd:
            logger.warning_once(
                "deprecate_RemoteFileInterface_get_base_cmd",
                "the argument 'cmd' in {}.get_base is deprecated, ".format(
                    self.__class__.__name__) +
                "please use 'base_name' instead",
            )
            base_name = cmd

        # get potential bases for the given base_name
        bases = make_list(self.base)
        if base_name:
            for base_name in make_list(base_name):
                if base_name in self.bases:
                    bases = self.bases[base_name]
                    break

        if not bases:
            raise Exception("no bases available for command '{}'".format(base_name))

        # select by index rather than by value so that the returned index always refers to the
        # position actually chosen, even when the list contains duplicate bases
        indices = list(range(len(bases)))
        if skip_indices:
            remaining = [i for i in indices if i not in skip_indices]
            # fall back to all bases when every one of them would be skipped
            if remaining:
                indices = remaining

        # return all?
        if return_all:
            return [bases[i] for i in indices]

        # select the first remaining base, or a random one
        if len(indices) == 1 or not random:
            idx = indices[0]
        else:
            idx = _random.choice(indices)
        base = bases[idx]

        return (base, idx) if return_index else base

    def uri(self, path, base=None, return_all=False, scheme=True, **kwargs):
        """
        Returns the URI of *path* joined to *base*. When *base* is not given, it is obtained from
        :py:meth:`get_base`, forwarding *return_all* and additional *kwargs*. A list of URIs is
        returned when *base* is a list, tuple or lazy iterable, or when *return_all* is *True*;
        a single URI otherwise. The scheme is removed when *scheme* is *False*.
        """
        # get a base path when not given
        if not base:
            kwargs["return_index"] = False
            base = self.get_base(return_all=return_all, **kwargs)

        # helper to join the path to some base b and remove the scheme if requested
        def join(b):
            full_uri = os.path.join(b, self.sanitize_path(path).lstrip("/")).rstrip("/")
            return full_uri if scheme else remove_scheme(full_uri)

        if isinstance(base, (list, tuple)) or is_lazy_iterable(base):
            return [join(b) for b in base]

        if return_all:
            return [join(base)]

        return join(base)

    @abc.abstractmethod
    def exists(self, path, base=None, stat=False, **kwargs):
        """
        Returns *True* when the *path* exists and *False* otherwise. When *stat* is *True*,
        returns the stat object or *None*.
        """
        return

    @abc.abstractmethod
    def stat(self, path, base=None, **kwargs):
        """
        Returns a stat object or raises an exception when *path* does not exist.
        """
        return

    @abc.abstractmethod
    def isdir(self, path, stat=None, base=None, **kwargs):
        """
        Returns *True* when *path* refers to an existing directory, optionally using a
        precomputed stat object instead, and *False* otherwise.
        """
        return

    @abc.abstractmethod
    def isfile(self, path, stat=None, base=None, **kwargs):
        """
        Returns *True* when *path* refers to an existing file, optionally using a precomputed
        stat object instead, and *False* otherwise.
        """
        return

    @abc.abstractmethod
    def chmod(self, path, perm, base=None, silent=False, **kwargs):
        """
        Changes the permission of a *path* to *perm*. Raises an exception when *path* does not
        exist or returns *False* when *silent* is *True*, and returns *True* on success.
        """
        return

    @abc.abstractmethod
    def rmdir(self, path, base=None, silent=True, **kwargs):
        """
        Removes a directory at *path*. Raises an exception when *path* does not exist or returns
        *False* when *silent* is *True*, and returns *True* on success.
        """
        return

    @abc.abstractmethod
    def remove(self, path, base=None, silent=True, **kwargs):
        """
        Removes any file or directory at *path*. Directories are removed recursively. Raises an
        exception when *path* does not exist or returns *False* when *silent* is *True*, and
        returns *True* on success.
        """
        return

    @abc.abstractmethod
    def mkdir(self, path, perm, base=None, silent=True, **kwargs):
        """
        Creates a directory at *path* with permissions *perm*. Raises an exception when *path*
        already exists or returns *False* when *silent* is *True*, and returns *True* on success.
        """
        return

    @abc.abstractmethod
    def mkdir_rec(self, path, perm, base=None, **kwargs):
        """
        Recursively creates a directory and intermediate missing directories at *path* with
        permissions *perm*. Raises an exception when *path* already exists or returns *False*
        when *silent* is *True* (*silent* is not a named parameter here and would be passed via
        *kwargs*), and returns *True* on success.
        """
        return

    @abc.abstractmethod
    def listdir(self, path, base=None, **kwargs):
        """
        Returns a list of elements in and relative to *path*.
        """
        return

    @abc.abstractmethod
    def filecopy(self, src, dst, base=None, **kwargs):
        """
        Copies a file from *src* to *dst*. Returns the full, schemed *src* and *dst* URIs used
        for copying in a 2-tuple.
        """
        return