# coding: utf-8
"""
Collections that wrap multiple targets.
"""
__all__ = [
"TargetCollection", "FileCollection", "SiblingFileCollection", "NestedSiblingFileCollection",
]
import types
import random
from abc import abstractmethod
from contextlib import contextmanager
import six
from law.config import Config
from law.target.base import Target
from law.target.file import FileSystemTarget, FileSystemDirectoryTarget, localize_file_targets
from law.target.local import LocalDirectoryTarget
from law.util import colored, flatten, map_struct
from law.logger import get_logger
logger = get_logger(__name__)
[docs]class TargetCollection(Target):
"""
Collection of arbitrary targets.
"""
def __init__(self, targets, threshold=1.0, **kwargs):
if isinstance(targets, types.GeneratorType):
targets = list(targets)
elif not isinstance(targets, (list, tuple, dict)):
raise TypeError("invalid targets, must be of type: list, tuple, dict")
super(TargetCollection, self).__init__(**kwargs)
# store targets and threshold
self.targets = targets
self.threshold = threshold
# store flat targets per element in the input structure of targets
if isinstance(targets, (list, tuple)):
gen = (flatten(t) for t in targets)
else: # dict
gen = ((k, flatten(t)) for k, t in six.iteritems(targets))
self._flat_targets = targets.__class__(gen)
# also store an entirely flat list of targets for simplified iterations
self._flat_target_list = flatten(targets)
def __len__(self):
return len(self.targets)
def __getitem__(self, key):
return self.targets[key]
def __iter__(self):
# explicitly disable iterability enabled by __getitem__ as per PEP234
# to (e.g.) prevent that flatten() applies to collections
raise TypeError("'{}' object is not iterable".format(self.__class__.__name__))
def _copy_kwargs(self):
kwargs = super(TargetCollection, self)._copy_kwargs()
kwargs["threshold"] = self.threshold
return kwargs
def _repr_pairs(self):
return Target._repr_pairs(self) + [("len", len(self)), ("threshold", self.threshold)]
def _iter_flat(self):
# prepare the generator for looping
if isinstance(self._flat_targets, (list, tuple)):
gen = enumerate(self._flat_targets)
else: # dict
gen = six.iteritems(self._flat_targets)
# loop and yield
for key, targets in gen:
yield (key, targets)
def _iter_state(self, existing=True, optional_existing=None, keys=False, unpack=True):
existing = bool(existing)
if optional_existing is not None:
optional_existing = bool(optional_existing)
# helper to check for existence
def exists(t):
if optional_existing is not None and t.optional:
return optional_existing
if isinstance(t, TargetCollection):
return t.exists(optional_existing=optional_existing)
return t.exists()
# loop and yield
for key, targets in self._iter_flat():
state = all(exists(t) for t in targets)
if state is existing:
if unpack:
targets = self.targets[key]
yield (key, targets) if keys else targets
def iter_existing(self, **kwargs):
return self._iter_state(existing=True, **kwargs)
def iter_missing(self, **kwargs):
return self._iter_state(existing=False, **kwargs)
def keys(self):
if isinstance(self._flat_targets, (list, tuple)):
return list(range(len(self)))
# dict
return list(self._flat_targets.keys())
def uri(self, *args, **kwargs):
return flatten(t.uri(*args, **kwargs) for t in self._flat_target_list)
@property
def first_target(self):
if not self._flat_target_list:
return None
return flatten_collections(self._flat_target_list)[0]
def remove(self, silent=True):
for t in self._flat_target_list:
if silent:
t.remove(silent=True)
elif t.exists():
t.remove()
def _abs_threshold(self):
if self.threshold < 0:
return 0
if self.threshold <= 1:
return len(self) * self.threshold
return min(len(self), max(self.threshold, 0.0))
[docs] def complete(self, **kwargs):
kwargs["optional_existing"] = True
return self.optional or self.exists(**kwargs)
def _exists_fwd(self, **kwargs):
fwd = ["optional_existing"]
return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})
[docs] def exists(self, **kwargs):
# get the threshold
threshold = self._abs_threshold()
if threshold == 0:
return True
# simple counting with early stopping criteria for both success and fail cases
n = 0
for i, targets in enumerate(self.iter_existing(**kwargs)):
n += 1
# check for early success
if n >= threshold:
return True
# check for early fail
if n + (len(self) - i - 1) < threshold:
return False
return False
def count(self, **kwargs):
# simple counting of keys
keys = kwargs.get("keys", False)
kwargs["keys"] = True
target_keys = [key for key, _ in self._iter_state(**kwargs)]
n = len(target_keys)
return n if not keys else (n, target_keys)
def random_target(self):
if isinstance(self.targets, (list, tuple)):
return random.choice(self.targets)
# dict
return random.choice(list(self.targets.values()))
[docs] def map(self, func):
"""
Returns a copy of this collection with all targets being transformed by *func*.
"""
return self.__class__(map_struct(func, self.targets), **self._copy_kwargs())
def status_text(self, max_depth=0, flags=None, color=False, exists=None):
count, existing_keys = self.count(keys=True)
exists = count >= self._abs_threshold()
if exists:
text = "existent"
_color = "green"
else:
text = "absent"
_color = "red" if not self.optional else "dark_grey"
text = colored(text, _color, style="bright") if color else text
text += " ({}/{})".format(count, len(self))
if flags and "missing" in flags and count != len(self):
missing_keys = [str(key) for key in self.keys() if key not in existing_keys]
text += ", missing branches: " + ",".join(missing_keys)
if max_depth > 0:
if isinstance(self.targets, (list, tuple)):
gen = enumerate(self.targets)
else: # dict
gen = six.iteritems(self.targets)
for key, item in gen:
text += "\n{}: ".format(key)
if isinstance(item, TargetCollection):
t = item.status_text(max_depth=max_depth - 1, color=color)
text += "\n ".join(t.split("\n"))
elif isinstance(item, Target):
t = item.status_text(color=color, exists=key in existing_keys)
text += "{} ({})".format(t, item.repr(color=color))
else:
t = self.__class__(item).status_text(max_depth=max_depth - 1, color=color)
text += "\n ".join(t.split("\n"))
return text
class FileCollection(TargetCollection):
"""
Collection of targets that represent files or other FileCollection's.
"""
def __init__(self, *args, **kwargs):
TargetCollection.__init__(self, *args, **kwargs)
# check if all targets are either FileSystemTarget's or FileCollection's
for t in self._flat_target_list:
if not isinstance(t, (FileSystemTarget, FileCollection)):
raise TypeError("FileCollection's only wrap FileSystemTarget's and other "
"FileCollection's, got {}".format(t.__class__))
@contextmanager
def localize(self, *args, **kwargs):
# when localizing collections using temporary files, it makes sense to put
# them all in the same temporary directory
tmp_dir = kwargs.get("tmp_dir")
if not tmp_dir:
tmp_dir = LocalDirectoryTarget(is_tmp=True)
elif not isinstance(tmp_dir, LocalDirectoryTarget):
tmp_dir = LocalDirectoryTarget(str(tmp_dir))
kwargs["tmp_dir"] = tmp_dir
# enter localize contexts of all targets
with localize_file_targets(self.targets, *args, **kwargs) as localized_targets:
# create a copy of this collection that wraps the localized targets
yield self.__class__(localized_targets, **self._copy_kwargs())
class SiblingFileCollectionBase(FileCollection):
"""
Base class for file collections whose elements are located in the same directory (siblings).
"""
def remove(self, silent=True):
for targets in self.iter_existing(unpack=False):
for t in targets:
t.remove(silent=silent)
@abstractmethod
def _exists_fwd(self, **kwargs):
return
[docs]class SiblingFileCollection(SiblingFileCollectionBase):
"""
Collection of targets that represent files which are all located in the same directory.
Specifically, the performance of :py:meth:`exists` and :py:meth:`count` can greatly improve with
respect to the standard :py:class:`FileCollection` as the directory listing is used internally.
This is especially useful for large collections of remote files.
"""
@classmethod
def from_directory(cls, directory, **kwargs):
# dir should be a FileSystemDirectoryTarget or a string, in which case it is interpreted as
# a local path
if isinstance(directory, FileSystemDirectoryTarget):
d = directory
elif directory:
d = LocalDirectoryTarget(str(directory))
else:
raise TypeError("directory must either be a string or a FileSystemDirectoryTarget "
"object, got '{}'".format(directory))
# find all files, pass kwargs which may filter the result further
kwargs["type"] = "f"
basenames = d.listdir(**kwargs)
# convert to file targets
targets = [d.child(basename, type="f") for basename in basenames]
return cls(targets)
def __init__(self, *args, **kwargs):
SiblingFileCollectionBase.__init__(self, *args, **kwargs)
# find the first target and store its directory
if self.first_target is None:
raise Exception("{} requires at least one file target".format(self.__class__.__name__))
self.dir = self.first_target.parent
# check that targets are in fact located in the same directory
for t in flatten_collections(self._flat_target_list):
if t.dirname != self.dir.path:
raise Exception("{} {} is not located in common directory {}".format(
t.__class__.__name__, t, self.dir))
def _repr_pairs(self):
expand = Config.instance().get_expanded_bool("target", "expand_path_repr")
dir_path = self.dir.path if expand else self.dir.unexpanded_path
return TargetCollection._repr_pairs(self) + [("fs", self.dir.fs.name), ("dir", dir_path)]
def _iter_state(
self,
existing=True,
optional_existing=None,
basenames=None,
keys=False,
unpack=True,
):
existing = bool(existing)
if optional_existing is not None:
optional_existing = bool(optional_existing)
# the directory must exist
if not self.dir.exists():
return
# get the basenames of all elements of the directory
if basenames is None:
basenames = self.dir.listdir()
# helper to check for existence
def exists(t):
if optional_existing is not None and t.optional:
return optional_existing
if isinstance(t, SiblingFileCollectionBase):
return t._exists_fwd(
basenames=basenames,
optional_existing=optional_existing,
)
if isinstance(t, TargetCollection):
return all(exists(_t) for _t in flatten_collections(t))
return t.basename in basenames
# loop and yield
for key, targets in self._iter_flat():
state = all(exists(t) for t in targets)
if state is existing:
if unpack:
targets = self.targets[key]
yield (key, targets) if keys else targets
def _exists_fwd(self, **kwargs):
fwd = ["basenames", "optional_existing"]
return self.exists(**{key: kwargs[key] for key in fwd if key in kwargs})
[docs]class NestedSiblingFileCollection(SiblingFileCollectionBase):
"""
Collection of targets that represent files which are located across several directories, with
files in the same directory being wrapped by a :py:class:`SiblingFileCollection` to exploit its
benefit over the standard :py:class:`FileCollection` (see description above). This is especially
useful for large collections of remote files that are located in different (sub) directories.
The constructor identifies targets located in the same physical directory (identified by URI),
creates one collection for each of them, and stores them in the *collections* attribute. Key
access, iteration, etc., is identical to the standard :py:class:`FileCollection`.
"""
def __init__(self, *args, **kwargs):
super(NestedSiblingFileCollection, self).__init__(*args, **kwargs)
# as per FileCollection's init, targets are already stored in both the _flat_targets and
# _flat_target_list attributes, but store them again in sibling file collections to speed up
# some methods by grouping them into targets in the same physical directory
self.collections = []
self._flat_target_collections = {}
grouped_targets = {}
for t in flatten_collections(self._flat_target_list):
grouped_targets.setdefault(t.parent.uri(), []).append(t)
for targets in grouped_targets.values():
# create and store the collection
collection = SiblingFileCollection(targets)
self.collections.append(collection)
# remember the collection per target
for t in targets:
self._flat_target_collections[t] = collection
def _repr_pairs(self):
return SiblingFileCollectionBase._repr_pairs(self) + [("collections", len(self.collections))]
def _get_basenames(self):
return {
collection: (collection.dir.listdir() if collection.dir.exists() else [])
for collection in self.collections
}
def _iter_state(
self,
existing=True,
optional_existing=None,
basenames=None,
keys=False,
unpack=True,
):
existing = bool(existing)
if optional_existing is not None:
optional_existing = bool(optional_existing)
# get the dict of all basenames
if basenames is None:
basenames = self._get_basenames()
# helper to check for existence
def exists(t, _basenames):
if optional_existing is not None and t.optional:
return optional_existing
if isinstance(t, SiblingFileCollectionBase):
return t._exists_fwd(
basenames=_basenames,
optional_existing=optional_existing,
)
if isinstance(t, TargetCollection):
return all(exists(_t for _t in flatten_collections(t)))
return t.basename in _basenames
# loop and yield
for key, targets in self._iter_flat():
state = all(exists(t, basenames[self._flat_target_collections[t]]) for t in targets)
if state is existing:
if unpack:
targets = self.targets[key]
yield (key, targets) if keys else targets
def _exists_fwd(self, **kwargs):
fwd = [("basenames", "basenames_dict"), ("optional_existing", "optional_existing")]
return self.exists(**{dst: kwargs[src] for dst, src in fwd if src in kwargs})
def flatten_collections(*targets):
lookup = flatten(targets)
targets = []
while lookup:
t = lookup.pop(0)
if isinstance(t, TargetCollection):
lookup[:0] = t._flat_target_list
else:
targets.append(t)
return targets