Source code for law.cli.run

# coding: utf-8

"""
"law run" cli subprogram.
"""

from __future__ import annotations

import os
import sys
import pathlib
import argparse

from law.config import Config
from law.task.base import Task
from law.util import abort
from law.logger import get_logger


logger = get_logger(__name__)


def setup_parser(sub_parsers: argparse._SubParsersAction) -> None:
    """
    Registers the *run* subprogram on *sub_parsers*. The created parser accepts a mandatory
    ``task_family`` positional followed by any number of ``parameter`` positionals.
    """
    description = (
        "Run a task with configurable parameters. See "
        "http://luigi.rtfd.io/en/stable/running_luigi.html for more info."
    )
    run_parser = sub_parsers.add_parser("run", prog="law run", description=description)

    # the task to run, either as a family known to the index file or as <module>.<class>
    family_help = (
        "a task family registered in the task index file or a module and task class in the "
        "format <module>.<class>"
    )
    run_parser.add_argument("task_family", help=family_help)

    # all remaining positional arguments are collected as task parameters
    run_parser.add_argument("parameter", nargs="*", help="task parameters")
def execute(args: argparse.Namespace, argv: list[str]) -> int:
    """
    Executes the *run* subprogram with parsed commandline *args*. *argv* is the list of raw
    command line arguments; all items after the third one are forwarded to luigi, prefixed by the
    resolved task family.

    The task family is resolved in two steps. First, ``args.task_family`` is interpreted as
    ``<module>.<class>`` and the module is imported. When this does not yield a task family, the
    index file configured as ``core.index_file`` is consulted via :py:func:`read_task_from_index`
    and the module listed there is imported.

    Returns 0 when luigi reports success and 1 otherwise. In case the task cannot be resolved, the
    result of :py:func:`law.util.abort` is returned, or the initial :py:class:`ImportError` is
    re-raised when one was recorded during the first step.
    """
    task_family = None
    error = None

    # try to infer the task module from the passed task family and import it
    parts = args.task_family.rsplit(".", 1)
    if len(parts) == 2:
        modid, cls_name = parts
        try:
            mod = __import__(modid, globals(), locals(), [cls_name])
            task_cls = getattr(mod, cls_name, None)
            if task_cls is not None:
                # issubclass() raises a TypeError for non-class objects (modules, functions, ...),
                # so make sure that a class was found before checking its bases
                if not isinstance(task_cls, type) or not issubclass(task_cls, Task):
                    return abort(f"object '{args.task_family}' is not a Task")
                task_family = task_cls.get_task_family()
        except ImportError as e:
            # distinguish import errors resulting from an unknown modid from all other cases by
            # comparing the message to the one python emits for each leading part of the modid
            message = str(e).lower()
            modid_parts = modid.lower().split(".")
            modid_unknown = any(
                message == f"no module named '{'.'.join(modid_parts[:i + 1])}'"
                for i in range(len(modid_parts))
            )
            if not modid_unknown:
                # the module exists but failed to import for a different reason
                raise

            # keep the error in case the task family cannot be inferred from the index file
            error = e

    # read task info from the index file and import it
    if task_family is None:
        cfg = Config.instance()
        index_file = cfg.get_expanded("core", "index_file")
        if os.path.exists(index_file):
            info = read_task_from_index(args.task_family, index_file)
            if info is None:
                return abort(f"task family '{args.task_family}' not found in index")
            modid, task_family, _ = info
            __import__(modid, globals(), locals())

    # complain when no task could be found
    if task_family is None:
        if error is not None:
            raise error
        return abort(f"task '{args.task_family}' not found")

    # run luigi, imported lazily so that it is only loaded when a task is actually started
    from luigi.cmdline import luigi_run  # type: ignore[import-untyped]

    # NOTE(review): presumably extends the program name shown in luigi's own output - confirm
    sys.argv[0] += " run"
    success = luigi_run([task_family] + argv[3:])

    return 0 if success else 1
def read_task_from_index(
    task_family: str,
    index_file: str | pathlib.Path | None = None,
) -> tuple[str, str, str] | None:
    """
    Returns module id, task family and space-separated parameters in a tuple for a task given by
    *task_family* from the *index_file*. When *None*, the *index_file* refers to the default as
    defined in :py:mod:`law.config`. Returns *None* when the task could not be found.

    Each line of the index file is expected to have the format ``<modid>:<family>:<params>``.
    Lines with fewer than two colons are skipped, and colons after the second one are kept as part
    of the parameters. The first line whose family equals *task_family* is returned.
    """
    # read task information from the index file given a task family
    if index_file is None:
        cfg = Config.instance()
        index_file = cfg.get_expanded("core", "index_file")

    # iterate lazily over the lines so that the file does not need to be loaded at once
    with open(index_file, "r") as f:
        for line in f:
            # a valid entry splits into exactly three fields
            fields = line.strip().split(":", 2)
            if len(fields) != 3:
                continue
            modid, family, params = fields
            if family == task_family:
                return modid, family, params

    return None