Source code for law.target.remote.cache

# coding: utf-8

"""
Cache for remote files on local disk.
"""

__all__ = ["RemoteCache"]


import os
import shutil
import time
import tempfile
import weakref
import atexit
from contextlib import contextmanager

from law.config import Config
from law.util import (
    makedirs, human_bytes, parse_bytes, parse_duration, create_hash, user_owns_file, io_lock,
)
from law.logger import get_logger


logger = get_logger(__name__)


class RemoteCache(object):

    TMP = "__TMP__"

    lock_postfix = ".lock"

    _instances = []

    def __new__(cls, *args, **kwargs):
        inst = object.__new__(cls)

        # cache instances
        cls._instances.append(inst)

        return inst

    @classmethod
    def cleanup_all(cls):
        # clear all caches
        for inst in cls._instances:
            try:
                inst._cleanup()
            except:
                pass

    @classmethod
    def parse_config(cls, section, config=None, overwrite=False):
        from law.sandbox.base import _sandbox_switched

        # reads a law config section and returns parsed cache configs
        cfg = Config.instance()

        if config is None:
            config = {}

        # helper to add a config value if it exists, extracted with a config parser method
        def add(option, func):
            cache_option = "cache_" + option
            if cfg.is_missing_or_none(section, cache_option):
                return
            elif option not in config or overwrite:
                config[option] = func(section, cache_option)

        def get_size(section, cache_option):
            value = cfg.get_expanded(section, cache_option)
            return parse_bytes(value, input_unit="MB", unit="MB")

        def get_time(section, cache_option):
            value = cfg.get_expanded(section, cache_option)
            return parse_duration(value, input_unit="s", unit="s")

        add("root", cfg.get_expanded)
        add("cleanup", cfg.get_expanded_bool)
        add("max_size", get_size)
        add("mtime_patience", cfg.get_expanded_float)
        add("file_perm", cfg.get_expanded_int)
        add("dir_perm", cfg.get_expanded_int)
        add("wait_delay", get_time)
        add("max_waits", cfg.get_expanded_int)
        add("global_lock", cfg.get_expanded_bool)

        # inside sandboxes, never cleanup since the outer process will do that if needed
        if _sandbox_switched:
            config["cleanup"] = False

        return config

    def __init__(self, fs, root=TMP, cleanup=False, max_size=0, mtime_patience=1.0,
            file_perm=0o0660, dir_perm=0o0770, wait_delay=5.0, max_waits=120, global_lock=False):
        object.__init__(self)

        # max_size is in MB, wait_delay is in seconds

        # create a unique name based on fs attributes
        name = "{}_{}".format(fs.__class__.__name__, create_hash(fs.base[0]))

        # create the root dir, handle tmp
        root = os.path.expandvars(os.path.expanduser(str(root))) or self.TMP
        if not os.path.exists(root) and root == self.TMP:
            cfg = Config.instance()
            tmp_dir = cfg.get_expanded("target", "tmp_dir")
            base = tempfile.mkdtemp(dir=tmp_dir)
            cleanup = True
        else:
            base = os.path.join(root, name)
            makedirs(base, dir_perm)

        # save attributes and configs
        self.root = root
        self.fs_ref = weakref.ref(fs)
        self.base = base
        self.name = name
        self.cleanup = cleanup
        self.max_size = max_size
        self.mtime_patience = mtime_patience
        self.dir_perm = dir_perm
        self.file_perm = file_perm
        self.wait_delay = wait_delay
        self.max_waits = max_waits
        self.global_lock = global_lock

        # path to the global lock file which should guard global actions such as cache allocations
        self._global_lock_path = self._lock_path(os.path.join(base, "global"))

        # currently locked cache paths, only used to clean up broken files during cleanup
        self._locked_cpaths = set()

        logger.debug("created {} at '{}'".format(self.__class__.__name__, self.base))

    def __del__(self):
        try:
            self._cleanup()
        except (OSError, TypeError):
            pass

    def __repr__(self):
        return "<{} '{}' at {}>".format(self.__class__.__name__, self.base, hex(id(self)))

    def __contains__(self, rpath):
        return os.path.exists(self.cache_path(rpath))

    @property
    def fs(self):
        return self.fs_ref()

    def _cleanup(self):
        # full cleanup or remove open locks
        if getattr(self, "cleanup", False):
            if os.path.exists(self.base):
                shutil.rmtree(self.base)
        else:
            for cpath in set(self._locked_cpaths):
                self._unlock(cpath)
                self._remove(cpath)
            self._locked_cpaths.clear()

            self._unlock_global()

        logger.debug("cleanup RemoteCache at '{}'".format(self.base))

    def cache_path(self, rpath):
        # build the cache basename from the hashed remote path, keeping the original basename
        rpath = str(rpath)
        basename = "{}_{}".format(create_hash(rpath), os.path.basename(rpath))
        return os.path.join(self.base, basename)

    def _lock_path(self, cpath):
        return str(cpath) + self.lock_postfix

    def is_locked_global(self):
        return os.path.exists(self._global_lock_path)

    def _is_locked(self, cpath):
        return os.path.exists(self._lock_path(cpath))

    def is_locked(self, rpath):
        return self._is_locked(self.cache_path(rpath))

    def _unlock_global(self):
        try:
            os.remove(self._global_lock_path)
        except OSError:
            pass

    def _unlock(self, cpath):
        try:
            os.remove(self._lock_path(cpath))
        except OSError:
            pass

    def _await_global(self, delay=None, max_waits=None, silent=False):
        delay = delay if delay is not None else self.wait_delay
        max_waits = max_waits if max_waits is not None else self.max_waits
        _max_waits = max_waits

        while self.is_locked_global():
            if max_waits <= 0:
                if not silent:
                    raise Exception("max_waits of {} exceeded while waiting for global lock".format(
                        _max_waits))
                return False

            time.sleep(delay)
            max_waits -= 1

        return True

    def _await(self, cpath, delay=None, max_waits=None, silent=False, global_lock=None):
        cpath = str(cpath)
        delay = delay if delay is not None else self.wait_delay
        max_waits = max_waits if max_waits is not None else self.max_waits
        _max_waits = max_waits
        global_lock = self.global_lock if global_lock is None else global_lock

        # strategy: wait as long as the file is locked and, if the file size did not change, reduce
        # max_waits per iteration and raise when 0 is reached
        last_size = -1
        while self._is_locked(cpath) or (global_lock and self.is_locked_global()):
            if max_waits <= 0:
                if not silent:
                    raise Exception("max_waits of {} exceeded while waiting for file '{}'".format(
                        _max_waits, cpath))
                return False

            time.sleep(delay)

            # only reduce max_waits when the file size did not change,
            # otherwise, set it to its original value again
            if os.path.exists(cpath):
                size = os.stat(cpath).st_size
                if size != last_size:
                    last_size = size
                    max_waits = _max_waits + 1

            max_waits -= 1

        return True

    @contextmanager
    def _lock_global(self, **kwargs):
        self._await_global(**kwargs)

        try:
            with io_lock:
                with open(self._global_lock_path, "w") as f:
                    f.write("")
                os.utime(self._global_lock_path, None)
            yield
        finally:
            self._unlock_global()

    @contextmanager
    def _lock(self, cpath, **kwargs):
        cpath = str(cpath)
        lock_path = self._lock_path(cpath)

        self._await(cpath, **kwargs)

        try:
            with io_lock:
                with open(lock_path, "w") as f:
                    f.write("")
                self._locked_cpaths.add(cpath)
                try:
                    os.utime(lock_path, None)
                except OSError:
                    pass
            yield
        except:
            # when something went really wrong, conservatively delete the cached file
            self._remove(cpath, lock=False)
            raise
        finally:
            # unlock again
            self._unlock(cpath)
            if cpath in self._locked_cpaths:
                self._locked_cpaths.remove(cpath)

    def lock(self, rpath):
        return self._lock(self.cache_path(rpath))

    def allocate(self, size):
        logger.debug("allocating {0[0]:.2f} {0[1]} in cache '{1}'".format(human_bytes(size), self))

        # determine stats and current cache size
        file_stats = []
        for elem in os.listdir(self.base):
            if elem.endswith(self.lock_postfix):
                continue
            cpath = os.path.join(self.base, elem)
            file_stats.append((cpath, os.stat(cpath)))
        current_size = sum(stat.st_size for _, stat in file_stats)

        # get the available space of the disk that contains the cache in bytes, leave 10%
        fs_stat = os.statvfs(self.base)
        free_size = fs_stat.f_frsize * fs_stat.f_bavail * 0.9

        # determine the maximum size of the cache,
        # make sure it is always smaller than what is available
        if self.max_size <= 0:
            max_size = current_size + free_size
        else:
            max_size = min(self.max_size * 1024**2, current_size + free_size)

        # determine the size of files that need to be deleted
        delete_size = current_size + size - max_size
        if delete_size <= 0:
            logger.debug("cache space sufficient, {0[0]:.2f} {0[1]} remaining".format(
                human_bytes(-delete_size)))
            return True

        logger.info("need to delete {0[0]:.2f} {0[1]} from cache".format(
            human_bytes(delete_size)))

        # delete files, ordered by their access time, skip locked ones
        for cpath, cstat in sorted(file_stats, key=lambda tpl: tpl[1].st_atime):
            if self._is_locked(cpath):
                continue

            self._remove(cpath)

            delete_size -= cstat.st_size
            if delete_size <= 0:
                return True

        logger.warning("could not allocate remaining {0[0]:.2f} {0[1]} in cache".format(
            human_bytes(delete_size)))
        return False

    def _touch(self, cpath, times=None):
        # update permissions and access/modification times of the cached file
        cpath = str(cpath)
        if os.path.exists(cpath):
            if user_owns_file(cpath):
                os.chmod(cpath, self.file_perm)
            os.utime(cpath, times)

    def touch(self, rpath, times=None):
        self._touch(self.cache_path(rpath), times=times)

    def _mtime(self, cpath):
        return os.stat(str(cpath)).st_mtime

    def mtime(self, rpath):
        return self._mtime(self.cache_path(rpath))

    def check_mtime(self, rpath, rmtime):
        # compare the cached mtime to the remote one, a negative patience disables the check
        if self.mtime_patience < 0:
            return True
        return abs(self.mtime(rpath) - rmtime) <= self.mtime_patience

    def _remove(self, cpath, lock=True):
        def remove():
            try:
                os.remove(str(cpath))
            except OSError:
                pass

        if lock:
            with self._lock(cpath):
                remove()
        else:
            remove()

    def remove(self, rpath, lock=True):
        return self._remove(self.cache_path(rpath), lock=lock)

atexit.register(RemoteCache.cleanup_all)
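
The snippet below is a minimal usage sketch, not part of the module source: DummyFileSystem, download(), and all paths and sizes are illustrative assumptions, since in practice a RemoteCache is created and driven by law's remote file system classes.

    from law.target.remote.cache import RemoteCache

    # hypothetical stand-in for a remote file system; the cache only needs the
    # class name and the first base URI to derive its directory name
    class DummyFileSystem(object):
        base = ["root://some.remote.host"]

    fs = DummyFileSystem()
    cache = RemoteCache(fs, root="/tmp/law_cache", max_size=1024)  # max_size in MB

    rpath = "/store/data/file.root"
    if rpath not in cache:
        # reserve space for the expected file size in bytes, evicting
        # unlocked files in least-recently-accessed order when needed
        if cache.allocate(512 * 1024**2):
            # guard the transfer with a per-file lock
            with cache.lock(rpath):
                download(rpath, cache.cache_path(rpath))  # placeholder transfer

    # refresh permissions and timestamps so eviction prefers stale entries
    cache.touch(rpath)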