Source code for law.workflow.local
# coding: utf-8
"""
Local workflow implementation.
"""
__all__ = ["LocalWorkflow"]
from collections.abc import Generator
import luigi
from law import luigi_version_info
from law.workflow.base import BaseWorkflow, BaseWorkflowProxy
from law.util import DotDict
[docs]class LocalWorkflowProxy(BaseWorkflowProxy):
"""
Workflow proxy class for the local workflow implementation. The workflow type is ``"local"``.
"""
workflow_type = "local"
def __init__(self, *args, **kwargs):
super(LocalWorkflowProxy, self).__init__(*args, **kwargs)
self._local_workflow_has_yielded = False
[docs] def requires(self):
reqs = super(LocalWorkflowProxy, self).requires()
local_reqs = self.task.local_workflow_requires()
if local_reqs:
reqs.update(local_reqs)
# when local_workflow_require_branches is True, add all branch tasks as dependencies
if self.task.local_workflow_require_branches:
reqs["branches"] = self.task.get_branch_tasks()
return reqs
[docs] def run(self):
"""
When *local_workflow_require_branches* of the task was set to *False*, starts all branch
tasks via dynamic dependencies by yielding them in a list, or simply does nothing otherwise.
"""
pre_run_gen = self.task.local_workflow_pre_run()
if isinstance(pre_run_gen, Generator):
yield pre_run_gen
super(LocalWorkflowProxy, self).run()
if not self.task.local_workflow_require_branches and not self._local_workflow_has_yielded:
self._local_workflow_has_yielded = True
# use branch tasks as requirements
reqs = list(self.task.get_branch_tasks().values())
# wrap into DynamicRequirements when available, otherwise just yield the list
if luigi_version_info[:3] >= (3, 1, 2):
yield luigi.DynamicRequirements(reqs, lambda complete_fn: complete_fn(self))
else:
yield reqs
[docs]class LocalWorkflow(BaseWorkflow):
"""
Local workflow implementation. The workflow type is ``"local"``. There are two ways how a local
workflow starts its branch tasks. See the :py:attr:`local_workflow_require_branches` attribute
for more information.
Since local workflows trigger their branch tasks via requirements or dynamic dependencies, their
run methods do not support decorators. See :py:attr:`BaseWorkflow.workflow_run_decorators` for
more info.
.. py:classattribute:: workflow_proxy_cls
type: :py:class:`BaseWorkflowProxy`
Reference to the :py:class:`LocalWorkflowProxy` class.
.. py:classattribute:: local_workflow_require_branches
type: bool
When *True*, the workflow will require its branch tasks within
:py:meth:`LocalWorkflowProxy.requires` so that the execution of the workflow indirectly
starts all branch tasks. When *False*, the workflow uses dynamic dependencies by yielding
its branch tasks within its own run method.
"""
workflow_proxy_cls = LocalWorkflowProxy
local_workflow_require_branches = False
exclude_index = True
def local_workflow_requires(self):
return DotDict()
def local_workflow_pre_run(self):
return