# Source code for law.contrib.gfal.target

# coding: utf-8

"""
Implementation of a file interface using GFAL.
"""

from __future__ import annotations

__all__ = ["GFALFileInterface"]

import os
import sys
import pathlib
import contextlib
import stat as _stat

from law.config import Config
from law.target.file import has_scheme, get_scheme
from law.target.remote.interface import RemoteFileInterface, RetryException
from law.logger import get_logger
from law._types import Any, Callable, Iterator, Sequence, TracebackType


logger = get_logger(__name__)


# gfal2 is an optional dependency: when its python bindings cannot be imported, the name "gfal2"
# is bound to a placeholder object that raises an informative exception on any attribute access
try:
    import gfal2  # type: ignore[import-untyped, import-not-found]
except (ImportError, TypeError):
    HAS_GFAL2 = False

    class GFAL2Dummy(object):
        """
        Stand-in for the gfal2 module that fails on first use with a hint that gfal2 is missing.
        """

        def __getattr__(self, attr):
            message = f"trying to access 'gfal2.{attr}', but gfal2 is not installed"
            raise Exception(message)

    gfal2 = GFAL2Dummy()
else:
    HAS_GFAL2 = True


class GFALFileInterface(RemoteFileInterface):
    """
    Remote file interface that performs file operations through the gfal2 python bindings.

    gfal2 context objects and transfer parameter objects are created lazily and cached per process
    id (see :py:meth:`context` and :py:meth:`transfer_parameters`). With *atomic_contexts* enabled,
    they are removed from the cache again after each use. Failed operations raise subclasses of
    :py:class:`GFALOperationError`, whose ``reason`` attribute classifies the gfal2 error message.

    NOTE(review): in several methods, the "unknown reason" branch and the final "raise" branch
    both raise the same GFALOperationError (a RetryException), so they currently behave
    identically with respect to the retry decorator. Confirm whether known reasons were meant to
    be raised without retrying.
    """

    @classmethod
    def parse_config(
        cls,
        section: str,
        config: dict[str, Any] | None = None,
        *,
        overwrite: bool = False,
    ) -> dict[str, Any]:
        """
        Extends the config parsed by the base class with gfal-specific options from *section*.

        ``atomic_contexts`` is read from the ``gfal_atomic_contexts`` option. Each entry of the
        ``transfer_config`` dict is read from the corresponding ``gfal_transfer_<name>`` option.
        Values already present in the config are only replaced when *overwrite* is *True*.
        """
        _config = super().parse_config(section, config=config, overwrite=overwrite)

        cfg = Config.instance()

        # helper that stores a config value extracted with a config parser method, unless the
        # value is already set and overwriting is not requested
        def add(
            option: str,
            func: Callable[[str, str], Any],
            prefix: str = "gfal_",
            _config: dict[str, Any] = _config,
        ) -> None:
            if option not in _config or overwrite:
                _config[option] = func(section, prefix + option)

        # use atomic contexts per operation
        add("atomic_contexts", cfg.get_expanded_bool)

        # transfer config
        _config.setdefault("transfer_config", {})
        transfer_specs = [
            ("timeout", cfg.get_expanded_int),
            ("checksum_check", cfg.get_expanded_bool),
            ("nbstreams", cfg.get_expanded_int),
            ("overwrite", cfg.get_expanded_bool),
            ("create_parent", cfg.get_expanded_bool),
            ("strict_copy", cfg.get_expanded_bool),
        ]
        for name, func in transfer_specs:
            add(name, func, "gfal_transfer_", _config["transfer_config"])

        return _config

    def __init__(
        self,
        *,
        atomic_contexts: bool = False,
        gfal_options: dict[str, Any] | None = None,
        transfer_config: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        """
        :param atomic_contexts: When *True*, cached gfal2 contexts and transfer parameters are
            dropped after each use instead of being reused.
        :param gfal_options: Maps an option type ``<type>`` to a list of argument tuples. Each
            tuple is passed to ``ctx.set_opt_<type>(*args)`` when a context is created.
        :param transfer_config: Attribute names and values set on the gfal2 transfer parameters.
        :param kwargs: Forwarded to :py:class:`RemoteFileInterface`.
        """
        super().__init__(**kwargs)

        # cache for gfal context objects and transfer parameters per pid for thread safety
        self._contexts: dict[int, gfal2.Gfal2Context] = {}
        self._transfer_parameters: dict[int, gfal2.TransferParameters] = {}

        # store gfal options and transfer configs
        self.gfal_options = gfal_options or {}
        self.transfer_config = transfer_config or {}

        # other configs
        self.atomic_contexts = atomic_contexts

    @contextlib.contextmanager
    def context(self) -> Iterator[gfal2.Gfal2Context]:
        """
        Context manager yielding the gfal2 context of the current process.

        The context is created and configured with :py:attr:`gfal_options` on first use.
        """
        # context objects are stored per pid, so create one if it does not exist yet
        pid = os.getpid()
        if pid not in self._contexts:
            self._contexts[pid] = ctx = gfal2.creat_context()
            # apply options through the type-specific setters, e.g. ctx.set_opt_string(*args)
            for _type, args_list in self.gfal_options.items():
                for args in args_list:
                    getattr(ctx, f"set_opt_{_type}")(*args)

        # yield and, for atomic contexts, drop the cached context afterwards so that it is not
        # reused (the original intent being to free potentially open connections)
        try:
            yield self._contexts[pid]
        finally:
            # the membership check guards against nested usage that already removed the entry
            if self.atomic_contexts and pid in self._contexts:
                del self._contexts[pid]

    @contextlib.contextmanager
    def transfer_parameters(self, ctx: gfal2.Gfal2Context) -> Iterator[gfal2.TransferParameters]:
        """
        Context manager yielding the gfal2 transfer parameters of the current process.

        The parameters are created from *ctx* on first use, with every item of
        :py:attr:`transfer_config` set as an attribute.
        """
        pid = os.getpid()
        if pid not in self._transfer_parameters:
            self._transfer_parameters[pid] = ctx.transfer_parameters()
            for key, value in self.transfer_config.items():
                setattr(self._transfer_parameters[pid], key, value)

        try:
            yield self._transfer_parameters[pid]
        finally:
            if self.atomic_contexts and pid in self._transfer_parameters:
                del self._transfer_parameters[pid]

    def exists(
        self,
        path: str | pathlib.Path,
        *,
        stat: bool = False,
        base: str | Sequence[str] | None = None,
        **kwargs,
    ) -> bool | os.stat_result | None:
        """
        Checks whether *path* exists by stat'ing it.

        When *stat* is *True*, the stat result is returned, or *None* if the stat call failed.
        Otherwise a boolean is returned. Any ``gfal2.GError`` counts as "does not exist".
        """
        uri = self.uri(path, base_name="stat" if stat else ("exists", "stat"), base=base)
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 exists({uri})")
                rstat = ctx.stat(uri)
                return rstat if stat else True
            except gfal2.GError:
                return None if stat else False

    @RemoteFileInterface.retry(uri_base_name="stat")
    def stat(
        self,
        path: str | pathlib.Path,
        *,
        base: str | Sequence[str] | None = None,
        **kwargs,
    ) -> os.stat_result:
        """
        Returns the stat result of *path*.

        A ``gfal2.GError`` is converted into a :py:class:`RetryException`.
        """
        uri = self.uri(path, base_name="stat", base=base)
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 stat({uri})")
                return ctx.stat(uri)
            except gfal2.GError:
                raise RetryException()

    def isdir(
        self,
        path: str | pathlib.Path,
        *,
        stat: os.stat_result | None = None,
        base: str | Sequence[str] | None = None,
        **kwargs,
    ) -> bool:
        """
        Returns whether *path* is a directory.

        A precomputed *stat* result is used when given. Returns *False* when *path* does not exist.
        """
        if not stat:
            stat = self.exists(path, stat=True, base=base)  # type: ignore[assignment]
            if not stat:
                return False

        # some file protocols do not return standard st_mode values in stat requests,
        # e.g. SRM returns file type bits 0o50000 for directories instead of 0o40000,
        # these differences are rather distinct and can be taken into account here,
        # see http://man7.org/linux/man-pages/man7/inode.7.html for info on st_mode values
        return _stat.S_ISDIR(stat.st_mode) or _stat.S_IFMT(stat.st_mode) == 0o50000

    def isfile(
        self,
        path: str | pathlib.Path,
        *,
        stat: os.stat_result | None = None,
        base: str | Sequence[str] | None = None,
        **kwargs,
    ) -> bool:
        """
        Returns whether *path* exists and is not a directory.

        A precomputed *stat* result is used when given.
        """
        if not stat:
            stat = self.exists(path, stat=True, base=base)  # type: ignore[assignment]
            if not stat:
                return False

        return not self.isdir(path, stat=stat, base=base)

    @RemoteFileInterface.retry(uri_base_name="chmod")
    def chmod(
        self,
        path: str | pathlib.Path,
        perm: int | None,
        *,
        base: str | Sequence[str] | None = None,
        silent: bool = False,
        **kwargs,
    ) -> bool:
        """
        Changes the permissions of *path* to *perm*. A *perm* of *None* is a no-op returning *True*.

        Returns *True* on success. Returns *False* when *silent* is set and the target was not found
        or chmod is not supported by the protocol. Raises :py:class:`GFALError_chmod` otherwise.
        """
        if perm is None:
            return True

        uri: str = self.uri(path, base_name="chmod", base=base, return_all=False)  # type: ignore[assignment] # noqa
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 chmod({uri}, {perm})")
                ctx.chmod(uri, perm)
                return True
            except gfal2.GError:
                e = GFALError_chmod(uri)
                # check if the operation should be retried, can fail silently, or raised immediately
                if e.reason == e.UNKNOWN:
                    raise e
                if e.reason in (e.NOT_FOUND, e.NOT_SUPPORTED) and silent:
                    return False
                raise e

    @RemoteFileInterface.retry(uri_base_name="rmdir")
    def rmdir(
        self,
        path: str | pathlib.Path,
        *,
        base: str | Sequence[str] | None = None,
        silent: bool = True,
        **kwargs,
    ) -> bool:
        """
        Removes the directory *path*.

        Returns *True* on success, or *False* when it does not exist and *silent* is set.
        Raises :py:class:`GFALError_rmdir` otherwise.
        """
        uri: str = self.uri(path, base_name="rmdir", base=base, return_all=False)  # type: ignore[assignment] # noqa
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 rmdir({uri})")
                ctx.rmdir(uri)
                return True
            except gfal2.GError:
                e = GFALError_rmdir(uri)
                # check if the operation should be retried, can fail silently, or raised immediately
                if e.reason == e.UNKNOWN:
                    raise e
                if e.reason == e.NOT_FOUND and silent:
                    return False
                raise e

    @RemoteFileInterface.retry(uri_base_name="unlink")
    def remove(
        self,
        path: str | pathlib.Path,
        *,
        base: str | Sequence[str] | None = None,
        silent: bool = True,
        **kwargs,
    ) -> bool:
        """
        Recursive removal is potentially expensive in terms of remote file operations, so this
        method is designed to reduce them as much as possible.

        The order is: unlink first, then rmdir if the target turned out to be a directory, and
        finally a recursive removal of the contents if that directory is not empty. Returns *False*
        when the target does not exist and *silent* is set.
        """
        # most common use case is file removal, so try this first and in case there is an error
        # interpret its message to get more info on the object without further operations
        uri: str = self.uri(path, base_name="unlink", base=base, return_all=False)  # type: ignore[assignment] # noqa
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 unlink({uri})")
                ctx.unlink(uri)
                return True
            except gfal2.GError:
                e = GFALError_unlink(uri)
                # handle all cases, except when uri is a directory
                if e.reason != e.IS_DIRECTORY:
                    if e.reason == e.UNKNOWN:
                        raise e
                    if e.reason == e.NOT_FOUND and silent:
                        return False
                    raise e

        # at this point, we are dealing with a directory so try to delete it right away
        uri = self.uri(path, base_name="rmdir", base=base, return_all=False)  # type: ignore[assignment] # noqa
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 rmdir({uri})")
                ctx.rmdir(uri)
                return True
            except gfal2.GError:
                e2 = GFALError_rmdir(uri)
                # handle all cases, except when the directory is not empty
                if e2.reason != e2.NOT_EMPTY:
                    if e2.reason == e2.UNKNOWN:
                        # raise the rmdir error, not the earlier "is a directory" unlink error
                        raise e2
                    if e2.reason == e2.NOT_FOUND and silent:
                        return False
                    raise e2

        # the directory is not empty, so there is no other way than deleting contents recursively
        # first, and then removing the directory itself
        path = str(path)
        for elem in self.listdir(path, base=base, retries=0):
            self.remove(os.path.join(path, elem), base=base, silent=silent, retries=0)

        return self.rmdir(path, base=base, silent=silent, retries=0)

    @RemoteFileInterface.retry(uri_base_name="mkdir")
    def mkdir(
        self,
        path: str | pathlib.Path,
        perm: int,
        *,
        base: str | Sequence[str] | None = None,
        silent: bool = True,
        **kwargs,
    ) -> bool:
        """
        Creates the directory *path* with permissions *perm*.

        Returns *True* on success, or *False* when it already exists as a directory and *silent*
        is set. Raises :py:class:`GFALError_mkdir` otherwise.
        """
        uri: str = self.uri(path, base_name="mkdir", base=base, return_all=False)  # type: ignore[assignment] # noqa
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 mkdir({uri}, {perm})")
                ctx.mkdir(uri, perm)
                return True
            except gfal2.GError:
                e = GFALError_mkdir(uri)
                # check if the operation should be retried, can fail silently, or raised immediately
                if e.reason == e.UNKNOWN:
                    raise e
                if e.reason == e.EXISTS and silent:
                    # fail silently only when uri is really a directory
                    if self.isdir(path, base=base):
                        return False
                raise e

    @RemoteFileInterface.retry(uri_base_name=["mkdir_rec", "mkdir"])
    def mkdir_rec(
        self,
        path: str | pathlib.Path,
        perm: int,
        *,
        base: str | Sequence[str] | None = None,
        silent: bool = True,
        **kwargs,
    ) -> bool:
        """
        Creates the directory *path*, including missing parents, with permissions *perm*.

        Returns *True* on success, or *False* when it already exists as a directory and *silent*
        is set. Raises :py:class:`GFALError_mkdir` otherwise.
        """
        uri: str = self.uri(path, base_name=["mkdir_rec", "mkdir"], base=base, return_all=False)  # type: ignore[assignment] # noqa
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 mkdir_rec({uri}, {perm})")
                ctx.mkdir_rec(uri, perm)
                return True
            except gfal2.GError:
                e = GFALError_mkdir(uri)
                # check if the operation should be retried, can fail silently, or raised immediately
                if e.reason == e.UNKNOWN:
                    raise e
                if e.reason == e.EXISTS and silent:
                    # fail silently only when uri is really a directory
                    if self.isdir(path, base=base):
                        return False
                raise e

    @RemoteFileInterface.retry(uri_base_name="listdir")
    def listdir(
        self,
        path: str | pathlib.Path,
        *,
        base: str | Sequence[str] | None = None,
        **kwargs,
    ) -> list[str]:
        """
        Returns the names of the entries in the directory *path*.

        An empty list is returned when the protocol reports an empty directory through an error.
        Any other error raises :py:class:`GFALError_listdir`.
        """
        uri: str = self.uri(path, base_name="listdir", base=base, return_all=False)  # type: ignore[assignment] # noqa
        with self.context() as ctx:
            try:
                logger.debug(f"invoking gfal2 listdir({uri})")
                return ctx.listdir(uri)
            except gfal2.GError:
                e = GFALError_listdir(uri)
                # some protocols throw an error upon listdir on empty directories; compare the
                # parsed reason (the EMPTY constant itself is always truthy)
                if e.reason == e.EMPTY:
                    return []
                raise e

    @RemoteFileInterface.retry(uri_base_name="filecopy")
    def filecopy(
        self,
        src: str | pathlib.Path,
        dst: str | pathlib.Path,
        base: str | Sequence[str] | None = None,
        **kwargs,
    ) -> tuple[str, str]:
        """
        Copies *src* to *dst* with gfal2 and returns the ``(src_uri, dst_uri)`` tuple that was used.

        Paths that already carry a scheme are only sanitized. All others are resolved into uris
        relative to *base*. Transfer options are taken from :py:attr:`transfer_config`.
        """
        if has_scheme(src):
            src_uri = self.sanitize_path(src)
        else:
            src_uri = self.uri(src, base_name="filecopy", base=base, return_all=False)  # type: ignore[assignment] # noqa

        if has_scheme(dst):
            dst_uri = self.sanitize_path(dst)
        else:
            dst_uri = self.uri(dst, base_name="filecopy", base=base, return_all=False)  # type: ignore[assignment] # noqa

        with self.context() as ctx, self.transfer_parameters(ctx) as params:
            try:
                logger.debug(f"invoking gfal2 filecopy({src_uri}, {dst_uri})")
                ctx.filecopy(params, src_uri, dst_uri)
            except gfal2.GError:
                e = GFALError_filecopy(src_uri, dst_uri)
                # check if the operation should be retried or raised immediately
                if e.reason == e.UNKNOWN:
                    raise e
                raise e

        return src_uri, dst_uri
class GFALOperationError(RetryException):
    """
    Base class for errors raised after failed gfal2 operations.

    On construction, the message of the currently handled (or explicitly passed) exception is
    classified by the scheme-specific :py:meth:`_get_reason` of the subclass. The resulting reason
    string is stored in ``reason`` and appended to the exception message.
    """

    # reason used when the error message could not be attributed to a known cause
    UNKNOWN = "unknown reason"

    def __init__(
        self,
        uri: str | pathlib.Path,
        exc: tuple[type, BaseException, TracebackType] | None = None,
    ) -> None:
        # store uri and scheme
        self.uri = str(uri)
        self.scheme = get_scheme(uri) or "file"

        # get the original error objects and find the error reason
        _exc = sys.exc_info() if exc is None else exc
        self.reason = self._get_reason(str(_exc[1]), self.uri, self.scheme)

        # add the error reason to the message
        msg = f"{_exc[1]} ({self.__class__.__name__}: {self.reason})"

        super().__init__(msg=msg, exc=exc)

    @classmethod
    def _get_reason(cls, msg: str, uri: str, scheme: str) -> str:
        """
        Maps the error message *msg* of an operation on *uri* to one of the reason constants of
        the class. Must be implemented by subclasses.
        """
        raise NotImplementedError()


class GFALError_chmod(GFALOperationError):
    """
    Error raised when a gfal2 chmod operation fails.
    """

    NOT_FOUND = "no such file or directory"
    NOT_SUPPORTED = "chmod operation not supported"

    @classmethod
    def _get_reason(cls, msg: str, uri: str, scheme: str) -> str:
        # match lowercased, scheme-specific message fragments; the first match wins
        lmsg = msg.lower()
        if scheme == "root":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
        elif scheme == "gsiftp":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
        elif scheme == "srm":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
            if "operation not supported" in lmsg:
                return cls.NOT_SUPPORTED
        elif scheme in ("dav", "davs"):
            if "protocol not supported" in lmsg:
                return cls.NOT_SUPPORTED
        elif scheme == "dropbox":
            if "protocol not supported" in lmsg:
                return cls.NOT_SUPPORTED
        else:
            logger.warning(f"scheme '{scheme}' not known to {cls.__name__}, cannot parse '{msg}'")

        # known scheme without a matching fragment, or unknown scheme
        return cls.UNKNOWN


class GFALError_unlink(GFALOperationError):
    """
    Error raised when a gfal2 unlink operation fails.
    """

    NOT_FOUND = "target not found"
    IS_DIRECTORY = "target is a directory"

    @classmethod
    def _get_reason(cls, msg: str, uri: str, scheme: str) -> str:
        # match lowercased, scheme-specific message fragments; the first match wins
        lmsg = msg.lower()
        if scheme == "root":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
            if "is a directory" in lmsg:
                return cls.IS_DIRECTORY
        elif scheme == "gsiftp":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
            if "not a file" in lmsg:
                return cls.IS_DIRECTORY
        elif scheme == "srm":
            if "no such file" in lmsg:
                return cls.NOT_FOUND
            if "not a file" in lmsg:
                return cls.IS_DIRECTORY
        elif scheme in ("dav", "davs"):
            if "file not found" in lmsg:
                return cls.NOT_FOUND
            if "is a directory" in lmsg:
                return cls.IS_DIRECTORY
        elif scheme == "dropbox":
            if "not_found" in lmsg:
                return cls.NOT_FOUND
        else:
            logger.warning(f"scheme '{scheme}' not known to {cls.__name__}, cannot parse '{msg}'")

        # known scheme without a matching fragment, or unknown scheme
        return cls.UNKNOWN


class GFALError_rmdir(GFALOperationError):
    """
    Error raised when a gfal2 rmdir operation fails.
    """

    NOT_FOUND = "target not found"
    IS_FILE = "target is a file"
    NOT_EMPTY = "directory is not empty"

    @classmethod
    def _get_reason(cls, msg: str, uri: str, scheme: str) -> str:
        # match lowercased, scheme-specific message fragments; the first match wins
        lmsg = msg.lower()
        if scheme == "root":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
            if "not a directory" in lmsg:
                return cls.IS_FILE
            if "no such device" in lmsg:
                # cryptic message for non-empty directory
                return cls.NOT_EMPTY
        elif scheme == "gsiftp":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
            if "not a directory" in lmsg:
                return cls.IS_FILE
            if "directory is not empty" in lmsg:
                return cls.NOT_EMPTY
        elif scheme == "srm":
            if "no such file or directory" in lmsg:
                return cls.NOT_FOUND
            if "this file is not a directory" in lmsg:
                return cls.IS_FILE
            if "directory not empty" in lmsg:
                return cls.NOT_EMPTY
        elif scheme in ("dav", "davs"):
            if "file not found" in lmsg:
                return cls.NOT_FOUND
        elif scheme == "dropbox":
            if "not_found" in lmsg:
                return cls.NOT_FOUND
        else:
            logger.warning(f"scheme '{scheme}' not known to {cls.__name__}, cannot parse '{msg}'")

        # known scheme without a matching fragment, or unknown scheme
        return cls.UNKNOWN


class GFALError_mkdir(GFALOperationError):
    """
    Error raised when a gfal2 mkdir or mkdir_rec operation fails.
    """

    EXISTS = "target already exists"

    @classmethod
    def _get_reason(cls, msg: str, uri: str, scheme: str) -> str:
        # match lowercased, scheme-specific message fragments; the first match wins
        # NOTE(review): dav/davs have no branch here and therefore log the "not known" warning
        lmsg = msg.lower()
        if scheme == "root":
            if "file exists" in lmsg:
                return cls.EXISTS
        elif scheme == "gsiftp":
            if "directory already exists" in lmsg:
                return cls.EXISTS
        elif scheme == "srm":
            if "directory already exist" in lmsg:
                return cls.EXISTS
        elif scheme == "dropbox":
            if "the directory already exists" in lmsg:
                return cls.EXISTS
        else:
            logger.warning(f"scheme '{scheme}' not known to {cls.__name__}, cannot parse '{msg}'")

        # known scheme without a matching fragment, or unknown scheme
        return cls.UNKNOWN


class GFALError_listdir(GFALOperationError):
    """
    Error raised when a gfal2 listdir operation fails.
    """

    EMPTY = "directory is empty"

    @classmethod
    def _get_reason(cls, msg: str, uri: str, scheme: str) -> str:
        lmsg = msg.lower()
        if scheme == "root":
            # xrootd throws an exception when a directory is empty
            if lmsg.strip().endswith("invalid response (unknown error 303)"):
                return cls.EMPTY
        else:
            logger.warning(f"scheme '{scheme}' not known to {cls.__name__}, cannot parse '{msg}'")

        # known scheme without a matching fragment, or unknown scheme
        return cls.UNKNOWN


class GFALError_filecopy(GFALOperationError):
    """
    Error raised when a gfal2 filecopy operation fails.

    Unlike the other operation errors, both the source and the destination uri and scheme are
    stored, since gfal2 error messages depend on the combination of both protocols.
    """

    SRC_NOT_FOUND = "source not found"
    DST_EXISTS = "target already exists"

    def __init__(
        self,
        src_uri: str,
        dst_uri: str,
        exc: tuple[type, BaseException, TracebackType] | None = None,
    ) -> None:
        # store uri and scheme
        self.src_uri = str(src_uri)
        self.dst_uri = str(dst_uri)
        self.src_scheme = get_scheme(src_uri) or "file"
        self.dst_scheme = get_scheme(dst_uri) or "file"

        # get the original error objects and find the error reason
        _exc = sys.exc_info() if exc is None else exc
        self.reason = self._get_reason(
            str(_exc[1]),
            self.src_uri,
            self.dst_uri,
            self.src_scheme,
            self.dst_scheme,
        )

        # add the error reason to the message
        msg = f"{_exc[1]} ({self.__class__.__name__}: {self.reason})"

        # bypass the GFALOperationError init, whose signature only handles a single uri
        RetryException.__init__(self, msg=msg, exc=exc)

    @classmethod
    def _get_reason(  # type: ignore[override]
        cls,
        msg: str,
        src_uri: str,
        dst_uri: str,
        src_scheme: str,
        dst_scheme: str,
    ) -> str:
        # in gfal, error messages on missing source files or existing target files depend on both
        # source and destination protocols, so all cases need to be handled separately
        lmsg = msg.lower()
        if (src_scheme, dst_scheme) == ("file", "file"):
            if "could not open source" in lmsg:
                return cls.SRC_NOT_FOUND
            if "the file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("file", "root"):
            if "no such file or directory (source)" in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists (destination)" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("file", "gsiftp"):
            if "local system call no such file or directory" in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("file", "srm"):
            if "local system call no such file or directory" in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) in (("file", "dav"), ("file", "davs")):
            if "local system call no such file or directory" in lmsg:
                return cls.SRC_NOT_FOUND
        elif (src_scheme, dst_scheme) == ("file", "dropbox"):
            if "could not open source" in lmsg:
                return cls.SRC_NOT_FOUND
            if "the file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("gsiftp", "file"):
            if "no such file or directory on url" in lmsg:
                return cls.SRC_NOT_FOUND
            if "the file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("gsiftp", "gsiftp"):
            if "file not found" in lmsg:
                return cls.SRC_NOT_FOUND
            if "destination already exist" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) in (("gsiftp", "dav"), ("gsiftp", "davs")):
            if "is a directory" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("srm", "file"):
            if "no such file" in lmsg:
                return cls.SRC_NOT_FOUND
            if "the file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("srm", "root"):
            if "no such file" in lmsg:
                return cls.SRC_NOT_FOUND
            if "the file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("srm", "srm"):
            if "no such file" in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) in (("srm", "dav"), ("srm", "davs")):
            if "no such file" in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("root", "file"):
            if "no such file or directory" in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists (destination)" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("root", "srm"):
            if "no such file or directory" in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("root", "root"):
            if "destination does not support delegation." in lmsg:
                return cls.SRC_NOT_FOUND
            if "file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) in (("root", "dav"), ("root", "davs")):
            if "failed to open file (block device required)" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) in (("dav", "file"), ("davs", "file")):
            if "file not found" in lmsg:
                return cls.SRC_NOT_FOUND
        elif (src_scheme, dst_scheme) in (("dav", "gsiftp"), ("davs", "gsiftp")):
            if "file not found" in lmsg:
                return cls.SRC_NOT_FOUND
            if "not a file" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) in (("dav", "root"), ("davs", "root")):
            # it appears that there is a bug in gfal when copying via davix to xrootd in that
            # the full dst path is repeated, e.g. "root://url.tld:1090/pnfs/.../root://url..."
            # which causes weird behavior, and as long as this issue persists, there should be no
            # error parsing in law
            pass
        elif (src_scheme, dst_scheme) in (("dav", "srm"), ("davs", "srm")):
            # same issue as for davix -> xrootd, wait until this is resolved
            pass
        elif (src_scheme, dst_scheme) == ("dropbox", "file"):
            if "could not open source" in lmsg:
                return cls.SRC_NOT_FOUND
            if "the file exists" in lmsg:
                return cls.DST_EXISTS
        elif (src_scheme, dst_scheme) == ("dropbox", "dropbox"):
            if "could not open source" in lmsg:
                return cls.SRC_NOT_FOUND
            if "the file exists" in lmsg:
                return cls.DST_EXISTS
        else:
            logger.warning(
                f"combination of source scheme '{src_scheme}' and target scheme '{dst_scheme}' not "
                f"known to {cls.__name__}, cannot parse '{msg}'",
            )

        # known scheme combination without a matching fragment, or unknown combination
        return cls.UNKNOWN