law.workflow.base

Workflow and workflow proxy base class definitions.

Class BaseWorkflow

class BaseWorkflow(*args, **kwargs)

Bases: ProxyAttributeTask

Base class of all workflows.

classattribute workflow

type: luigi.Parameter

Workflow type that selects the workflow proxy implementation at instantiation and execution time. Empty by default.

classattribute acceptance

type: luigi.FloatParameter

Number of completed tasks required to consider the workflow successful. Values larger than one are interpreted as absolute numbers, all other values as fractions. Defaults to 1.0.

classattribute tolerance

type: luigi.FloatParameter

Maximum number of failed tasks for which the workflow is still considered successful. Values larger than one are interpreted as absolute numbers, all other values as fractions. Defaults to 0.0.
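
The absolute-versus-fraction rule shared by acceptance and tolerance can be sketched as a standalone check. This is not law's implementation; resolve_count and workflow_successful are hypothetical helpers, and the comparison operators (">=" for acceptance, "<=" for tolerance) are assumptions chosen to match the wording above.

```python
def resolve_count(value, n_total):
    # Documented rule: values larger than one are absolute counts,
    # all other values are fractions of the total number of branches.
    return float(value) if value > 1 else value * n_total


def workflow_successful(n_complete, n_failed, n_total, acceptance=1.0, tolerance=0.0):
    # Hypothetical helper: enough tasks are complete and not too many failed.
    enough_complete = n_complete >= resolve_count(acceptance, n_total)
    within_tolerance = n_failed <= resolve_count(tolerance, n_total)
    return enough_complete and within_tolerance


print(workflow_successful(10, 0, 10))                                # True
print(workflow_successful(8, 2, 10, acceptance=0.8, tolerance=0.2))  # True
print(workflow_successful(8, 2, 10, acceptance=0.8, tolerance=0.1))  # False
```

Note that under this rule a value of exactly 1 is a fraction, i.e. 100 percent of the branches, not a single task.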

classattribute branch

type: luigi.IntParameter

The branch number to run this task for. -1 means that this task is the actual workflow, rather than a branch task. Defaults to -1.

classattribute branches

type: law.MultiRangeParameter

Explicit list of branches or branch ranges to process. Empty default value.

classattribute workflow_proxy_cls

type: BaseWorkflowProxy

Reference to the workflow proxy class associated with this workflow.

classattribute output_collection_cls

type: law.TargetCollection

Configurable target collection class to use, such as target.collection.TargetCollection, target.collection.FileCollection or target.collection.SiblingFileCollection.

classattribute force_contiguous_branches

type: bool

Flag that denotes whether this workflow is forced to use contiguous branch numbers, starting from 0. When enabled, an exception is raised if the branch numbers are not contiguous.
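
Contiguity boils down to the keys of the branch map being exactly 0, 1, ..., n-1. A minimal standalone check of that condition could look as follows; check_contiguous is a hypothetical helper and not part of law.

```python
def check_contiguous(branch_map):
    # Contiguous means the keys are exactly 0, 1, ..., len(branch_map) - 1.
    n = len(branch_map)
    if set(branch_map) != set(range(n)):
        raise ValueError(f"branch numbers must form the contiguous range [0, {n})")


check_contiguous({0: "a", 1: "b", 2: "c"})  # passes silently
try:
    check_contiguous({0: "a", 2: "c"})
except ValueError as e:
    print(e)  # branch numbers must form the contiguous range [0, 2)
```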

classattribute reset_branch_map_before_run

type: bool

Flag that denotes whether the branch map should be recreated from scratch before the run method of the underlying workflow proxy is called.

classattribute create_branch_map_before_repr

type: bool

Flag that denotes whether the branch map should be created (if not already done) before the task representation is created via repr().

classattribute cache_workflow_requirements

type: bool

Whether workflow requirements should be evaluated only once and cached afterwards in the _cached_workflow_requirements attribute. Defaults to False.

classattribute cache_branch_map_default

type: bool

The initial default value of the cache_branch_map attribute, which decides whether the branch map should be created only once and then cached in the _branch_map attribute. Defaults to True.
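
To illustrate what cache_branch_map controls, here is a toy class that mimics the described caching behavior. It is a simplified sketch, not law's code; ToyWorkflow and the n_creations counter are hypothetical.

```python
class ToyWorkflow:
    cache_branch_map_default = True

    def __init__(self):
        # the per-instance flag starts from the class-level default
        self.cache_branch_map = self.cache_branch_map_default
        self._branch_map = None
        self.n_creations = 0  # counts calls to create_branch_map

    def create_branch_map(self):
        self.n_creations += 1
        return {i: f"data_{i}" for i in range(3)}

    def get_branch_map(self):
        if not self.cache_branch_map:
            return self.create_branch_map()  # rebuilt on every access
        if self._branch_map is None:
            self._branch_map = self.create_branch_map()
        return self._branch_map


wf = ToyWorkflow()
wf.get_branch_map()
wf.get_branch_map()
print(wf.n_creations)  # 1, the second call used the cached map
```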

classattribute workflow_run_decorators

type: sequence, None

Sequence of decorator functions that are applied to the workflow proxy’s run method. This way, there is no need to subclass and reset the workflow_proxy_cls just to add a decorator. The value is None by default.
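
The idea of decorating a run method from a plain sequence can be sketched without law. Here, log_calls and apply_run_decorators are hypothetical, and applying the decorators in sequence order (so that the last one ends up outermost) is an assumption; law's actual ordering may differ.

```python
import functools

calls = []


def log_calls(run):
    # Example decorator: record each invocation before delegating.
    @functools.wraps(run)
    def wrapper(*args, **kwargs):
        calls.append(run.__name__)
        return run(*args, **kwargs)
    return wrapper


def apply_run_decorators(run, decorators):
    # Assumption: decorators are applied in sequence order; None means none.
    for decorator in decorators or ():
        run = decorator(run)
    return run


def run():
    return "done"


decorated = apply_run_decorators(run, [log_calls])
print(decorated(), calls)  # done ['run']
```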

workflow_cls

type: law.Register

Reference to the class of the realized workflow. This is especially helpful in case your derived class inherits from multiple workflows.

workflow_proxy

type: BaseWorkflowProxy

Reference to the underlying workflow proxy instance.

branch_map

type: dict (read-only)

Shorthand for get_branch_map().

branch_data

type: any (read-only)

Shorthand for self.branch_map[self.branch].

workflow_proxy_cls

alias of BaseWorkflowProxy

classmethod modify_param_values(params)

Hook to modify command line arguments before instances of this class are created.

classmethod req_different_branching(inst, **kwargs)

Variation of Task.req() that should be used when defining requirements between workflows that implement a different branch granularity (e.g. task B with 10 branches requires task A with 2 branches). The only difference from the base method is that workflow-specific parameters such as branches or tolerance are automatically skipped unless they are passed explicitly in kwargs.

is_branch()

Returns whether or not this task refers to a branch.

is_workflow()

Returns whether or not this task refers to the workflow.

as_branch(branch=None)

When this task refers to the workflow, a re-instantiated task with identical parameters and a certain branch value, defaulting to 0, is returned. When this task is already a branch task, the task itself is returned when branch is None or matches this task’s branch value. Otherwise, a new branch task with that value and identical parameters is created and returned.

as_workflow()

When this task refers to a branch task, a re-instantiated task with branch=-1 and identical parameters is returned. Otherwise, the workflow itself is returned.
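
The branch/workflow conversions described above can be mimicked with a frozen dataclass. ToyTask and its dataset field are hypothetical stand-ins for a real workflow task and its parameters; the sketch only reproduces the documented return rules.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ToyTask:
    # "dataset" plays the role of any other parameter; branch=-1 is the workflow.
    dataset: str
    branch: int = -1

    def is_workflow(self):
        return self.branch == -1

    def is_branch(self):
        return not self.is_workflow()

    def as_branch(self, branch=None):
        if self.is_workflow():
            # re-instantiate with identical parameters, branch defaults to 0
            return replace(self, branch=0 if branch is None else branch)
        if branch is None or branch == self.branch:
            return self
        return replace(self, branch=branch)

    def as_workflow(self):
        return self if self.is_workflow() else replace(self, branch=-1)


wf = ToyTask("ttbar")
b3 = wf.as_branch(3)
print(b3)                      # ToyTask(dataset='ttbar', branch=3)
print(b3.as_branch() is b3)    # True
print(b3.as_workflow() == wf)  # True
```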

abstract create_branch_map()

Abstract method that must be overridden by inheriting tasks to define the branch map.

get_branch_map(reset_boundaries=True, reduce_branches=True)

Creates and returns the branch map defined in create_branch_map(). If reset_boundaries is True, the branch numbers and ranges defined in branches are rearranged to not exceed the actual branch map length. If reduce_branches is True, the branch map is additionally filtered accordingly. The branch map is cached internally.
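
The effect of reset_boundaries and reduce_branches can be sketched on a plain dictionary. This is a simplified model, not law's implementation: select_branches is hypothetical, ranges are assumed to be (start, stop) tuples with an exclusive stop (matching branches=[(0, 5)] selecting 5 branches in the get_branch_chunks example), branch numbers are assumed contiguous, and an empty selection is assumed to mean all branches.

```python
def select_branches(branch_map, ranges, reset_boundaries=True, reduce_branches=True):
    n = len(branch_map)  # assumes contiguous branch numbers 0..n-1
    if reset_boundaries:
        # clip every range so that it does not exceed the branch map length
        ranges = [(max(start, 0), min(stop, n)) for start, stop in ranges]
    if reduce_branches and ranges:
        selected = {b for start, stop in ranges for b in range(start, stop)}
        branch_map = {b: data for b, data in branch_map.items() if b in selected}
    return ranges, dict(branch_map)


branch_map = {i: f"file_{i}.root" for i in range(8)}
ranges, reduced = select_branches(branch_map, [(0, 3), (6, 20)])
print(ranges)           # [(0, 3), (6, 8)]
print(sorted(reduced))  # [0, 1, 2, 6, 7]
```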

get_branch_tasks()

Returns a dictionary that maps branch numbers to instantiated branch tasks. As this might be computationally intensive, the return value is cached.

get_branch_chunks(chunk_size)

Returns a list of chunks of branch numbers defined in this workflow with a certain chunk_size. Example:

wf = SomeWorkflowTask()  # has 8 branches
print(wf.get_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]

wf2 = SomeWorkflowTask(branches=[(0, 5)])  # has 5 branches
print(wf2.get_branch_chunks(3))
# -> [[0, 1, 2], [3, 4]]
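
The chunking itself is plain list slicing. The following standalone sketch reproduces the outputs of the example with ordinary ranges instead of workflow instances; branch_chunks is a hypothetical helper, not the library method.

```python
def branch_chunks(branches, chunk_size):
    # Split the sorted branch numbers into consecutive chunks of chunk_size;
    # the last chunk may be shorter.
    branches = sorted(branches)
    return [branches[i:i + chunk_size] for i in range(0, len(branches), chunk_size)]


print(branch_chunks(range(8), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7]]
print(branch_chunks(range(5), 3))  # [[0, 1, 2], [3, 4]]
```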

get_all_branch_chunks(chunk_size, **kwargs)

Returns a list of chunks of all branch numbers of this workflow (i.e. without the branches parameter applied) with a certain chunk_size. Internally, a new instance of this workflow is created using BaseTask.req(), forwarding all kwargs, with the _exclude parameters extended by {"branches"} in order to use all possible branch values. Example:

wf = SomeWorkflowTask()  # has 8 branches
print(wf.get_all_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]

wf2 = SomeWorkflowTask(branches=[(0, 5)])  # has 5 branches
print(wf2.get_all_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]

get_branches_repr(max_ranges=10)

Creates a string representation of the selected branches that can be used as a readable description or postfix in output paths. When the branches of this workflow are configured via the branches parameter, and there are more than max_ranges identified ranges, the string will contain a unique hash describing those ranges.
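
The general idea (collapse the selected branches into ranges, and fall back to a hash when there are too many) can be sketched as follows. The "0-3_7_9-10" format, the "hash_" prefix, and the use of SHA-256 are hypothetical choices for illustration; law's actual string format may differ.

```python
import hashlib


def to_ranges(branches):
    # Collapse sorted branch numbers into (first, last) runs of consecutive values.
    runs = []
    for b in sorted(set(branches)):
        if runs and b == runs[-1][1] + 1:
            runs[-1][1] = b
        else:
            runs.append([b, b])
    return [tuple(r) for r in runs]


def branches_repr(branches, max_ranges=10):
    # Hypothetical format, replaced by a short hash above max_ranges runs.
    runs = to_ranges(branches)
    text = "_".join(f"{a}" if a == b else f"{a}-{b}" for a, b in runs)
    if len(runs) > max_ranges:
        return "hash_" + hashlib.sha256(text.encode()).hexdigest()[:10]
    return text


print(branches_repr([0, 1, 2, 3, 7, 9, 10]))               # 0-3_7_9-10
print(branches_repr(range(0, 40, 2)).startswith("hash_"))  # True
```

Hashing keeps the string short and deterministic for a given selection, which matters when it is used as a postfix in output paths.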

workflow_complete()

Hook to define the completeness status of the workflow.

workflow_requires()

Hook to add workflow requirements. This method is expected to return a dictionary. When this method is called from a branch task, an exception is raised.

workflow_input()

Returns the output targets of all workflow requirements, comparable to the normal input() method of plain tasks.

requires_from_branch()

Returns the requirements defined in the standard requires() method, but called in the context of the workflow. This method is only recommended when all required tasks that would normally take a branch number are intended to be instantiated with branch=-1. When this method is called from a branch task, an exception is raised.

handle_scheduler_message(msg)

Hook that is called when a scheduler message msg is received. Returns True when the message was handled, and False otherwise.

Handled messages:

  • tolerance = <int/float>

  • acceptance = <int/float>
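
Handling the two message types listed above amounts to parsing a "key = value" string and updating the corresponding attribute. The sketch below is a hypothetical standalone version operating on a SimpleNamespace; storing the value as a float and returning False for unparsable values are assumptions, and law may react differently (e.g. by replying to the scheduler).

```python
from types import SimpleNamespace

HANDLED_KEYS = ("tolerance", "acceptance")


def handle_scheduler_message(task, msg):
    # Parse "<key> = <int/float>" and update the task attribute if handled.
    key, sep, value = msg.partition("=")
    key = key.strip()
    if not sep or key not in HANDLED_KEYS:
        return False
    try:
        number = float(value)  # float() tolerates surrounding whitespace
    except ValueError:
        return False
    setattr(task, key, number)
    return True


task = SimpleNamespace(tolerance=0.0, acceptance=1.0)
print(handle_scheduler_message(task, "tolerance = 0.1"), task.tolerance)  # True 0.1
print(handle_scheduler_message(task, "walltime = 2h"))                    # False
```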

Class BaseWorkflowProxy

class BaseWorkflowProxy(*args, **kwargs)

Bases: ProxyTask

Base class of all workflow proxies.

classattribute workflow_type

type: string

The named type of the workflow. This attribute refers to the value of the --workflow parameter on the command line to select a particular workflow.

task

type: Task

Reference to the actual workflow task.

complete()

Custom completion check that invokes the task’s workflow_complete method and returns its value if it is anything other than NotImplemented. Otherwise, the default completion check is performed.
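
The NotImplemented fallback is a common Python hook pattern. A minimal sketch, where ToyProxy, NoHook and AlwaysDone are hypothetical classes and a plain callable stands in for the default completion check:

```python
class ToyProxy:
    def __init__(self, task, default_complete):
        self.task = task
        self._default_complete = default_complete  # stand-in for the default check

    def complete(self):
        status = self.task.workflow_complete()
        if status is not NotImplemented:
            return status  # the task's hook decides
        return self._default_complete()


class NoHook:
    def workflow_complete(self):
        return NotImplemented


class AlwaysDone:
    def workflow_complete(self):
        return True


print(ToyProxy(NoHook(), lambda: False).complete())      # False (default check)
print(ToyProxy(AlwaysDone(), lambda: False).complete())  # True (hook value)
```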

requires()

Returns the default workflow requirements in an ordered dictionary, which is updated with the return value of the task’s workflow_requires method.

output()

Returns the default workflow outputs in an ordered dictionary. At the moment this is just the collection of outputs of the branch tasks, stored with the key "collection".

threshold(n=None)

Returns the threshold number of tasks that need to be complete in order to consider the workflow itself complete. This takes into account the law.BaseWorkflow.acceptance parameter of the workflow. The threshold is passed to the law.TargetCollection (or law.SiblingFileCollection) within output(). By default, the maximum number of tasks is taken from the length of the branch map. For performance reasons, you can pass this maximum number directly via n.

run()

Default run implementation that resets the branch map once if requested.

Functions

dynamic_workflow_condition(condition_fn=None, create_branch_map_fn=None, requires_fn=None, output_fn=None, condition_as_workflow=False, cache_met_condition=True)

Decorator factory that is meant to wrap a workflow method that defines a dynamic workflow condition, returning a DynamicWorkflowCondition instance.

workflow_property(func=None, attr=None, setter=True, cache=False, empty_value=law.util.no_value)

Decorator to declare an attribute that is stored only on a workflow and optionally cached for subsequent calls. Therefore, the decorated method is expected to (lazily) provide the value to cache if caching is enabled. When the value is equal to empty_value, it is not cached and the next access to the property will invoke the decorated method again. The resulting value is stored as either _workflow_<func.__name__> or _workflow_cached_<func.__name__> on the workflow. By default, a setter is provided to overwrite the attribute. Set setter to False to disable this feature. Example:

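from law.workflow.base import workflow_property


# Workflow stands for any law workflow base class, e.g. law.LocalWorkflow
class MyTask(Workflow):

    @workflow_property
    def common_data(self):
        # this method is always called with *self* being the *workflow*
        # some_demanding_computation is a placeholder for your own code
        return some_demanding_computation()

    @workflow_property(attr="my_own_property", setter=False, cache=True)
    def common_data2(self):
        # cached on the workflow under the custom attribute name, no setter
        return some_other_computation()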
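
The behavior described for cache=True can be approximated with a plain property factory. This is a simplified, hypothetical sketch (toy_workflow_property, _no_value and ToyWorkflowTask are not part of law): it only supports the called form @toy_workflow_property(...), ignores attr, and assumes that a value stored on the workflow (cached or set via the setter) takes precedence over calling the method again.

```python
_no_value = object()  # stand-in for law.util.no_value


def toy_workflow_property(cache=False, empty_value=_no_value, setter=True):
    def decorator(func):
        attr = "_workflow_{}{}".format("cached_" if cache else "", func.__name__)

        def fget(self):
            wf = self.as_workflow()
            if attr in vars(wf):
                return vars(wf)[attr]  # value cached earlier or set explicitly
            value = func(wf)  # always evaluated with the workflow as self
            if cache and value != empty_value:
                setattr(wf, attr, value)
            return value

        def fset(self, value):
            setattr(self.as_workflow(), attr, value)

        return property(fget, fset if setter else None)
    return decorator


class ToyWorkflowTask:
    def __init__(self, workflow=None):
        self.workflow = workflow  # None means this object is the workflow
        self.calls = 0

    def as_workflow(self):
        return self.workflow or self

    @toy_workflow_property(cache=True)
    def common_data(self):
        self.calls += 1
        return [1, 2, 3]


wf = ToyWorkflowTask()
branch = ToyWorkflowTask(workflow=wf)
print(branch.common_data, wf.common_data, wf.calls)  # [1, 2, 3] [1, 2, 3] 1
```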