# Source code for law.workflow.local

# coding: utf-8

"""
Local workflow implementation.
"""

# only the workflow class is exported; its proxy is reachable via LocalWorkflow.workflow_proxy_cls
__all__ = [
    "LocalWorkflow",
]

from collections.abc import Generator

import luigi

from law import luigi_version_info
from law.workflow.base import BaseWorkflow, BaseWorkflowProxy
from law.target.collection import SiblingFileCollectionBase
from law.logger import get_logger
from law.util import mp_manager, DotDict


# module-level logger, used by LocalWorkflowProxy.run to emit the cache_task_completion hint
# through ``warning_once``
logger = get_logger(__name__)


class LocalWorkflowProxy(BaseWorkflowProxy):
    """
    Workflow proxy class for the local workflow implementation. The workflow type is ``"local"``.
    """

    workflow_type = "local"

    @property
    def _local_workflow_has_yielded(self):
        """
        Whether the branch tasks of the workflow identified by ``live_task_id`` were already yielded
        as dynamic dependencies. The flag is stored in the ``"local_workflow_tasks_yielded"`` dict
        obtained from :py:obj:`law.util.mp_manager` (presumably shared across worker processes).
        """
        tasks_yielded = mp_manager.get("local_workflow_tasks_yielded", "dict")
        return self.live_task_id in tasks_yielded

    @_local_workflow_has_yielded.setter
    def _local_workflow_has_yielded(self, value):
        tasks_yielded = mp_manager.get("local_workflow_tasks_yielded", "dict")
        if value:
            tasks_yielded[self.live_task_id] = True
        else:
            tasks_yielded.pop(self.live_task_id, None)

    def requires(self):
        """
        Returns the requirements of the workflow. Starting from the requirements of the base proxy,
        a non-empty result of :py:meth:`LocalWorkflow.local_workflow_requires` is merged in and,
        when *local_workflow_require_branches* is *True*, all branch tasks are added under the key
        ``"branches"``.
        """
        reqs = super(LocalWorkflowProxy, self).requires()

        local_reqs = self.task.local_workflow_requires()
        if local_reqs:
            reqs.update(local_reqs)

        # when local_workflow_require_branches is True, add all branch tasks as dependencies
        if self.task.local_workflow_require_branches:
            reqs["branches"] = self.task.get_branch_tasks()

        return reqs

    def run(self):
        """
        When *local_workflow_require_branches* of the task was set to *False*, starts all branch
        tasks via dynamic dependencies by yielding them, or simply does nothing otherwise.

        Generators returned by :py:meth:`LocalWorkflow.local_workflow_pre_run` and by the run method
        of the base proxy are delegated to with ``yield from``. Each of their yields is therefore
        handled by luigi as a separate dynamic dependency stage, and the outputs luigi sends back
        are forwarded into them.
        """
        # delegate instead of yielding the generator object itself: luigi would otherwise flatten
        # (i.e., exhaust) it in a single pass, sending None into each of its yields and running the
        # code after them before the yielded dependencies exist
        pre_run_gen = self.task.local_workflow_pre_run()
        if isinstance(pre_run_gen, Generator):
            yield from pre_run_gen

        super_run_gen = super(LocalWorkflowProxy, self).run()
        if isinstance(super_run_gen, Generator):
            yield from super_run_gen

        # nothing to do when branches are static requirements or were already yielded before
        if self.task.local_workflow_require_branches or self._local_workflow_has_yielded:
            return

        self._local_workflow_has_yielded = True

        # use branch tasks as requirements
        branch_tasks = self.task.get_branch_tasks()
        reqs = list(branch_tasks.values())

        # DynamicRequirements is not used for luigi versions below 3.1.2, so fall back to the
        # old, possibly slow behavior of yielding the plain list
        if luigi_version_info[:3] < (3, 1, 2):
            yield reqs
            return

        def get_col():
            # the output collection of the workflow, if any
            return self.get_cached_output().get("collection")

        # in case the workflow creates a sibling file collection, per-branch completeness checks
        # are possible in advance and can be stored in luigi's completion cache
        def custom_complete(complete_fn):
            # get the cache (stored as a specified keyword of a partial'ed function)
            cache = getattr(complete_fn, "keywords", {}).get("completion_cache")
            if cache is None:
                if complete_fn(self):
                    return True

                # show a warning for large workflows that use sibling file collections and
                # that could profit from the cache_task_completion feature
                if len(reqs) >= 100 and isinstance(get_col(), SiblingFileCollectionBase):
                    url = "https://luigi.readthedocs.io/en/stable/configuration.html#worker"
                    logger.warning_once(
                        "cache_task_completion_hint",
                        "detected SiblingFileCollection for LocalWorkflow with {} branches "
                        "whose completeness checks will be performed manually by luigi; "
                        "consider enabling luigi's cache_task_completion feature to speed "
                        "up these checks; for more info, see {}".format(len(reqs), url),
                    )
                return False

            # the output collection must be a sibling file collection
            col = get_col()
            if not isinstance(col, SiblingFileCollectionBase):
                return complete_fn(self)

            # get existing branches and populate the cache with completeness states
            existing_branches = set(col.count(keys=True)[1])
            for b, task in branch_tasks.items():
                cache[task.task_id] = b in existing_branches

            # finally, evaluate the normal completeness check on the workflow
            return complete_fn(self)

        yield luigi.DynamicRequirements(reqs, custom_complete)
class LocalWorkflow(BaseWorkflow):
    """
    Local workflow implementation. The workflow type is ``"local"``.

    A local workflow can start its branch tasks in one of two ways, selected through the
    :py:attr:`local_workflow_require_branches` attribute described below.

    As branch tasks are triggered via requirements or dynamic dependencies, the run methods of
    local workflows do not support decorators. See :py:attr:`BaseWorkflow.workflow_run_decorators`
    for more info.

    .. py:classattribute:: workflow_proxy_cls

        type: :py:class:`BaseWorkflowProxy`

        Reference to the :py:class:`LocalWorkflowProxy` class.

    .. py:classattribute:: local_workflow_require_branches

        type: bool

        When *True*, the workflow will require its branch tasks within
        :py:meth:`LocalWorkflowProxy.requires` so that the execution of the workflow indirectly
        starts all branch tasks. When *False*, the workflow uses dynamic dependencies by yielding
        its branch tasks within its own run method.
    """

    # proxy class implementing the local workflow behavior
    workflow_proxy_cls = LocalWorkflowProxy

    # by default, branch tasks are started as dynamic dependencies rather than static requirements
    local_workflow_require_branches = False

    # NOTE(review): presumably excludes this class from law's task index; confirm in law.task.base
    exclude_index = True

    def local_workflow_requires(self):
        """
        Hook for additional workflow requirements. A non-empty returned mapping is merged into the
        requirements built by :py:meth:`LocalWorkflowProxy.requires`. Returns an empty
        :py:class:`law.util.DotDict` by default.
        """
        extra_reqs = DotDict()
        return extra_reqs

    def local_workflow_pre_run(self):
        """
        Hook invoked at the beginning of :py:meth:`LocalWorkflowProxy.run`. When it returns a
        generator, its yielded values are passed on to luigi as dynamic dependencies before branch
        tasks are started. Does nothing and returns *None* by default.
        """
        return None