# coding: utf-8
"""
Workflow and workflow proxy base class definitions.
"""
__all__ = [
"BaseWorkflow", "WorkflowParameter", "workflow_property", "dynamic_workflow_condition",
"DynamicWorkflowCondition",
]
import re
import copy
import functools
import itertools
import inspect
from collections import OrderedDict, defaultdict
from abc import abstractmethod
import luigi
import six
from law.task.base import Register
from law.task.proxy import ProxyTask, ProxyAttributeTask
from law.target.collection import TargetCollection
from law.target.local import LocalFileTarget
from law.parameter import NO_STR, MultiRangeParameter, CSVParameter
from law.util import (
no_value, make_list, make_set, iter_chunks, range_expand, range_join, create_hash,
is_classmethod, DotDict,
)
from law.logger import get_logger
logger = get_logger(__name__)
[docs]class BaseWorkflowProxy(ProxyTask):
"""
Base class of all workflow proxies.
.. py:classattribute:: workflow_type
type: string
The named type of the workflow. This attribute refers to the value of the ``--workflow``
parameter on the command line to select a particular workflow.
.. py:attribute:: task
type: Task
Reference to the actual *workflow* task.
"""
workflow_type = None
add_workflow_run_decorators = True
def __init__(self, *args, **kwargs):
super(BaseWorkflowProxy, self).__init__(*args, **kwargs)
# find decorators for this proxy's run method that can be configured on the actual task
if self.add_workflow_run_decorators:
for prefix in [self.workflow_type + "_", ""]:
attr = "{}workflow_run_decorators".format(prefix)
decorators = getattr(self.task, attr, None)
if decorators is not None:
# found decorators, so unbound, decorate and re-bound
run_func = self.run.__func__
for decorator in decorators:
run_func = decorator(run_func)
self.run = run_func.__get__(self, self.__class__)
break
self._workflow_has_reset_branch_map = False
def _get_task_attribute(self, name, fallback=False):
"""
Return an attribute of the actual task named ``<workflow_type>_<name>``. When the attribute
does not exist and *fallback* is *True*, try to return the task attribute simply named
*name*. *name* can also be a sequence of strings that are check in the given order. In this
case, the *fallback* option is not considered.
Eventually, if no matching attribute is found, an AttributeError is raised.
"""
if isinstance(name, (list, tuple)):
attributes = name
else:
attributes = [
"{}_{}".format(self.workflow_type, name),
name,
]
for attr in attributes:
value = getattr(self.task, attr, no_value)
if value != no_value:
return value
raise AttributeError("'{!r}' object has none of the requested attribute(s) {}".format(
self, ",".join(map(str, attributes)),
))
[docs] def complete(self):
"""
Custom completion check that invokes the task's *workflow_complete* method and if it returns
anything else than *NotImplemented* returns the value, or just does the default completion
check otherwise.
"""
complete = self.task.workflow_complete()
if complete is not NotImplemented:
return complete
return super(BaseWorkflowProxy, self).complete()
[docs] def requires(self):
"""
Returns the default workflow requirements in an ordered dictionary, which is updated with
the return value of the task's *workflow_requires* method.
"""
reqs = DotDict()
workflow_reqs = self.task.workflow_requires()
if workflow_reqs:
reqs.update(workflow_reqs)
return reqs
[docs] def output(self):
"""
Returns the default workflow outputs in an ordered dictionary. At the moment this is just
the collection of outputs of the branch tasks, stored with the key ``"collection"``.
"""
cls = self.task.output_collection_cls or TargetCollection
targets = luigi.task.getpaths(self.task.get_branch_tasks())
collection = cls(targets, threshold=self.threshold(len(targets)))
return DotDict([("collection", collection)])
[docs] def threshold(self, n=None):
"""
Returns the threshold number of tasks that need to be complete in order to consider the
workflow as being complete itself. This takes into account the
:py:attr:`law.BaseWorkflow.acceptance` parameter of the workflow. The threshold is passed
to the :py:class:`law.TargetCollection` (or :py:class:`law.SiblingFileCollection`) within
:py:meth:`output`. By default, the maximum number of tasks is taken from the length of the
branch map. For performance purposes, you can set this value, *n*, directly.
"""
if n is None:
n = len(self.task.get_branch_map())
acceptance = self.task.acceptance
return (acceptance * n) if acceptance <= 1 else acceptance
[docs] def run(self):
"""
Default run implementation that resets the branch map once if requested.
"""
if self.task.reset_branch_map_before_run and not self._workflow_has_reset_branch_map:
self._workflow_has_reset_branch_map = True
# reset cached branch map, branch tasks and boundaries
self.task._branch_map = None
self.task._branch_tasks = None
self.task.branches = self.task._initial_branches
[docs]def workflow_property(func=None, attr=None, setter=True, cache=False, empty_value=no_value):
"""
Decorator to declare an attribute that is stored only on a workflow and optionally cached for
subsequent calls. Therefore, the decorated method is expected to (lazily) provide the value to
cache if enabled. When the value is equal to *empty_value*, it is not cached and the next access
to the property will invoke the decorated method again. The resulting value is stored as either
``_workflow_<func.__name__>`` or ``_workflow_cached_<func.__name__>`` on the workflow. By
default, a setter is provded to overwrite the the attribute. Set *setter* to *False* to disable
this feature. Example:
.. code-block:: python
class MyTask(Workflow):
@workflow_property
def common_data(self):
# this method is always called with *self* being the *workflow*
return some_demanding_computation()
@workflow_property(attr="my_own_property", setter=False, cache=True)
def common_data2(self):
return some_other_computation()
"""
def decorator(func):
_attr = attr or "_workflow_{}{}".format("cached_" if cache else "", func.__name__)
@functools.wraps(func)
def getter(self):
wf = self.as_workflow()
if getattr(wf, _attr, empty_value) == empty_value or not cache:
setattr(wf, _attr, func(wf))
return getattr(wf, _attr)
_setter = None
if setter:
def _setter(self, value): # noqa: F811
wf = self.as_workflow()
setattr(wf, _attr, value)
_setter.__name__ = func.__name__
return property(fget=getter, fset=_setter)
return decorator if func is None else decorator(func)
class WorkflowParameter(CSVParameter):
def __init__(self, *args, **kwargs):
# force an empty default value, disable single values being wrapped by tuples, and declare
# the parameter as insignificant as they only act as a convenient branch lookup interface
kwargs["default"] = no_value
kwargs["force_tuple"] = False
kwargs["significant"] = False
super(WorkflowParameter, self).__init__(*args, **kwargs)
# linearize the default
self._default = no_value
def parse(self, inp):
""""""
if inp in (None, NO_STR, no_value):
return no_value
return super(WorkflowParameter, self).parse(inp)
def serialize(self, value):
""""""
if value in (None, no_value):
return ""
return super(WorkflowParameter, self).serialize(value)
[docs]def dynamic_workflow_condition(
condition_fn=None,
create_branch_map_fn=None,
requires_fn=None,
output_fn=None,
condition_as_workflow=False,
cache_met_condition=True,
):
"""
Decorator factory that is meant to wrap a workflow methods that defines a dynamic workflow
condition, returning a :py:class:`DynamicWorkflowCondition` instance.
"""
def decorator(condition_fn):
return DynamicWorkflowCondition(
condition_fn=condition_fn,
create_branch_map_fn=create_branch_map_fn,
requires_fn=requires_fn,
output_fn=output_fn,
condition_as_workflow=condition_as_workflow,
cache_met_condition=cache_met_condition,
)
return decorator if condition_fn is None else decorator(condition_fn)
class DynamicWorkflowCondition(object):
"""
Container for a workflow method that defines whether the branch map can be dynamically
constructed or whether a placeholder should be used until the condition is met. Similar to
Python's ``property``, instances of this class provide additional attributes for decorating
other methods that usually depend on the branch map, such as branch requirements or outputs.
It is recommended to use the :py:func:`dynamic_workflow_condition` decorator (factory).
Example:
.. code-block:: python
class MyWorkflow(law.LocalWorkflow):
def workflow_requires(self):
# define requirements for the full workflow to start
reqs = super().workflow_requires()
reqs["files"] = OtherTask.req(self)
return reqs
@law.dynamic_workflow_condition
def workflow_condition(self):
# declare that the branch map can be built if the workflow requirement exists
# note: self.input() refers to the outputs of tasks defined in workflow_requires()
return self.input()["files"].exists()
@workflow_condition.create_branch_map
def create_branch_map(self):
# let's assume that OtherTask produces a json file containing a list of objects
# that _this_ workflows iterates over, so we can simply return this list here
return self.input()["files"].load(formatter="json")
def requires(self):
# branch-level requirement
# note: this is not really necessary, since the branch requirements are only
# evaluated _after_ a branch map is built, so OtherTask must have been completed
return OtherTask.req(self)
@workflow_condition.output
def output(self):
# define the output
return law.LocalFileTarget("file_{}.txt".format(self.branch))
def run(self):
# trivial run implementation
self.output().touch()
The condition is defined by ``workflow_condition`` which is decorated by *this* object. Once it
is met, the branch map is fully created and cached (as usual) for subsequent calls.
In addition, both ``create_branch_map()`` and ``output()`` are decorated with corresponding
attributes of the initially decorated object. As a result, both methods will return placeholder
objects as long as the condition is not met - the branch map will be considered empty and the
output will refer to a temporary placeholder target that is never created. Note that a third
decorator for ``requires`` exists as well.
As a consequence, the amended workflow is fully dynamic with its exact shape potentially
depending heavily on conditions that are only known at runtime.
Internally, the condition is evaluated by the calling task which is usually a workflow, but it
can also be one of its branch tasks if, for instance, sandboxing is involved. Set
*condition_as_workflow* to *True* to ensure that the condition is always evaluated by the
workflow itself.
In case the ``workflow_condition`` involves a costly computation, it is recommended to cache
evluation of the condition by setting *cache_met_condition* argument to *True* or a string
denoting the task instance attribute where the met condition is stored. In the first case,
the attribute defaults to ``_dynamic_workflow_condition_met``.
"""
_decorator_result = object()
def __init__(
self,
condition_fn,
create_branch_map_fn=None,
requires_fn=None,
output_fn=None,
condition_as_workflow=False,
cache_met_condition=True,
):
super(DynamicWorkflowCondition, self).__init__()
# attributes
self._condition_fn = condition_fn
self._create_branch_map_fn = create_branch_map_fn
self._requires_fn = requires_fn
self._output_fn = output_fn
self.condition_as_workflow = condition_as_workflow
self.cache_met_condition = cache_met_condition
if self.cache_met_condition and not isinstance(cache_met_condition, str):
self.cache_met_condition = "_dynamic_workflow_condition_met"
def _wrap_condition_fn(self):
if self._condition_fn is None:
return None
@functools.wraps(self._condition_fn)
def condition(inst, *args, **kwargs):
# when caching, and the condition is already met, return the cached value
if self.cache_met_condition and getattr(inst, self.cache_met_condition, False):
return getattr(inst, self.cache_met_condition)
# evaluate the condition
task = inst.as_workflow() if self.condition_as_workflow else inst
is_met = self._condition_fn(task, *args, **kwargs)
# write to cache if requested
if self.cache_met_condition and is_met:
setattr(inst, self.cache_met_condition, is_met)
return is_met
return condition
def create_branch_map(self, create_branch_map_fn):
# store the function
self._create_branch_map_fn = create_branch_map_fn
return self._decorator_result
def _wrap_create_branch_map(self, bound_condition_fn):
if self._create_branch_map_fn is None:
return None
@functools.wraps(self._create_branch_map_fn)
def create_branch_map(inst, *args, **kwargs):
if not bound_condition_fn():
return [None]
# enable branch map caching since the condition is met
inst.cache_branch_map = True
return self._create_branch_map_fn(inst, *args, **kwargs)
return create_branch_map
def requires(self, requires_fn):
# store the function
self._requires_fn = requires_fn
return self._decorator_result
def _wrap_requires(self, bound_condition_fn):
if self._requires_fn is None:
return None
@functools.wraps(self._requires_fn)
def requires(inst, *args, **kwargs):
if not bound_condition_fn():
return []
# enable branch map caching since the condition is met
inst.cache_branch_map = True
return self._requires_fn(inst, *args, **kwargs)
return requires
def output(self, output_fn):
# store the function
self._output_fn = output_fn
return self._decorator_result
def _wrap_output(self, bound_condition_fn):
if self._output_fn is None:
return None
@functools.wraps(self._output_fn)
def output(inst, *args, **kwargs):
if not bound_condition_fn():
return LocalFileTarget(is_tmp="DYNAMIC_WORKFLOW_PLACEHOLDER")
# enable branch map caching since the condition is met
inst.cache_branch_map = True
return self._output_fn(inst, *args, **kwargs)
return output
def _iter_wrappers(self, bound_condition_fn):
if self._create_branch_map_fn is not None:
yield "create_branch_map", self._wrap_create_branch_map(bound_condition_fn)
if self._requires_fn is not None:
yield "requires", self._wrap_requires(bound_condition_fn)
if self._output_fn is not None:
yield "output", self._wrap_output(bound_condition_fn)
def copy(self):
return copy.deepcopy(self)
class WorkflowRegister(Register):
def __new__(metacls, name, bases, classdict):
# handle dynamic workflow conditions
condition_attr = metacls.check_dynamic_workflow_conditions(name, classdict)
if condition_attr:
# store the attribute when found and disable the branch map caching by default
classdict["_condition_attr"] = condition_attr
classdict.setdefault("cache_branch_map_default", False)
# store a flag on the created class whether it defined a new workflow_proxy_cls
# this flag will define the classes in the mro to consider for instantiating the proxy
classdict["_defined_workflow_proxy"] = "workflow_proxy_cls" in classdict
# create and return the class
return super(WorkflowRegister, metacls).__new__(metacls, name, bases, classdict)
@classmethod
def check_dynamic_workflow_conditions(metacls, name, classdict):
# check that only one condition is present in classdict
condition_attr = None
for attr, value in classdict.items():
if not isinstance(value, DynamicWorkflowCondition):
continue
if condition_attr:
raise Exception(
"class '{}' defined with more than one DynamicWorkflowCondition, found "
"'{}' after previously registered '{}'".format(name, attr, condition_attr),
)
condition_attr = attr
return condition_attr
[docs]class BaseWorkflow(six.with_metaclass(WorkflowRegister, ProxyAttributeTask)):
"""
Base class of all workflows.
.. py:classattribute:: workflow
type: :py:class:`luigi.Parameter`
Workflow type that refers to the workflow proxy implementation at instantiation / execution
time. Empty default value.
.. py:classattribute:: acceptance
type: :py:class:`luigi.FloatParameter`
Number of complete tasks to consider the workflow successful. Values larger than one are
interpreted as absolute numbers, and as fractions otherwise. Defaults to *1.0*.
.. py:classattribute:: tolerance
type: :py:class:`luigi.FloatParameter`
Number of failed tasks to still consider the workflow successful. Values larger than one are
interpreted as absolute numbers, and as fractions otherwise. Defaults to *0.0*.
.. py:classattribute:: branch
type: :py:class:`luigi.IntParameter`
The branch number to run this task for. *-1* means that this task is the actual *workflow*,
rather than a *branch* task. Defaults to *-1*.
.. py:classattribute:: branches
type: :py:class:`law.MultiRangeParameter`
Explicit list of branches or branch ranges to process. Empty default value.
.. py:classattribute:: workflow_proxy_cls
type: :py:class:`BaseWorkflowProxy`
Reference to the workflow proxy class associated to this workflow.
.. py:classattribute:: output_collection_cls
type: :py:class:`law.TargetCollection`
Configurable target collection class to use, such as
:py:class:`target.collection.TargetCollection`, :py:class:`target.collection.FileCollection`
or :py:class:`target.collection.SiblingFileCollection`.
.. py:classattribute:: force_contiguous_branches
type: bool
Flag that denotes if this workflow is forced to use contiguous branch numbers, starting from
0. If *False*, an exception is raised otherwise.
.. py:classattribute:: reset_branch_map_before_run
type: bool
Flag that denotes whether the branch map should be recreated from scratch before the run
method of the underlying workflow proxy is called.
.. py:classattribute:: create_branch_map_before_repr
type: bool
Flag that denotes whether the branch map should be created (if not already done) before the
task representation is created via :py:meth:`repr`.
.. py:classattribute:: cache_workflow_requirements
type: bool
Whether workflow requirements should be evaluated only cached and cached afterwards in the
:py:attr:`_cached_workflow_requirements` attribute. Defaults to *False*.
.. py:classattribute:: cache_branch_map_default
type: bool
The initial default value of the :py:attr:`cache_branch_map` attribute that decides whether
the branch map be created only once and then cached in the :py:attr:`_branch_map` attribute.
Defaults to *True*.
.. py:classattribute:: workflow_run_decorators
type: sequence, None
Sequence of decorator functions that will be conveniently used to decorate the workflow
proxy's run method. This way, there is no need to subclass and reset the
:py:attr:`workflow_proxy_cls` just to add a decorator. The value is *None* by default.
.. py:attribute:: workflow_cls
type: :py:class:`law.Register`
Reference to the class of the realized workflow. This is especially helpful in case your
derived class inherits from multiple workflows.
.. py:attribute:: workflow_proxy
type: :py:class:`BaseWorkflowProxy`
Reference to the underlying workflow proxy instance.
.. py:attribute:: branch_map
type: dict (read-only)
Shorthand for :py:meth:`get_branch_map`.
.. py:attribute:: branch_data
type: any (read-only)
Shorthand for ``self.branch_map[self.branch]``.
"""
workflow = luigi.Parameter(
default=NO_STR,
description="the type of the workflow to use; uses the first workflow type in the MRO when "
"empty; default: empty",
)
effective_workflow = luigi.Parameter(
default=NO_STR,
description="do not set manually",
)
acceptance = luigi.FloatParameter(
default=1.0,
significant=False,
description="number of finished tasks to consider the task successful; relative fraction "
"(<= 1) or absolute value (> 1); default: 1.0",
)
tolerance = luigi.FloatParameter(
default=0.0,
significant=False,
description="number of failed tasks to still consider the task successful; relative "
"fraction (<= 1) or absolute value (> 1); default: 0.0",
)
pilot = luigi.BoolParameter(
default=False,
significant=False,
description="disable certain configurable requirements of the workflow to let branch tasks "
"resolve requirements on their own; default: False",
)
branch = luigi.IntParameter(
default=-1,
description="the branch number/index to run this task for; -1 means this task is the "
"workflow; default: -1",
)
branches = MultiRangeParameter(
default=(),
require_start=False,
require_end=False,
single_value=True,
description="comma-separated list of branches to select; each value can have the format "
"'start:end' (end not included as per Python) to support range syntax; default: empty",
)
# caches
_cls_branch_map_cache = {}
# configuration members
workflow_proxy_cls = BaseWorkflowProxy
output_collection_cls = None
force_contiguous_branches = False
reset_branch_map_before_run = False
create_branch_map_before_repr = False
cache_workflow_requirements = False
cache_branch_map_default = True
passthrough_requested_workflow = True
workflow_run_decorators = None
# skip from indexing
exclude_index = True
# parameter exclusions
exclude_params_req = {"effective_workflow"}
exclude_params_index = {"effective_workflow"}
exclude_params_repr = {"workflow"}
exclude_params_branch = {"acceptance", "tolerance", "pilot", "branches"}
exclude_params_workflow = {"branch"}
def __new__(cls, *args, **kwargs):
inst = super(BaseWorkflow, cls).__new__(cls)
# bind wrappers present in the optional condition object
condition_attr = getattr(cls, "_condition_attr", None)
if condition_attr:
condition = getattr(inst, condition_attr, None)
if isinstance(condition, DynamicWorkflowCondition):
# bind the condition method itself
bound_condition_fn = condition._wrap_condition_fn().__get__(inst)
setattr(inst, condition_attr, bound_condition_fn)
# store the condition object itself
setattr(inst, condition_attr + "_obj", condition)
# bind wrapped methods that currently correspond to placeholders
for attr, wrapper in condition._iter_wrappers(bound_condition_fn):
if getattr(inst, attr, None) != DynamicWorkflowCondition._decorator_result:
continue
setattr(inst, attr, wrapper.__get__(inst))
return inst
[docs] @classmethod
def modify_param_values(cls, params):
params = super(BaseWorkflow, cls).modify_param_values(params)
# determine the default workflow type when not set
if params.get("workflow") in [None, NO_STR]:
params["workflow"] = cls.find_workflow_cls().workflow_proxy_cls.workflow_type
# set the effective workflow parameter based on the actual resolution
workflow_cls = cls.find_workflow_cls(
name=params["workflow"],
fallback_to_first=cls.passthrough_requested_workflow,
)
params["effective_workflow"] = workflow_cls.workflow_proxy_cls.workflow_type
# resolve workflow parameters
params = cls._resolve_workflow_parameters(params)
return params
@classmethod
def _resolve_workflow_parameters(cls, params):
"""
Handles the translation from workflow parameters to branch values, updating *params*
in-place.
"""
workflow_params = [
(name, param, params.get(name, no_value))
for name, param in cls.get_params()
if isinstance(param, WorkflowParameter)
]
# nothing to do when the task does not use workflow parameters
if not workflow_params:
return params
# helper for error messages
cjoin = lambda seq: ",".join(map(str, seq))
wparams_repr = lambda: cjoin(map("{0[0]}={0[2]}".format, workflow_params))
# when there are any workflow parameters, create_branch_map must be a classmethod since
# there is no way of accessing this map before instantiation
if not is_classmethod(cls.create_branch_map, cls):
raise Exception(
"{}.create_branch_map must be a classmethod accepting a single parameter (dict "
"of parameter names and values) in case workflows use WorkflowParameter "
"objects in order to perform branch value lookups prior to any task "
"instantiation; found workflow parameter(s) {}".format(
cls.__name__, wparams_repr(),
),
)
# helper to extract an entry from branch data (usually a dict)
def get_branch_value(branch, branch_data, key):
if isinstance(branch_data, dict):
if key in branch_data:
return branch_data[key]
elif getattr(branch_data, key, no_value) != no_value:
return getattr(branch_data, key)
raise AttributeError(
"attribute or item '{}' unknown to branch data at branch {}: {}".format(
key, branch, branch_data,
),
)
# get the branch map, potentially from a cache
try:
# create a hash of all significant parameters to store the map
h = hash((cls.task_family, tuple(params.items())))
except TypeError:
# some parameter is not hashable
h = None
# recreate the maps if needed
branch_map, branch_map_reversed = (
cls._cls_branch_map_cache[h]
if h and h in cls._cls_branch_map_cache
else (None, None)
)
if branch_map is None:
# get the map and sanitize it
branch_map = cls.create_branch_map(params)
branch_map = cls._sanitize_branch_map(branch_map, cls.force_contiguous_branches)
# create the reversed map, using workflow parameter value tuples as keys
branch_map_reversed = OrderedDict()
for b, branch_data in branch_map.items():
key = tuple(
get_branch_value(b, branch_data, name)
for name, _, _ in workflow_params
)
if key not in branch_map_reversed:
branch_map_reversed[key] = []
branch_map_reversed[key].append(b)
# cache
if h:
cls._cls_branch_map_cache[h] = (branch_map, branch_map_reversed)
# get parameters
branch = params.get("branch", -1)
branches = params.get("branches", ())
# check if any or all workflow parameters are set, and if any of them is a sequence
set_idxs = [i for i, (_, _, value) in enumerate(workflow_params) if value != no_value]
any_set = len(set_idxs) > 0
all_set = len(set_idxs) == len(workflow_params)
any_seq = any(isinstance(value, (tuple, list, set)) for _, _, value in workflow_params)
# when all are set and none of them is a sequence, the workflow parameters can refer to
# no branch (-> exception), one branch (-> assign it), or multiple branches (-> workflow)
_branches = []
if all_set and not any_seq:
values = tuple(value for _, _, value in workflow_params)
_branches = branch_map_reversed.get(values, [])
if len(_branches) == 0:
raise ValueError(
"workflow parameters {} do not match any branch in {}".format(
wparams_repr(), cls.__name__,
),
)
if all_set and not any_seq and _branches and len(_branches) == 1:
# when all are set and do not refer to any sequence,
# lookup the branch value and verify that workflow parameter values match
_branch = _branches[0]
if branch != -1 and branch != _branch:
raise ValueError(
"workflow parameters {} in {} refer to branch {}, but branch {} "
"requested".format(wparams_repr(), cls.__name__, _branch, branch),
)
# always overwrite
params["branch"] = branch = _branch
elif any_set:
# at least one parameter is not set or is a sequence, resulting in a workflow,
# and in both cases we can filter the branch map to determine matching branches
# branch should not be set
if branch != -1:
raise ValueError(
"workflow parameters {} will lead to {} being a workflow, but branch "
"{} requested".format(wparams_repr(), cls.__name__, branch),
)
if not _branches:
# create a version of the reversed branch map where workflow parameters that are
# not given are removed and corresponding branch values are merged
branch_map_reversed_collapsed = defaultdict(list)
for values, b in branch_map_reversed.items():
collapsed_values = tuple(values[i] for i in set_idxs)
branch_map_reversed_collapsed[collapsed_values].extend(b)
# lookup all branches matched by parameters
_branches = []
names = [name for name, _, _ in workflow_params]
sequences = (make_list(value) for _, _, value in workflow_params)
for values in itertools.product(*sequences):
collapsed_values = tuple(values[i] for i in set_idxs)
if collapsed_values not in branch_map_reversed_collapsed:
param_repr = cjoin(map("{0[0]}={0[1]}".format, zip(names, values)))
raise Exception(
"workflow parameter combination {} not found in branch map of "
"{}".format(param_repr, cls.__name__),
)
_branches.extend(branch_map_reversed_collapsed[collapsed_values])
# check if _branches match branches when set
if branches:
branches = range_expand(list(branches), include_end=True, min_value=0,
max_value=max(branch_map))
if set(branches) != set(_branches):
raise ValueError(
"workflow parameters {} expanded in {} to branches ({}) do not match "
"passed branches ({})".format(
wparams_repr(), cls.__name__, cjoin(_branches), cjoin(branches),
),
)
# always overwrite
params["branches"] = tuple(range_join(_branches))
elif branch != -1:
# set all workflow parameters according to the data in the branch map at "branch"
if branch not in branch_map:
raise KeyError(
"branch map of task class {} does not contain branch {}".format(
cls.__name__, branch,
),
)
branch_data = branch_map[branch]
for name, _, _ in workflow_params:
params[name] = get_branch_value(branch, branch_data, name)
return params
@classmethod
def find_workflow_cls(cls, name=None, fallback_to_first=False):
first_cls = None
for workflow_cls in inspect.getmro(cls):
if not issubclass(workflow_cls, BaseWorkflow):
continue
if not workflow_cls._defined_workflow_proxy:
continue
if name in (workflow_cls.workflow_proxy_cls.workflow_type, None, NO_STR):
return workflow_cls
if first_cls is None:
first_cls = workflow_cls
if fallback_to_first and first_cls is not None:
return first_cls
msg = " for type '{}'".format(name) if name else ""
raise ValueError("cannot determine workflow class{} in task class {}".format(msg, cls))
@classmethod
def _sanitize_branch_map(cls, branch_map, force_contiguous_branches):
if isinstance(branch_map, (list, tuple)):
branch_map = dict(enumerate(branch_map))
elif isinstance(branch_map, six.integer_types):
branch_map = dict(enumerate(range(branch_map)))
elif force_contiguous_branches:
n = len(branch_map)
if set(branch_map.keys()) != set(range(n)):
raise ValueError("branch map keys must constitute contiguous range "
"[0, {})".format(n))
else:
for branch in branch_map:
if not isinstance(branch, six.integer_types) or branch < 0:
raise ValueError("branch map keys must be non-negative integers, got "
"'{}' ({})".format(branch, type(branch).__name__))
return branch_map
[docs] @classmethod
def req_different_branching(cls, inst, **kwargs):
"""
Variation of :py:meth:`Task.req` that should be used when defining requirements between
workflows that implement a different branch granularity (e.g. task B with 10 branches
requires task A with 2 branches). The only difference to the base method is that workflow
specific parameters such as *branches* or *tolerance* are automatically skipped when not
added explicitly in *kwargs*.
"""
_exclude = set(make_list(kwargs.get("_exclude", [])))
_exclude |= cls.exclude_params_branch
kwargs["_exclude"] = _exclude
return cls.req(inst, **kwargs)
def __init__(self, *args, **kwargs):
super(BaseWorkflow, self).__init__(*args, **kwargs)
# store a list of workflow parameter names
self._workflow_param_names = [
name
for name, param in self.get_params()
if isinstance(param, WorkflowParameter)
]
# workflow and branch specific attributes
if self.is_workflow():
# caches
self._branch_map = None
self._branch_tasks = None
self._cache_branch_map = self.__class__.cache_branch_map_default
self._cached_workflow_requirements = no_value
# store whether workflow objects have been setup, which is done lazily,
# and predefine all attributes that are set by it
self._workflow_initialized = False
self._workflow_cls = None
self._workflow_proxy = None
# initially set branches
self._initial_branches = tuple(self.branches)
else:
# caches
self._workflow_task = None
@workflow_property(attr="_cache_branch_map")
def cache_branch_map(self):
return self._cache_branch_map
@property
def _cache_branches(self):
# deprecation warning until v0.1
logger.warning(
"accessing {0}._cache_branches is deprecated, use {0}.cache_branch_map instead".format(
self.__class__.__name__,
),
)
return self._cache_branch_map
@_cache_branches.setter
def _cache_branches(self, cache_branches):
logger.warning(
"setting {0}._cache_branches is deprecated, use {0}.cache_branch_map instead".format(
self.__class__.__name__,
),
)
self._cache_branch_map = cache_branches
def _initialize_workflow(self, force=False):
if self.is_branch():
return
if self._workflow_initialized and not force:
return
self._workflow_cls = self.find_workflow_cls(self.effective_workflow)
self._workflow_proxy = self._workflow_cls.workflow_proxy_cls(task=self)
logger.debug(
"created workflow proxy instance of type '{}'".format(self.effective_workflow),
)
self._workflow_initialized = True
@property
def workflow_cls(self):
self._initialize_workflow()
return self.as_workflow()._workflow_cls
@property
def workflow_proxy(self):
self._initialize_workflow()
return self.as_workflow()._workflow_proxy
def repr(self, *args, **kwargs):
if self.create_branch_map_before_repr:
self.get_branch_map()
return super(BaseWorkflow, self).repr(*args, **kwargs)
def cli_args(self, exclude=None, replace=None):
exclude = set() if exclude is None else set(make_list(exclude))
# exclude certain branch/workflow parameters
exclude |= self.exclude_params_branch if self.is_branch() else self.exclude_params_workflow
# always exclude workflow parameters
exclude |= set(self._workflow_param_names)
return super(BaseWorkflow, self).cli_args(exclude=exclude, replace=replace)
def _repr_params(self, *args, **kwargs):
params = super(BaseWorkflow, self)._repr_params(*args, **kwargs)
if self.is_workflow():
# when this is a workflow, add the requested or effective workflow type,
# depending on whether the requested one is to be passed through
workflow = (
self.workflow
if self.passthrough_requested_workflow
else self.effective_workflow
)
params.setdefault("workflow", workflow)
# skip branches when empty
if not params.get("branches"):
params.pop("branches", None)
else:
# when this is a branch, remove workflow parameters
for param in self.exclude_params_branch:
params.pop(param, None)
return params
def req_branch(self, branch, **kwargs):
if branch == -1:
raise ValueError(
"branch must not be -1 when creating a new branch task via req_branch(), "
"but got {}".format(branch),
)
# default kwargs
kwargs.setdefault("_skip_task_excludes", True)
kwargs["_exclude"] = make_set(kwargs.get("_exclude", ())) | set(self._workflow_param_names)
if self.is_workflow():
kwargs["_exclude"] |= set(self.exclude_params_branch)
# create the task
task = self.req(self, branch=branch, **kwargs)
# set the _workflow_task attribute if known
if task._workflow_task is None:
task._workflow_task = self if self.is_workflow() else self._workflow_task
return task
def req_workflow(self, **kwargs):
# default kwargs
kwargs.setdefault("_skip_task_excludes", True)
kwargs["_exclude"] = make_set(kwargs.get("_exclude", ())) | set(self._workflow_param_names)
if self.is_branch():
kwargs["_exclude"] |= set(self.exclude_params_workflow)
return self.req(self, branch=-1, **kwargs)
[docs] def is_branch(self):
"""
Returns whether or not this task refers to a *branch*.
"""
return self.branch != -1
[docs] def is_workflow(self):
"""
Returns whether or not this task refers to the *workflow*.
"""
return not self.is_branch()
[docs] def as_branch(self, branch=None):
"""
When this task refers to the workflow, a re-instantiated task with identical parameters and
a certain *branch* value, defaulting to 0, is returned. When this task is already a branch
task, the task itself is returned when *branch* is *None* or matches this task's branch
value. Otherwise, a new branch task with that value and identical parameters is created and
returned.
"""
if branch == -1:
raise ValueError("branch must not be -1 when selecting a branch task")
if self.is_branch() and branch in (None, self.branch):
return self
return self.req_branch(branch or 0)
[docs] def as_workflow(self):
"""
When this task refers to a branch task, a re-instantiated task with ``branch=-1`` and
identical parameters is returned. Otherwise, the workflow itself is returned.
"""
if self.is_workflow():
return self
if self._workflow_task is None:
self._workflow_task = self.req_workflow()
return self._workflow_task
[docs] @abstractmethod
def create_branch_map(self):
"""
Abstract method that must be overwritten by inheriting tasks to define the branch map.
"""
return
def _reset_branch_boundaries(self, full_branch_map):
if self.is_branch():
raise Exception("calls to _reset_branch_boundaries are forbidden for branch tasks")
# rejoin branch ranges when given
if self.branches:
# get minimum and maximum branches
branches = set(full_branch_map.keys())
min_branch = min(branches)
max_branch = max(branches) + 1
# get expanded branch values
branches = range_expand(
list(self.branches),
min_value=min_branch,
max_value=max_branch,
)
# assign back to branches attribute, use an empty tuple in case all branches are used
use_all = (
len(branches) == len(full_branch_map) and
set(branches) == set(full_branch_map)
)
self.branches = () if use_all else tuple(range_join(branches))
def _reduce_branch_map(self, branch_map):
if self.is_branch():
raise Exception("calls to _reduce_branch_map are forbidden for branch tasks")
# create a set of branches to remove
remove_branches = set()
# apply branch ranges
if self.branches:
branches = set(branch_map.keys())
min_branch = min(branches)
max_branch = max(branches) + 1
requested = range_expand(
list(self.branches),
min_value=min_branch,
max_value=max_branch,
)
remove_branches |= branches - set(requested)
# remove from branch map
for b in remove_branches:
del branch_map[b]
[docs] def get_branch_map(self, reset_boundaries=True, reduce_branches=True):
"""
Creates and returns the branch map defined in :py:meth:`create_branch_map`. If
*reset_boundaries* is *True*, the branch numbers and ranges defined in :py:attr:`branches`
are rearranged to not exceed the actual branch map length. If *reduce_branches* is *True*,
the branch map is additionally filtered accordingly. The branch map is cached internally.
"""
if self.is_branch():
return self.as_workflow().get_branch_map(
reset_boundaries=reset_boundaries,
reduce_branches=reduce_branches,
)
branch_map = self._branch_map
if branch_map is None:
# create a new branch map
args = ()
if is_classmethod(self.create_branch_map, self.__class__):
params = OrderedDict([
(param_name, getattr(self, param_name))
for param_name, _ in self.get_params()
])
args = (params,)
branch_map = self.create_branch_map(*args)
# some type and sanity checks
branch_map = self._sanitize_branch_map(branch_map, self.force_contiguous_branches)
# post-process
if reset_boundaries:
self._reset_branch_boundaries(branch_map)
if reduce_branches:
self._reduce_branch_map(branch_map)
# cache it
if self.cache_branch_map:
self._branch_map = branch_map
return branch_map
@property
def branch_map(self):
return self.get_branch_map()
@property
def branch_data(self):
if self.is_workflow():
raise Exception("calls to branch_data are forbidden for workflow tasks")
branch_map = self.get_branch_map()
if self.branch not in branch_map:
raise ValueError("invalid branch '{}', not found in branch map".format(self.branch))
return branch_map[self.branch]
[docs] def get_branch_tasks(self):
"""
Returns a dictionary that maps branch numbers to instantiated branch tasks. As this might be
computationally intensive, the return value is cached.
"""
if self.is_branch():
return self.as_workflow().get_branch_tasks()
if self._branch_tasks is None:
# get all branch tasks according to the map
branch_tasks = OrderedDict()
for b in self.get_branch_map():
branch_tasks[b] = self.as_branch(branch=b)
# return the task when we are not going to cache it
if not self.cache_branch_map:
return branch_tasks
# cache it
self._branch_tasks = branch_tasks
return self._branch_tasks
[docs] def get_branch_chunks(self, chunk_size):
"""
Returns a list of chunks of branch numbers defined in this workflow with a certain
*chunk_size*. Example:
.. code-block:: python
wf = SomeWorkflowTask() # has 8 branches
print(wf.get_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]
wf2 = SomeWorkflowTask(branches=[(0, 5)]) # has 5 branches
print(wf2.get_branch_chunks(3))
# -> [[0, 1, 2], [3, 4]]
"""
if self.is_branch():
return self.as_workflow().get_branch_chunks(chunk_size)
# get the branch map and create chunks of its branch values
branch_chunks = iter_chunks(self.get_branch_map().keys(), chunk_size)
return list(branch_chunks)
[docs] def get_all_branch_chunks(self, chunk_size, **kwargs):
"""
Returns a list of chunks of all branch numbers of this workflow (i.e. without
*branches* parameters applied) with a certain *chunk_size*. Internally, a new instance of
this workflow is created using :py:meth:`BaseTask.req`, forwarding all *kwargs*, with
*_exclude* parameters extended by ``{"branches"}`` in order to use all possible branch
values. Example:
.. code-block:: python
wf = SomeWorkflowTask() # has 8 branches
print(wf.get_all_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]
wf2 = SomeWorkflowTask(branches=[(0, 5)]) # has 5 branches
print(wf2.get_all_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]
"""
if self.is_branch():
return self.as_workflow().get_all_branch_chunks(chunk_size, **kwargs)
# create a new workflow instance
kwargs["_exclude"] = set(kwargs.get("_exclude", set())) | {"branches"}
kwargs["_skip_task_excludes"] = True
wf = self.req_workflow(self, **kwargs)
# return its branch chunks
return wf.get_branch_chunks(chunk_size)
[docs] def get_branches_repr(self, max_ranges=10):
"""
Creates a string representation of the selected branches that can be used as a readable
description or postfix in output paths. When the branches of this workflow are configured
via the *branches* parameter, and there are more than *max_ranges* identified ranges, the
string will contain a unique hash describing those ranges.
"""
branch_map = self.get_branch_map()
if not self.branches:
return "{}To{}".format(min(branch_map.keys()), max(branch_map.keys()) + 1)
ranges = range_join(list(branch_map.keys()))
if len(ranges) > max_ranges:
return "{}_ranges_{}".format(len(ranges), create_hash(ranges))
return "_".join(
str(r[0]) if len(r) == 1 else "{}To{}".format(r[0], r[1] + 1)
for r in ranges
)
[docs] def workflow_complete(self):
"""
Hook to define the completeness status of the workflow.
"""
return NotImplemented
[docs] def workflow_requires(self):
"""
Hook to add workflow requirements. This method is expected to return a dictionary. When
this method is called from a branch task, an exception is raised.
"""
if self.is_branch():
return self.as_workflow().workflow_requires()
return DotDict()
[docs] def requires_from_branch(self):
"""
Returns the requirements defined in the standard ``requires()`` method, but called in the
context of the workflow. This method is only recommended in case all required tasks that
would normally take a branch number, are intended to be instantiated with ``branch=-1``.
When this method is called from a branch task, an exception is raised.
"""
if self.is_branch():
raise Exception("calls to requires_from_branch are forbidden for branch tasks")
return self.__class__.requires(self)
def _handle_scheduler_messages(self):
if self.scheduler_messages:
while not self.scheduler_messages.empty():
msg = self.scheduler_messages.get()
self.handle_scheduler_message(msg)
[docs] def handle_scheduler_message(self, msg, _attr_value=None):
""" handle_scheduler_message(msg)
Hook that is called when a scheduler message *msg* is received. Returns *True* when the
messages was handled, and *False* otherwise.
Handled messages:
- ``tolerance = <int/float>``
- ``acceptance = <int/float>``
"""
attr, value = _attr_value or (None, None)
# handle "tolerance"
if attr is None:
m = re.match(r"^\s*(tolerance)\s*(\=|\:)\s*(.*)\s*$", str(msg))
if m:
attr = "tolerance"
try:
self.tolerance = float(m.group(3))
value = self.tolerance
except ValueError as e:
value = e
# handle "acceptance"
if attr is None:
m = re.match(r"^\s*(acceptance)\s*(\=|\:)\s*(.*)\s*$", str(msg))
if m:
attr = "acceptance"
try:
self.acceptance = float(m.group(3))
value = self.acceptance
except ValueError as e:
value = e
# respond
if attr:
if isinstance(value, Exception):
msg.respond("cannot set {}: {}".format(attr, value))
logger.warning("cannot set {} of task {}: {}".format(attr, self.live_task_id, value))
else:
msg.respond("{} set to {}".format(attr, value))
logger.info("{} of task {} set to {}".format(attr, self.live_task_id, value))
return True
msg.respond("task cannot handle scheduler message: {}".format(msg))
return False