Source code for law.cli.run
# coding: utf-8
"""
"law run" cli subprogram.
"""
from __future__ import annotations
import os
import sys
import pathlib
import argparse
from law.config import Config
from law.task.base import Task
from law.util import abort
from law.logger import get_logger
logger = get_logger(__name__)
[docs]
def setup_parser(sub_parsers: argparse._SubParsersAction) -> None:
    """
    Registers the *run* subprogram on *sub_parsers*. The created parser expects a positional
    *task_family* followed by any number of positional *parameter* values.
    """
    description = (
        "Run a task with configurable parameters. See "
        "http://luigi.rtfd.io/en/stable/running_luigi.html for more info."
    )
    task_family_help = (
        "a task family registered in the task index file or a module and task class in the "
        "format <module>.<class>"
    )

    # the sub parser itself, shown as "law run" in usage messages
    run_parser = sub_parsers.add_parser("run", prog="law run", description=description)

    # positional arguments: the task to run and an arbitrary number of task parameters
    run_parser.add_argument("task_family", help=task_family_help)
    run_parser.add_argument("parameter", nargs="*", help="task parameters")
[docs]
def execute(args: argparse.Namespace, argv: list[str]) -> int:
    """
    Executes the *run* subprogram with parsed commandline *args* and the command line *argv*.

    The task family to run is resolved in two steps. First, ``args.task_family`` is interpreted as
    ``<module>.<class>``: the module is imported and the attribute named ``<class>``, if present,
    must be a subclass of :py:class:`Task`, otherwise the program is aborted. If this step does not
    yield a task family and the index file configured as ``core.index_file`` exists, the task is
    looked up there and the module listed in the index is imported. The program is aborted when the
    index has no matching entry.

    An :py:class:`ImportError` caused by the inferred module (or one of its parent packages) being
    unknown is kept and only re-raised when no task family was resolved at the end. All other
    import errors are re-raised immediately.

    Finally, the resolved task family and all items of *argv* after the third one are passed to
    ``luigi_run``. The return value is 0 when luigi reports success, and 1 otherwise.
    """
    task_family = None
    error = None

    # try to infer the task module from the passed task family and import it
    parts = args.task_family.rsplit(".", 1)
    if len(parts) == 2:
        modid, cls_name = parts
        try:
            mod = __import__(modid, globals(), locals(), [cls_name])
            task_cls = getattr(mod, cls_name, None)
            if task_cls is not None:
                # issubclass() raises a TypeError when its first argument is not a class (e.g. a
                # function or a submodule such as "os.path"), so check for a class first and
                # report such objects through the same message
                if not (isinstance(task_cls, type) and issubclass(task_cls, Task)):
                    return abort(f"object '{args.task_family}' is not a Task")
                task_family = task_cls.get_task_family()
        except ImportError as e:
            # distinguish import errors resulting from an unknown modid from all other cases by
            # comparing the message to the one python emits for each leading part of the modid
            modid_parts = modid.lower().split(".")
            modid_unknown = any(
                str(e).lower() == f"no module named '{'.'.join(modid_parts[:i + 1])}'"
                for i in range(len(modid_parts))
            )
            if not modid_unknown:
                raise

            # keep the error in case the task family cannot be inferred from the index file
            error = e

    # read task info from the index file and import it
    if task_family is None:
        cfg = Config.instance()
        index_file = cfg.get_expanded("core", "index_file")
        if os.path.exists(index_file):
            info = read_task_from_index(args.task_family, index_file)
            if info is None:
                return abort(f"task family '{args.task_family}' not found in index")
            # the parameters stored in the index entry are not needed here
            modid, task_family, _ = info
            __import__(modid, globals(), locals())

    # complain when no task could be found
    if task_family is None:
        if error:
            raise error
        return abort(f"task '{args.task_family}' not found")

    # run luigi
    from luigi.cmdline import luigi_run  # type: ignore[import-untyped]

    # NOTE(review): presumably makes luigi's own usage output refer to "<prog> run" - confirm
    sys.argv[0] += " run"

    # the first three items of argv are skipped (presumably the executable, "run" and the task
    # family - verify against the caller), everything after is forwarded to luigi
    success = luigi_run([task_family] + argv[3:])

    return 0 if success else 1
[docs]
def read_task_from_index(
task_family: str,
index_file: str | pathlib.Path | None = None,
) -> tuple[str, str, str] | None:
"""
Returns module id, task family and space-separated parameters in a tuple for a task given by
*task_family* from the *index_file*. When *None*, the *index_file* refers to the default as
defined in :py:mod:`law.config`. Returns *None* when the task could not be found.
"""
# read task information from the index file given a task family
if index_file is None:
cfg = Config.instance()
index_file = cfg.get_expanded("core", "index_file")
# open and go through lines
with open(index_file, "r") as f:
for line in f.readlines():
line = line.strip()
if line.count(":") >= 2:
modid, family, params = line.split(":", 2)
if family == task_family:
return modid, family, params
return None