Source code for law.target.collection

# coding: utf-8

"""
Collections that wrap multiple targets.
"""

__all__ = [
    "TargetCollection", "FileCollection", "SiblingFileCollection", "NestedSiblingFileCollection",
]


import types
import random
from functools import partial
from contextlib import contextmanager
from collections import defaultdict, deque
from multiprocessing.pool import ThreadPool

import six

from law.config import Config
from law.target.base import Target
from law.target.file import FileSystemTarget, FileSystemDirectoryTarget, localize_file_targets
from law.target.mirrored import MirroredTarget, MirroredDirectoryTarget
from law.target.local import LocalDirectoryTarget
from law.util import no_value, colored, flatten, map_struct
from law.logger import get_logger


logger = get_logger(__name__)


[docs]class TargetCollection(Target): """ Collection of arbitrary targets. """ def __init__(self, targets, threshold=1.0, optional_existing=None, remove_threads=None, **kwargs): if isinstance(targets, types.GeneratorType): targets = list(targets) elif not isinstance(targets, (list, tuple, dict)): raise TypeError("invalid targets, must be of type: list, tuple, dict") super(TargetCollection, self).__init__(**kwargs) # default number of threads for removal if remove_threads is None: remove_threads = Config.instance().get_expanded_int("target", "collection_remove_threads") # store attributes self.targets = targets self.threshold = threshold self.optional_existing = optional_existing self.remove_threads = remove_threads # store flat targets per element in the input structure of targets if isinstance(targets, (list, tuple)): gen = (flatten(t) for t in targets) else: # dict gen = ((k, flatten(t)) for k, t in six.iteritems(targets)) self._flat_targets = targets.__class__(gen) # also store an entirely flat list of targets for simplified iterations self._flat_target_list = flatten(targets) def __len__(self): return len(self.targets) def __getitem__(self, key): return self.targets[key] def __iter__(self): # explicitly disable iterability enabled by __getitem__ as per PEP234 # to (e.g.) prevent that flatten() applies to collections raise TypeError("'{}' object is not iterable".format(self.__class__.__name__)) def _copy_kwargs(self): kwargs = super(TargetCollection, self)._copy_kwargs() kwargs["threshold"] = self.threshold kwargs["optional_existing"] = self.optional_existing return kwargs def _repr_pairs(self): pairs = Target._repr_pairs(self) + [("len", len(self))] # add non-default attributes if self.threshold != 1.0: pairs.append(("threshold", self.threshold)) if self.optional_existing is not None: pairs.append(("optional_existing", self.optional_existing)) return pairs def _iter_flat(self): # prepare the generator for looping if isinstance(self._flat_targets, (list, tuple)): gen = enumerate(self._flat_targets) else: # dict gen = six.iteritems(self._flat_targets) # loop and yield for key, targets in gen: yield (key, targets) def _iter_state( self, existing=True, optional_existing=no_value, keys=False, unpack=True, exists_func=None, ): if existing is not None: existing = bool(existing) if optional_existing is no_value: optional_existing = self.optional_existing # helper to check for existence if existing is not None and exists_func is None: def exists_func(t): if optional_existing is not None and t.optional: return bool(optional_existing) if isinstance(t, TargetCollection): return t.exists(optional_existing=optional_existing) return t.exists() # loop and yield for key, targets in self._iter_flat(): if existing is None or all(map(exists_func, targets)) is existing: if unpack: targets = self.targets[key] yield (key, targets) if keys else targets def iter_existing(self, **kwargs): return self._iter_state(existing=True, **kwargs) def iter_missing(self, **kwargs): return self._iter_state(existing=False, **kwargs) def iter_all(self, **kwargs): return self._iter_state(existing=None, **kwargs) def keys(self): if isinstance(self._flat_targets, (list, tuple)): return list(range(len(self))) # dict return list(self._flat_targets.keys()) def uri(self, *args, **kwargs): return flatten(t.uri(*args, **kwargs) for t in self._flat_target_list) @property def first_target(self): if not self._flat_target_list: return None return flatten_collections(self._flat_target_list)[0] def remove(self, silent=True, threads=None, **kwargs): if threads is None: threads = self.remove_threads # atomic removal def remove(target): target.remove(silent=silent, **kwargs) # target generator def target_gen(): for target in self._flat_target_list: yield target # parallel or sequential removal if threads > 0: with ThreadPool(threads) as pool: pool.map(remove, target_gen()) else: for target in target_gen(): remove(target) def _abs_threshold(self): if self.threshold < 0: return 0 if self.threshold <= 1: return len(self) * self.threshold return min(len(self), max(self.threshold, 0.0))
[docs] def complete(self, **kwargs): kwargs["optional_existing"] = True return self.optional or self.exists(**kwargs)
def _exists_fwd(self, **kwargs): fwd = ["optional_existing", "exists_func"] return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})
[docs] def exists(self, **kwargs): # get the threshold threshold = self._abs_threshold() if threshold == 0: return True # simple counting with early stopping criteria for both success and fail cases n = 0 for i, _ in enumerate(self.iter_existing(**kwargs)): n += 1 # check for early success if n >= threshold: return True # check for early fail if n + (len(self) - i - 1) < threshold: return False return False
def count(self, **kwargs): # simple counting of keys keys = kwargs.get("keys", False) kwargs["keys"] = True target_keys = [key for key, _ in self._iter_state(**kwargs)] n = len(target_keys) return (n, target_keys) if keys else n def random_target(self): if isinstance(self.targets, (list, tuple)): return random.choice(self.targets) # dict return random.choice(list(self.targets.values()))
[docs] def map(self, func): """ Returns a copy of this collection with all targets being transformed by *func*. """ return self.__class__(map_struct(func, self.targets), **self._copy_kwargs())
def status_text(self, max_depth=0, flags=None, color=False, **kwargs): count, existing_keys = self.count(keys=True, **kwargs) exists = count >= self._abs_threshold() if exists: text = "existent" _color = "green" else: text = "absent" _color = "red" if not self.optional else "dark_grey" text = colored(text, _color, style="bright") if color else text text += " ({}/{})".format(count, len(self)) if flags and "missing" in flags and count != len(self): missing_keys = [str(key) for key in self.keys() if key not in existing_keys] text += ", missing branches: " + ",".join(missing_keys) if max_depth > 0: if isinstance(self.targets, (list, tuple)): gen = enumerate(self.targets) else: # dict gen = six.iteritems(self.targets) for key, item in gen: text += "\n{}: ".format(key) if isinstance(item, TargetCollection): t = item.status_text(max_depth=max_depth - 1, color=color, **kwargs) text += "\n ".join(t.split("\n")) elif isinstance(item, Target): t = item.status_text(color=color, exists=key in existing_keys) text += "{} ({})".format(t, item.repr(color=color)) else: t = self.__class__(item).status_text(max_depth=max_depth - 1, color=color) text += "\n ".join(t.split("\n")) return text
class FileCollection(TargetCollection): """ Collection of targets that represent files or other FileCollection's. """ def __init__(self, *args, **kwargs): TargetCollection.__init__(self, *args, **kwargs) # check if all targets are either FileSystemTarget's or FileCollection's for t in self._flat_target_list: if not isinstance(t, (FileSystemTarget, FileCollection)): raise TypeError("FileCollection's only wrap FileSystemTarget's and other " "FileCollection's, got {}".format(t.__class__)) @contextmanager def localize(self, *args, **kwargs): # when localizing collections using temporary files, it makes sense to put # them all in the same temporary directory tmp_dir = kwargs.get("tmp_dir") if not tmp_dir: tmp_dir = LocalDirectoryTarget(is_tmp=True) elif not isinstance(tmp_dir, LocalDirectoryTarget): tmp_dir = LocalDirectoryTarget(str(tmp_dir)) kwargs["tmp_dir"] = tmp_dir # enter localize contexts of all targets with localize_file_targets(self.targets, *args, **kwargs) as localized_targets: # create a copy of this collection that wraps the localized targets yield self.__class__(localized_targets, **self._copy_kwargs()) class SiblingFileCollectionBase(FileCollection): """ Base class for file collections whose elements are located in the same directory (siblings). """ @classmethod def _exists_in_basenames(cls, target, basenames, optional_existing, target_dirs): if optional_existing not in (None, no_value) and target.optional: return optional_existing if isinstance(target, SiblingFileCollectionBase): return target._exists_fwd( basenames=basenames, optional_existing=optional_existing, ) if isinstance(target, TargetCollection): return target.exists(exists_func=partial( cls._exists_in_basenames, basenames=basenames, optional_existing=optional_existing, target_dirs=target_dirs, )) if isinstance(basenames, dict): if target_dirs and target in target_dirs: basenames = basenames[target_dirs[target]] else: # need to find find the collection manually, that could possibly contain the target, # then use its basenames for col_absdir, _basenames in basenames.items(): if _target_path_in_dir(target, col_absdir): basenames = _basenames break else: return False if not basenames: return False return target.basename in basenames def remove(self, silent=True, threads=None, **kwargs): if threads is None: threads = self.remove_threads # atomic removal def remove(target): target.remove(silent=silent, **kwargs) # target generator def target_gen(): for targets in self.iter_existing(unpack=False): for target in targets: yield target # parallel or sequential removal if threads > 0: with ThreadPool(threads) as pool: pool.map(remove, target_gen()) else: for target in target_gen(): remove(target)
[docs]class SiblingFileCollection(SiblingFileCollectionBase): """ Collection of targets that represent files which are all located in the same directory. Specifically, the performance of :py:meth:`exists` and :py:meth:`count` can greatly improve with respect to the standard :py:class:`FileCollection` as the directory listing is used internally. This is especially useful for large collections of remote files. """ @classmethod def from_directory(cls, directory, **kwargs): # dir should be a FileSystemDirectoryTarget or a string, in which case it is interpreted as # a local path if isinstance(directory, FileSystemDirectoryTarget): d = directory elif directory: d = LocalDirectoryTarget(str(directory)) else: raise TypeError("directory must either be a string or a FileSystemDirectoryTarget " "object, got '{}'".format(directory)) # find all files, pass kwargs which may filter the result further kwargs["type"] = "f" basenames = d.listdir(**kwargs) # convert to file targets targets = [d.child(basename, type="f") for basename in basenames] return cls(targets) def __init__(self, *args, **kwargs): SiblingFileCollectionBase.__init__(self, *args, **kwargs) # find the first target and store its directory if self.first_target is None: raise Exception("{} requires at least one file target".format(self.__class__.__name__)) self.dir = self.first_target.parent # check that targets are in fact located in the same directory for t in flatten_collections(self._flat_target_list): if not _target_path_in_dir(t, self.dir): raise Exception("{} is not located in common directory {}".format(t, self.dir)) def _repr_pairs(self): expand = Config.instance().get_expanded_bool("target", "expand_path_repr") dir_path = self.dir.path if expand else self.dir.unexpanded_path return TargetCollection._repr_pairs(self) + [("fs", self.dir.fs.name), ("dir", dir_path)] def _iter_state( self, existing=True, optional_existing=no_value, basenames=None, keys=False, unpack=True, exists_func=None, ): # the directory must exist if not self.dir.exists(): return if existing is not None: existing = bool(existing) if optional_existing is no_value: optional_existing = self.optional_existing # default helper to check for existence if existing is not None and exists_func is None: # get all basenames if basenames is None: basenames = self.dir.listdir() if self.dir.exists() else [] # convert to set for faster lookup basenames = set(basenames) if basenames else set() exists_func = partial( self._exists_in_basenames, basenames=basenames, optional_existing=optional_existing, target_dirs=None, ) # loop and yield for key, targets in self._iter_flat(): if existing is None or all(map(exists_func, targets)) is existing: if unpack: targets = self.targets[key] yield (key, targets) if keys else targets def _exists_fwd(self, **kwargs): fwd = ["optional_existing", "basenames", "exists_func"] return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})
[docs]class NestedSiblingFileCollection(SiblingFileCollectionBase): """ Collection of targets that represent files which are located across several directories, with files in the same directory being wrapped by a :py:class:`SiblingFileCollection` to exploit its benefit over the standard :py:class:`FileCollection` (see description above). This is especially useful for large collections of remote files that are located in different (sub) directories. The constructor identifies targets located in the same physical directory (identified by URI), creates one collection for each of them, and stores them in the *collections* attribute. Key access, iteration, etc., is identical to the standard :py:class:`FileCollection`. """ def __init__(self, *args, **kwargs): super(NestedSiblingFileCollection, self).__init__(*args, **kwargs) # as per FileCollection's init, targets are already stored in both the _flat_targets and # _flat_target_list attributes, but store them again in sibling file collections to speed up # some methods by grouping them into targets in the same physical directory self.collections = [] self._flat_target_dirs = {} grouped_targets = defaultdict(list) for t in flatten_collections(self._flat_target_list): grouped_targets[t.parent.uri()].append(t) for targets in grouped_targets.values(): # create and store the collection collection = SiblingFileCollection(targets) self.collections.append(collection) # remember the absolute collection dir per target for faster loopups later for t in targets: self._flat_target_dirs[t] = collection.dir.abspath def _repr_pairs(self): return SiblingFileCollectionBase._repr_pairs(self) + [("collections", len(self.collections))] def _iter_state( self, existing=True, optional_existing=no_value, basenames=None, keys=False, unpack=True, exists_func=None, ): if existing is not None: existing = bool(existing) if optional_existing is no_value: optional_existing = self.optional_existing # default helper to check for existence if existing is not None and exists_func is None: # get all basenames if basenames is None: basenames = { col.dir.abspath: (col.dir.listdir() if col.dir.exists() else []) for col in self.collections } # convert to sets for faster lookups basenames = {k: (set(v) if v else set()) for k, v in basenames.items()} exists_func = partial( self._exists_in_basenames, basenames=basenames, optional_existing=optional_existing, target_dirs=self._flat_target_dirs, ) # loop and yield for key, targets in self._iter_flat(): if existing is None or all(map(exists_func, targets)) is existing: if unpack: targets = self.targets[key] yield (key, targets) if keys else targets def _exists_fwd(self, **kwargs): fwd = ["optional_existing", "basenames", "exists_func"] return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})
def _target_path_in_dir(target, directory): # comparisons of dirnames are transparently possible for most target classes since their # paths are consistent, but implement a custom check for mirrored targets if isinstance(target, str): target_absdir = target else: target_absdir = ( target.remote_target if isinstance(target, MirroredTarget) else target ).absdirname if isinstance(directory, str): dir_abspath = directory else: dir_abspath = ( directory.remote_target if isinstance(directory, MirroredDirectoryTarget) else directory ).abspath # do the comparison return target_absdir == dir_abspath def flatten_collections(*targets): lookup = deque(flatten(targets)) targets = [] while lookup: t = lookup.popleft() if isinstance(t, TargetCollection): lookup.extendleft(t._flat_target_list) else: targets.append(t) return targets