law.workflow.base
Workflow and workflow proxy base class definitions.
Class BaseWorkflow
- class BaseWorkflow(*args, **kwargs)
Bases: ProxyAttributeTask
Base class of all workflows.
- classattribute workflow
type: luigi.Parameter
Workflow type that refers to the workflow proxy implementation at instantiation / execution time. Empty default value.
- classattribute acceptance
type: luigi.FloatParameter
Number of complete tasks to consider the workflow successful. Values larger than one are interpreted as absolute numbers, and as fractions otherwise. Defaults to 1.0.
- classattribute tolerance
type: luigi.FloatParameter
Number of failed tasks to still consider the workflow successful. Values larger than one are interpreted as absolute numbers, and as fractions otherwise. Defaults to 0.0.
- classattribute branch
type: luigi.IntParameter
The branch number to run this task for. -1 means that this task is the actual workflow, rather than a branch task. Defaults to -1.
- classattribute branches
type: law.MultiRangeParameter
Explicit list of branches or branch ranges to process. Empty default value.
- classattribute workflow_proxy_cls
type: BaseWorkflowProxy
Reference to the workflow proxy class associated to this workflow.
- classattribute output_collection_cls
type: law.TargetCollection
Configurable target collection class to use, such as target.collection.TargetCollection, target.collection.FileCollection or target.collection.SiblingFileCollection.
- classattribute force_contiguous_branches
type: bool
Flag that denotes whether this workflow is forced to use contiguous branch numbers, starting from 0. When the flag is set and the branch numbers are not contiguous, an exception is raised.
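What a contiguity requirement on branch numbers means can be illustrated with a small, library-independent sketch. The helper `check_contiguous` is hypothetical and not part of law; it only assumes that a branch map is a dictionary keyed by integer branch numbers.

```python
def check_contiguous(branch_map):
    """Raise ValueError unless the keys are exactly 0, 1, ..., n - 1."""
    n = len(branch_map)
    if set(branch_map) != set(range(n)):
        raise ValueError(
            f"branch map keys must form the contiguous range [0, {n})"
        )
    return branch_map


# Keys 0, 1, 2 are contiguous, so the map is returned unchanged.
check_contiguous({0: "a", 1: "b", 2: "c"})

try:
    # Branch 1 is missing, so the check fails.
    check_contiguous({0: "a", 2: "c"})
except ValueError as exc:
    print(exc)
```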
- classattribute reset_branch_map_before_run
type: bool
Flag that denotes whether the branch map should be recreated from scratch before the run method of the underlying workflow proxy is called.
- classattribute create_branch_map_before_repr
type: bool
Flag that denotes whether the branch map should be created (if not already done) before the task representation is created via repr().
- classattribute cache_workflow_requirements
type: bool
Whether workflow requirements should be evaluated only once and cached afterwards in the _cached_workflow_requirements attribute. Defaults to False.
- classattribute cache_branch_map_default
type: bool
The initial default value of the cache_branch_map attribute that decides whether the branch map is created only once and then cached in the _branch_map attribute. Defaults to True.
- classattribute workflow_run_decorators
type: sequence, None
Sequence of decorator functions that will be conveniently used to decorate the workflow proxy’s run method. This way, there is no need to subclass and reset the workflow_proxy_cls just to add a decorator. The value is None by default.
- workflow_cls
type: law.Register
Reference to the class of the realized workflow. This is especially helpful in case your derived class inherits from multiple workflows.
- workflow_proxy
type: BaseWorkflowProxy
Reference to the underlying workflow proxy instance.
- branch_map
type: dict (read-only)
Shorthand for get_branch_map().
- branch_data
type: any (read-only)
Shorthand for self.branch_map[self.branch].
- workflow_proxy_cls
alias of BaseWorkflowProxy
- classmethod modify_param_values(params: dict[str, Any]) -> dict[str, Any]
Hook to modify command line arguments before instances of this class are created.
- classmethod req_different_branching(inst: Task, **kwargs) -> BaseWorkflow
Variation of Task.req() that should be used when defining requirements between workflows that implement a different branch granularity (e.g. task B with 10 branches requires task A with 2 branches). The only difference to the base method is that workflow specific parameters such as branches or tolerance are automatically skipped when not added explicitly in kwargs.
- as_branch(branch: int | None = None, **kwargs) -> BaseWorkflow
When this task refers to the workflow, a re-instantiated task with identical parameters and a certain branch value, defaulting to 0, is returned. When this task is already a branch task, the task itself is returned when branch is None or matches this task’s branch value. Otherwise, a new branch task with that value, receiving all kwargs and otherwise identical parameters is created and returned.
- as_workflow(**kwargs) -> BaseWorkflow
When this task refers to a branch task, a re-instantiated task with branch=-1 and identical parameters is returned. Otherwise, the workflow itself is returned. All kwargs are passed to req_workflow().
- abstractmethod create_branch_map() -> dict[int, Any]
Abstract method that must be overwritten by inheriting tasks to define the branch map.
- get_branch_map(reset_boundaries: bool = True, reduce_branches: bool = True) -> dict[int, Any]
Creates and returns the branch map defined in create_branch_map(). If reset_boundaries is True, the branch numbers and ranges defined in branches are rearranged to not exceed the actual branch map length. If reduce_branches is True, the branch map is additionally filtered accordingly. The branch map is cached internally.
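The two options can be pictured with a library-independent sketch. Everything here is an assumption made for illustration: `select_branches` is a hypothetical helper, ranges are taken to be `(start, end)` tuples with an exclusive end, an empty range list is taken to mean "no selection", and branch numbers are assumed to run contiguously from 0. The actual handling inside law may differ.

```python
def select_branches(branch_map, ranges, reset_boundaries=True, reduce_branches=True):
    """Return (branch_map, ranges) after optional clipping and filtering."""
    if not ranges:
        # No explicit selection: keep the full map.
        return dict(branch_map), []

    n = len(branch_map)
    if reset_boundaries:
        # Clip every range so that it does not exceed the map length,
        # then drop ranges that became empty.
        clipped = [(min(start, n), min(end, n)) for start, end in ranges]
        ranges = [(start, end) for start, end in clipped if start < end]

    if reduce_branches:
        # Keep only the branches covered by the (possibly clipped) ranges.
        selected = {b for start, end in ranges for b in range(start, end)}
        branch_map = {b: v for b, v in branch_map.items() if b in selected}

    return dict(branch_map), ranges


full_map = {i: f"file_{i}" for i in range(8)}
reduced, ranges = select_branches(full_map, [(0, 3), (6, 20)])
print(sorted(reduced))  # -> [0, 1, 2, 6, 7]
print(ranges)           # -> [(0, 3), (6, 8)]
```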
- get_branch_tasks(**kwargs) -> dict[int, BaseWorkflow]
Returns a dictionary that maps branch numbers to instantiated branch tasks. As this might be computationally intensive, the return value is cached. For the first initialization, all kwargs are passed to as_branch() for each created branch task.
- get_branch_chunks(chunk_size: int) -> list[list[int]]
Returns a list of chunks of branch numbers defined in this workflow with a certain chunk_size. Example:

    wf = SomeWorkflowTask()  # has 8 branches
    print(wf.get_branch_chunks(3))
    # -> [[0, 1, 2], [3, 4, 5], [6, 7]]

    wf2 = SomeWorkflowTask(branches=[(0, 5)])  # has 5 branches
    print(wf2.get_branch_chunks(3))
    # -> [[0, 1, 2], [3, 4]]
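The chunking itself can be reproduced without law in a few lines. This is a minimal sketch under the assumption that branch numbers are available as a plain list; `chunk_branches` is a hypothetical helper name, not a law function.

```python
def chunk_branches(branches, chunk_size):
    """Split a list of branch numbers into consecutive chunks."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    return [
        branches[i:i + chunk_size]
        for i in range(0, len(branches), chunk_size)
    ]


print(chunk_branches(list(range(8)), 3))  # -> [[0, 1, 2], [3, 4, 5], [6, 7]]
print(chunk_branches(list(range(5)), 3))  # -> [[0, 1, 2], [3, 4]]
```

The last chunk is simply shorter when the number of branches is not a multiple of the chunk size.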
- get_all_branch_chunks(chunk_size: int, **kwargs) -> list[list[int]]
Returns a list of chunks of all branch numbers of this workflow (i.e. without branches parameters applied) with a certain chunk_size. Internally, a new instance of this workflow is created using BaseTask.req(), forwarding all kwargs, with _exclude parameters extended by {"branches"} in order to use all possible branch values. Example:

    wf = SomeWorkflowTask()  # has 8 branches
    print(wf.get_all_branch_chunks(3))
    # -> [[0, 1, 2], [3, 4, 5], [6, 7]]

    wf2 = SomeWorkflowTask(branches=[(0, 5)])  # has 5 branches
    print(wf2.get_all_branch_chunks(3))
    # -> [[0, 1, 2], [3, 4, 5], [6, 7]]
- get_branches_repr(max_ranges: int = 10) -> str
Creates a string representation of the selected branches that can be used as a readable description or postfix in output paths. When the branches of this workflow are configured via the branches parameter, and there are more than max_ranges identified ranges, the string will contain a unique hash describing those ranges.
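The idea of falling back to a hash when there are too many ranges can be sketched as follows. The concrete string format (the `To` infix, the `_` separator, the hash prefix and its length) is invented for this illustration and is not claimed to match what law produces; `to_ranges` and `branches_repr` are hypothetical helpers.

```python
import hashlib


def to_ranges(branches):
    """Collapse integers into sorted (start, end) tuples with exclusive end."""
    ranges = []
    for b in sorted(set(branches)):
        if ranges and ranges[-1][1] == b:
            # Extend the current range by one.
            ranges[-1] = (ranges[-1][0], b + 1)
        else:
            # Start a new range.
            ranges.append((b, b + 1))
    return ranges


def branches_repr(branches, max_ranges=10):
    """Readable description of the ranges, or a short hash if too many."""
    ranges = to_ranges(branches)
    if len(ranges) > max_ranges:
        digest = hashlib.sha1(repr(ranges).encode("utf-8")).hexdigest()[:10]
        return f"{len(ranges)}_ranges_{digest}"
    return "_".join(
        str(start) if end == start + 1 else f"{start}To{end}"
        for start, end in ranges
    )


print(branches_repr([0, 1, 2, 5, 7, 8]))  # -> 0To3_5_7To9
print(branches_repr([0, 1, 2, 5, 7, 8], max_ranges=2))  # hash-based fallback
```

Hashing keeps the result short and stable for a given selection, which matters when the string ends up in an output path.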
- workflow_requires() -> Any
Hook to add workflow requirements. This method is expected to return a dictionary. When this method is called from a branch task, an exception is raised.
- workflow_input() -> Any
Returns the output targets of all workflow requirements, comparable to the normal input() method of plain tasks.
- requires_from_branch() -> Any
Returns the requirements defined in the standard requires() method, but called in the context of the workflow. This method is only recommended in case all required tasks that would normally take a branch number are intended to be instantiated with branch=-1. When this method is called from a branch task, an exception is raised.
Class BaseWorkflowProxy
- class BaseWorkflowProxy(*args, **kwargs)
Bases: ProxyTask
Base class of all workflow proxies.
- classattribute workflow_type
type: string
The named type of the workflow. This attribute refers to the value of the --workflow parameter on the command line to select a particular workflow.
- task
type: Task
Reference to the actual workflow task.
- complete() -> bool
Custom completion check that invokes the task’s workflow_complete method. If that method returns anything other than NotImplemented, the returned value is used. Otherwise, the default completion check is performed.
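The fallback on NotImplemented follows a common Python pattern, sketched here without law. The classes `SketchTask`, `CustomTask` and `SketchProxy` as well as `default_complete` are hypothetical stand-ins; only the method name `workflow_complete` is taken from the description above.

```python
class SketchTask:
    def workflow_complete(self):
        # NotImplemented signals "no custom check, use the default one".
        return NotImplemented


class CustomTask(SketchTask):
    def workflow_complete(self):
        # A task that decides about completeness on its own.
        return True


class SketchProxy:
    def __init__(self, task):
        self.task = task

    def default_complete(self):
        # Stand-in for the regular, output-based completion check.
        return False

    def complete(self):
        result = self.task.workflow_complete()
        if result is not NotImplemented:
            return result
        return self.default_complete()


print(SketchProxy(SketchTask()).complete())  # -> False (default check)
print(SketchProxy(CustomTask()).complete())  # -> True (custom check)
```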
- requires() -> Any
Returns the default workflow requirements in an ordered dictionary, which is updated with the return value of the task’s workflow_requires method.
- output() -> dict[str, Any]
Returns the default workflow outputs in an ordered dictionary. At the moment this is just the collection of outputs of the branch tasks, stored with the key "collection".
- get_cached_output(update: bool = False) -> dict[str, Any]
Returns the previously computed output if it is already cached. Otherwise, the output is computed via output() and, if cache_branch_map of the task is True, cached for subsequent calls.
- threshold(n: int | None = None) -> float | int
Returns the threshold number of tasks that need to be complete in order to consider the workflow as being complete itself. This takes into account the law.BaseWorkflow.acceptance parameter of the workflow. The threshold is passed to the law.TargetCollection (or law.SiblingFileCollection) within output(). By default, the maximum number of tasks is taken from the length of the branch map. For performance purposes, you can set this value, n, directly.
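Combined with the documented meaning of acceptance (values larger than one are absolute numbers, anything else is a fraction), the computation can be sketched as below. `sketch_threshold` is a hypothetical, standalone function written for this illustration; it is not the law implementation, and how the collection later compares against the threshold is not shown.

```python
def sketch_threshold(acceptance, n):
    """Number of tasks that must be complete, given n tasks in total."""
    if acceptance > 1:
        # Interpreted as an absolute number of tasks.
        return acceptance
    # Interpreted as a fraction of all n tasks.
    return acceptance * n


print(sketch_threshold(1.0, 8))   # -> 8.0 (all tasks)
print(sketch_threshold(0.75, 8))  # -> 6.0 (75 % of 8 tasks)
print(sketch_threshold(5.0, 8))   # -> 5.0 (absolute number)
```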
Functions
- dynamic_workflow_condition(condition_fn: Callable[[], bool] | None = None, create_branch_map_fn: Callable[[], Any] | None = None, requires_fn: Callable[[], Any] | None = None, requires_eager_fn: Callable[[], Any] | None = None, output_fn: Callable[[], Any] | None = None, condition_as_workflow: bool = False, cache_met_condition: bool = True) -> Callable[[Callable[[], bool]], DynamicWorkflowCondition] | DynamicWorkflowCondition
Decorator factory that is meant to wrap a workflow method that defines a dynamic workflow condition, returning a DynamicWorkflowCondition instance.
- workflow_property(func: Callable | None = None, attr: str | None = None, setter: bool = True, cache: bool = False, empty_value: Any | NoValue = law.util.no_value) -> Callable
Decorator to declare an attribute that is stored only on a workflow and optionally cached for subsequent calls. Therefore, the decorated method is expected to (lazily) provide the value to cache if enabled. When the value is equal to empty_value, it is not cached and the next access to the property will invoke the decorated method again. The resulting value is stored as either _workflow_<func.__name__> or _workflow_cached_<func.__name__> on the workflow. By default, a setter is provided to overwrite the attribute. Set setter to False to disable this feature. Example:

    class MyTask(Workflow):

        @workflow_property
        def common_data(self):
            # this method is always called with *self* being the *workflow*
            return some_demanding_computation()

        @workflow_property(attr="my_own_property", setter=False, cache=True)
        def common_data2(self):
            return some_other_computation()
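The caching rule, where a value equal to empty_value is not cached and triggers a new computation on the next access, can be sketched independently of law. The decorator `cached_unless_empty` and the `Demo` class are hypothetical and deliberately simplified: the sketch ignores the redirection from branch tasks to the workflow and the optional setter.

```python
_NO_VALUE = object()  # internal marker for "nothing cached yet"


def cached_unless_empty(attr, empty_value=None):
    """Property factory that caches the result under *attr* unless it is empty."""
    def decorator(func):
        def getter(self):
            cached = getattr(self, attr, _NO_VALUE)
            if cached is not _NO_VALUE:
                return cached
            value = func(self)
            if value != empty_value:
                # Only non-empty values are stored for subsequent calls.
                setattr(self, attr, value)
            return value
        return property(getter)
    return decorator


class Demo:
    def __init__(self):
        self.calls = 0
        self.ready = False

    @cached_unless_empty("_cached_data", empty_value=None)
    def data(self):
        self.calls += 1
        return [1, 2, 3] if self.ready else None


demo = Demo()
demo.data          # returns None, nothing is cached
demo.data          # computed again
demo.ready = True
demo.data          # returns [1, 2, 3] and caches it
demo.data          # served from the cache
print(demo.calls)  # -> 3
```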