law.workflow.base
Workflow and workflow proxy base class definitions.
Class BaseWorkflow
- class BaseWorkflow(*args, **kwargs)
Bases: ProxyAttributeTask
Base class of all workflows.
- classattribute workflow
type: luigi.Parameter
Workflow type that selects the workflow proxy implementation at instantiation and execution time. Empty by default.
- classattribute acceptance
type: luigi.FloatParameter
Number of completed tasks required to consider the workflow successful. Values larger than one are interpreted as absolute numbers, and values up to one as fractions of the number of branches. Defaults to 1.0.
- classattribute tolerance
type: luigi.FloatParameter
Maximum number of failed tasks for which the workflow is still considered successful. Values larger than one are interpreted as absolute numbers, and as fractions otherwise. Defaults to 0.0.
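The "absolute number vs. fraction" rule shared by acceptance and tolerance can be sketched in plain Python. The helper `resolve_count` is hypothetical and not part of law; it assumes that fractions are taken relative to the number of branches (as the threshold() description of BaseWorkflowProxy states for acceptance; for tolerance the reference count is an assumption).

```python
def resolve_count(value: float, n_branches: int) -> float:
    """Hypothetical helper mirroring the acceptance/tolerance rule.

    Values larger than one are absolute task counts; all other values are
    fractions of n_branches (assumed reference count, see lead-in).
    """
    if value > 1:
        return value
    return value * n_branches


# 8 branches with the default acceptance of 1.0: all 8 must be complete
print(resolve_count(1.0, 8))  # 8.0
# acceptance=0.5: half of the branches suffice
print(resolve_count(0.5, 8))  # 4.0
# tolerance=2: up to 2 failed tasks are tolerated, independent of n_branches
print(resolve_count(2, 8))  # 2
```

Note that a value of exactly 1.0 falls on the fraction side of the rule, which is why the default acceptance means "all branches".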
- classattribute branch
type: luigi.IntParameter
The branch number to run this task for. -1 means that this task is the actual workflow, rather than a branch task. Defaults to -1.
- classattribute branches
type: law.MultiRangeParameter
Explicit list of branches or branch ranges to process. Empty default value.
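As a rough illustration of how a branches value selects branch numbers, the sketch below expands a list of single numbers and (start, stop) pairs. The helper `expand_branches` is hypothetical, not law's parser; treating stop as exclusive is an assumption based on the get_branch_chunks() example, where branches=[(0, 5)] yields five branches.

```python
from typing import Iterable, List, Tuple, Union

BranchSpec = Union[int, Tuple[int, int]]


def expand_branches(specs: Iterable[BranchSpec]) -> List[int]:
    """Hypothetical helper: expand ints and (start, stop) ranges into sorted branch numbers.

    The stop value is treated as exclusive (an assumption, see above).
    """
    branches = set()
    for spec in specs:
        if isinstance(spec, tuple):
            start, stop = spec
            branches.update(range(start, stop))
        else:
            branches.add(spec)
    return sorted(branches)


print(expand_branches([(0, 5)]))        # [0, 1, 2, 3, 4]
print(expand_branches([(0, 3), 7, 5]))  # [0, 1, 2, 5, 7]
```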
- classattribute workflow_proxy_cls
type: BaseWorkflowProxy
Reference to the workflow proxy class associated with this workflow.
- classattribute output_collection_cls
type: law.TargetCollection
Configurable target collection class to use, such as target.collection.TargetCollection, target.collection.FileCollection or target.collection.SiblingFileCollection.
- classattribute force_contiguous_branches
type: bool
Flag that denotes whether this workflow is forced to use contiguous branch numbers, starting from 0. If it is set and the branch numbers are not contiguous, an exception is raised.
- classattribute reset_branch_map_before_run
type: bool
Flag that denotes whether the branch map should be recreated from scratch before the run method of the underlying workflow proxy is called.
- classattribute create_branch_map_before_repr
type: bool
Flag that denotes whether the branch map should be created (if not already done) before the task representation is created via repr().
- classattribute cache_workflow_requirements
type: bool
Whether workflow requirements should be evaluated only once and cached afterwards in the _cached_workflow_requirements attribute. Defaults to False.
- classattribute cache_branch_map_default
type: bool
The initial default value of the cache_branch_map attribute, which decides whether the branch map should be created only once and then cached in the _branch_map attribute. Defaults to True.
- classattribute workflow_run_decorators
type: sequence, None
Sequence of decorator functions that are used to decorate the run method of the workflow proxy. This way, there is no need to subclass and reset workflow_proxy_cls just to add a decorator. Defaults to None.
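The effect of workflow_run_decorators can be pictured as wrapping the proxy's run method once per decorator. The sketch below is a plain-Python stand-in, not law's implementation: the names `apply_run_decorators` and `log_calls` are hypothetical, and the order in which decorators are applied (first entry innermost) is an assumption.

```python
import functools
from typing import Callable, Optional, Sequence


def log_calls(fn: Callable) -> Callable:
    """Hypothetical example decorator that records each call in a list."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        wrapper.calls.append(fn.__name__)
        return fn(*args, **kwargs)
    wrapper.calls = []
    return wrapper


def apply_run_decorators(run: Callable, decorators: Optional[Sequence[Callable]]) -> Callable:
    """Wrap run with each decorator in turn; None means no decoration."""
    for decorator in decorators or ():
        run = decorator(run)
    return run


def run():
    return "done"


decorated = apply_run_decorators(run, [log_calls])
print(decorated())      # done
print(decorated.calls)  # ['run']
```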
- workflow_cls
type: law.Register
Reference to the class of the realized workflow. This is especially helpful in case your derived class inherits from multiple workflows.
- workflow_proxy
type: BaseWorkflowProxy
Reference to the underlying workflow proxy instance.
- branch_map
type: dict (read-only)
Shorthand for get_branch_map().
- branch_data
type: any (read-only)
Shorthand for self.branch_map[self.branch].
- workflow_proxy_cls
alias of BaseWorkflowProxy
- classmethod modify_param_values(params)
Hook to modify command line arguments before instances of this class are created.
- classmethod req_different_branching(inst, **kwargs)
Variation of Task.req() that should be used when defining requirements between workflows with different branch granularities (e.g. task B with 10 branches requires task A with 2 branches). The only difference from the base method is that workflow-specific parameters, such as branches or tolerance, are automatically skipped unless they are passed explicitly in kwargs.
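The parameter handling described for req_different_branching() can be approximated with plain dictionaries. This is a hypothetical model, not law's code: `WORKFLOW_PARAMS` only contains parameters documented on this page (law may skip more), and the `dataset` parameter is made up for illustration.

```python
from typing import Any, Dict

# Assumed subset of workflow-specific parameters (taken from this page);
# law's real list of skipped parameters may be longer.
WORKFLOW_PARAMS = {"acceptance", "tolerance", "branches"}


def params_for_different_branching(inst_params: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
    """Copy all parameters except workflow-specific ones, then apply kwargs.

    Workflow-specific parameters only survive when passed explicitly in kwargs.
    """
    params = {k: v for k, v in inst_params.items() if k not in WORKFLOW_PARAMS}
    params.update(kwargs)
    return params


# parameters of task B (10 branches) that requires task A (2 branches)
b_params = {"dataset": "ttbar", "branches": [(0, 10)], "tolerance": 0.1}
print(params_for_different_branching(b_params))
# {'dataset': 'ttbar'}
print(params_for_different_branching(b_params, tolerance=0.0))
# {'dataset': 'ttbar', 'tolerance': 0.0}
```

Skipping branches matters most here: B's branch selection refers to B's branch map and would be meaningless for A.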
- as_branch(branch=None)
When this task refers to the workflow, a re-instantiated task with identical parameters and the given branch value (0 by default) is returned. When this task is already a branch task, the task itself is returned if branch is None or matches this task’s branch value. Otherwise, a new branch task with that value and identical parameters is created and returned.
- as_workflow()
When this task refers to a branch task, a re-instantiated task with branch=-1 and identical parameters is returned. Otherwise, the workflow itself is returned.
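The rules of as_branch() and as_workflow() can be modeled with a small immutable stand-in. `ToyTask` and its `dataset` field are hypothetical and only mimic the documented behavior: branch=-1 marks the workflow, re-instantiation keeps all other parameters, and as_branch() defaults to branch 0 when called on the workflow.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyTask:
    """Hypothetical stand-in for a workflow task; branch=-1 denotes the workflow."""
    dataset: str
    branch: int = -1

    @property
    def is_workflow(self) -> bool:
        return self.branch == -1

    def as_branch(self, branch: Optional[int] = None) -> "ToyTask":
        if self.is_workflow:
            # the workflow re-instantiates itself with the branch, defaulting to 0
            return replace(self, branch=0 if branch is None else branch)
        if branch is None or branch == self.branch:
            # already the requested branch task
            return self
        return replace(self, branch=branch)

    def as_workflow(self) -> "ToyTask":
        return self if self.is_workflow else replace(self, branch=-1)


wf = ToyTask("ttbar")
print(wf.as_branch())                       # ToyTask(dataset='ttbar', branch=0)
print(wf.as_branch(4).as_workflow() == wf)  # True
```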
- abstract create_branch_map()
Abstract method that must be overridden by inheriting tasks to define the branch map.
- get_branch_map(reset_boundaries=True, reduce_branches=True)
Creates and returns the branch map defined in create_branch_map(). If reset_boundaries is True, the branch numbers and ranges defined in branches are rearranged to not exceed the actual branch map length. If reduce_branches is True, the branch map is additionally filtered accordingly. The branch map is cached internally.
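One way to read the reset_boundaries and reduce_branches options is sketched below. Both helpers are hypothetical and work on plain dictionaries and (start, stop) tuples with an exclusive stop (an assumption); they also assume contiguous branch numbers starting at 0, so law's internal handling may differ, e.g. for non-contiguous branch maps.

```python
from typing import Any, Dict, List, Tuple

Range = Tuple[int, int]


def reset_boundaries(ranges: List[Range], n: int) -> List[Range]:
    """Clip (start, stop) ranges to [0, n) and drop ranges that become empty."""
    clipped = []
    for start, stop in ranges:
        start, stop = max(start, 0), min(stop, n)
        if start < stop:
            clipped.append((start, stop))
    return clipped


def reduce_branch_map(branch_map: Dict[int, Any], ranges: List[Range]) -> Dict[int, Any]:
    """Keep only branches whose number falls into one of the ranges."""
    return {
        b: data for b, data in branch_map.items()
        if any(start <= b < stop for start, stop in ranges)
    }


full_map = {i: f"file_{i}.root" for i in range(6)}
ranges = reset_boundaries([(4, 10), (8, 12)], len(full_map))
print(ranges)                               # [(4, 6)]
print(reduce_branch_map(full_map, ranges))  # {4: 'file_4.root', 5: 'file_5.root'}
```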
- get_branch_tasks()
Returns a dictionary that maps branch numbers to instantiated branch tasks. As this might be computationally intensive, the return value is cached.
- get_branch_chunks(chunk_size)
Returns a list of chunks of branch numbers defined in this workflow with a certain chunk_size. Example:

```python
wf = SomeWorkflowTask()  # has 8 branches
print(wf.get_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]

wf2 = SomeWorkflowTask(branches=[(0, 5)])  # has 5 branches
print(wf2.get_branch_chunks(3))
# -> [[0, 1, 2], [3, 4]]
```
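The chunking itself amounts to slicing the sorted branch numbers. The hypothetical helper `branch_chunks` below reproduces the outputs of the get_branch_chunks() example, assuming the chunks are built from the selected branch numbers in ascending order; it is a sketch, not law's implementation.

```python
from typing import Iterable, List


def branch_chunks(branches: Iterable[int], chunk_size: int) -> List[List[int]]:
    """Split sorted branch numbers into consecutive chunks of at most chunk_size."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be positive")
    ordered = sorted(branches)
    return [ordered[i:i + chunk_size] for i in range(0, len(ordered), chunk_size)]


print(branch_chunks(range(8), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7]]
print(branch_chunks(range(5), 3))  # [[0, 1, 2], [3, 4]]
```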
- get_all_branch_chunks(chunk_size, **kwargs)
Returns a list of chunks of all branch numbers of this workflow (i.e. without the branches parameter applied) with a certain chunk_size. Internally, a new instance of this workflow is created using BaseTask.req(), forwarding all kwargs, with the _exclude parameters extended by {"branches"} in order to use all possible branch values. Example:

```python
wf = SomeWorkflowTask()  # has 8 branches
print(wf.get_all_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]

wf2 = SomeWorkflowTask(branches=[(0, 5)])  # has 5 branches
print(wf2.get_all_branch_chunks(3))
# -> [[0, 1, 2], [3, 4, 5], [6, 7]]
```
- get_branches_repr(max_ranges=10)
Creates a string representation of the selected branches that can be used as a readable description or postfix in output paths. When the branches of this workflow are configured via the branches parameter, and there are more than max_ranges identified ranges, the string will contain a unique hash describing those ranges.
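A possible shape of such a representation is sketched below: consecutive branch numbers are merged into ranges, and once there are more than max_ranges of them, a hash replaces the explicit listing. The helpers `to_ranges` and `branches_repr`, the output format (`0To4`, `_` separators, `hash_` prefix) and the use of SHA-256 are all assumptions for illustration; law's actual string layout may differ.

```python
import hashlib
from typing import Iterable, List, Tuple


def to_ranges(branches: Iterable[int]) -> List[Tuple[int, int]]:
    """Merge sorted branch numbers into inclusive (first, last) ranges."""
    ranges: List[Tuple[int, int]] = []
    for b in sorted(set(branches)):
        if ranges and b == ranges[-1][1] + 1:
            ranges[-1] = (ranges[-1][0], b)
        else:
            ranges.append((b, b))
    return ranges


def branches_repr(branches: Iterable[int], max_ranges: int = 10) -> str:
    """Hypothetical readable postfix; falls back to a hash for many ranges."""
    ranges = to_ranges(branches)
    parts = [str(a) if a == b else f"{a}To{b}" for a, b in ranges]
    if len(ranges) > max_ranges:
        digest = hashlib.sha256("_".join(parts).encode()).hexdigest()[:10]
        return f"hash_{digest}"
    return "_".join(parts)


print(branches_repr([0, 1, 2, 3, 4, 7, 9, 10]))            # 0To4_7_9To10
print(branches_repr(range(0, 40, 2)).startswith("hash_"))  # True
```

Hashing keeps output paths short and deterministic for the same selection, at the cost of no longer being human-readable.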
- workflow_requires()
Hook to add workflow requirements. This method is expected to return a dictionary. When this method is called from a branch task, an exception is raised.
- workflow_input()
Returns the output targets of all workflow requirements, comparable to the normal input() method of plain tasks.
- requires_from_branch()
Returns the requirements defined in the standard requires() method, but called in the context of the workflow. This method is only recommended if all required tasks that would normally take a branch number are intended to be instantiated with branch=-1. When this method is called from a branch task, an exception is raised.
Class BaseWorkflowProxy
- class BaseWorkflowProxy(*args, **kwargs)
Bases: ProxyTask
Base class of all workflow proxies.
- classattribute workflow_type
type: string
The named type of the workflow. This attribute refers to the value of the --workflow parameter on the command line to select a particular workflow.
- task
type: Task
Reference to the actual workflow task.
- complete()
Custom completion check that invokes the task’s workflow_complete method. If that method returns anything other than NotImplemented, its value is returned; otherwise, the default completion check is performed.
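The fallback logic of complete() follows the common Python pattern of using NotImplemented as a "no opinion" sentinel. The sketch below is a hypothetical model in which `workflow_complete` and `default_complete` are plain callables; it is not law's code.

```python
from typing import Callable


def proxy_complete(workflow_complete: Callable[[], object],
                   default_complete: Callable[[], bool]) -> object:
    """Use the task's own verdict unless it returns NotImplemented."""
    result = workflow_complete()
    if result is not NotImplemented:
        # the task decided on its own, so its value is returned as-is
        return result
    # no custom verdict: fall back to the default completion check
    return default_complete()


print(proxy_complete(lambda: NotImplemented, lambda: True))  # True
print(proxy_complete(lambda: False, lambda: True))           # False
```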
- requires()
Returns the default workflow requirements in an ordered dictionary, which is updated with the return value of the task’s workflow_requires method.
- output()
Returns the default workflow outputs in an ordered dictionary. At the moment, this is just the collection of outputs of the branch tasks, stored with the key "collection".
- threshold(n=None)
Returns the threshold number of tasks that need to be complete in order to consider the workflow as being complete itself. This takes into account the law.BaseWorkflow.acceptance parameter of the workflow. The threshold is passed to the law.TargetCollection (or law.SiblingFileCollection) within output(). By default, the maximum number of tasks is taken from the length of the branch map. For performance purposes, you can set this value, n, directly.
Functions
- dynamic_workflow_condition(condition_fn=None, create_branch_map_fn=None, requires_fn=None, output_fn=None, condition_as_workflow=False, cache_met_condition=True)
Decorator factory that is meant to wrap a workflow method that defines a dynamic workflow condition, returning a DynamicWorkflowCondition instance.
- workflow_property(func=None, attr=None, setter=True, cache=False, empty_value=law.util.no_value)
Decorator to declare an attribute that is stored only on a workflow and optionally cached for subsequent calls. Therefore, the decorated method is expected to (lazily) provide the value to cache if enabled. When the value is equal to empty_value, it is not cached and the next access to the property will invoke the decorated method again. The resulting value is stored as either _workflow_<func.__name__> or _workflow_cached_<func.__name__> on the workflow. By default, a setter is provided to overwrite the attribute. Set setter to False to disable this feature. Example:

```python
class MyTask(Workflow):

    @workflow_property
    def common_data(self):
        # this method is always called with *self* being the *workflow*
        return some_demanding_computation()

    @workflow_property(attr="my_own_property", setter=False, cache=True)
    def common_data2(self):
        return some_other_computation()
```
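The caching behavior described for workflow_property can be modeled without law. `toy_workflow_property` is a simplified, hypothetical re-implementation covering only the cached case with the `_workflow_cached_<name>` attribute naming and the empty-value rule. It ignores the attr and setter options as well as the redirection from branch tasks to the workflow, and it uses None as the empty value instead of law.util.no_value.

```python
from typing import Any, Callable

_EMPTY = None  # stand-in for law.util.no_value (assumption for this sketch)


def toy_workflow_property(func: Callable[[Any], Any]) -> property:
    """Cache the value returned by func on the instance, unless it is empty."""
    attr = f"_workflow_cached_{func.__name__}"

    def getter(self):
        value = getattr(self, attr, _EMPTY)
        if value is _EMPTY:
            value = func(self)
            if value is not _EMPTY:
                # only non-empty values are cached; empty ones trigger a recomputation
                setattr(self, attr, value)
        return value

    return property(getter, doc=func.__doc__)


class ToyWorkflow:
    def __init__(self):
        self.n_calls = 0

    @toy_workflow_property
    def common_data(self):
        self.n_calls += 1
        return {"n_files": 42}


wf = ToyWorkflow()
first = wf.common_data
second = wf.common_data
print(first is second)  # True
print(wf.n_calls)       # 1
```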