Source code for law.cli.index

# coding: utf-8

"""
"law index" cli subprogram.
"""

from __future__ import annotations

import os
import sys
import traceback
import importlib
import argparse
import collections

import luigi  # type: ignore[import-untyped]

from law.config import Config
from law.task.base import Task, ExternalTask
from law.util import multi_match, colored, abort, makedirs, brace_expand
from law.logger import get_logger
from law._types import Sequence


logger = get_logger(__name__)

_cfg = Config.instance()


[docs] def setup_parser(sub_parsers: argparse._SubParsersAction) -> None: """ Sets up the command line parser for the *index* subprogram and adds it to *sub_parsers*. """ index_file = _cfg.get_expanded("core", "index_file") parser = sub_parsers.add_parser( "index", prog="law index", description=f"Create or update the (human-readable) law task index file ({index_file}). " "This is only required for the shell auto-completion.", ) parser.add_argument( "--modules", "-m", nargs="+", help="additional modules to traverse", ) parser.add_argument( "--no-externals", "-e", action="store_true", help="skip external tasks", ) parser.add_argument( "--remove", "-r", action="store_true", help="remove the index file and exit", ) parser.add_argument( "--show", "-s", action="store_true", help="print the content of the index file and exit", ) parser.add_argument( "--location", "-l", action="store_true", help="print the location of the index file and exit", ) parser.add_argument( "--quiet", "-q", action="store_true", help="quiet mode without output", ) parser.add_argument( "--verbose", "-v", action="store_true", help="verbose output, disables the quiet mode when set", )
[docs] def execute(args: argparse.Namespace) -> int: """ Executes the *index* subprogram with parsed commandline *args*. """ # update args if args.verbose: args.quiet = False cfg = Config.instance() index_file = cfg.get_expanded("core", "index_file") # just print the file location? if args.location: print(index_file) return 0 # just show the file content? if args.show: if os.path.exists(index_file): with open(index_file, "r") as f: print(f.read()) return 0 return abort(f"index file {index_file} does not exist") # just remove the index file? if args.remove: if os.path.exists(index_file): os.remove(index_file) print(f"removed index file {index_file}") return 0 # get modules to lookup lookup = [m.strip() for m in cfg.options("modules")] if args.modules: lookup += args.modules # expand braces lookup = sum(map(brace_expand, lookup), []) if not args.quiet: print(f"indexing tasks in {len(lookup)} module(s)") exit_code = 0 # loop through modules, import everything to load tasks for modid in lookup: if not modid: continue if args.verbose: sys.stdout.write(f"loading module '{modid}'") try: importlib.import_module(modid) except Exception as e: exit_code += 1 if not args.verbose: print(f"error in module '{colored(modid, 'red')}': {e}") else: print(f"\n\nerror in module '{colored(modid, 'red')}':") traceback.print_exc() continue if args.verbose: print(f", {colored('done', style='bright')}") # determine tasks to write into the index file seen_families = [] task_classes = [] q = collections.deque([Task]) while q: cls = q.popleft() lookup.extend(cls.__subclasses__()) # type: ignore[arg-type] # skip tasks starting with an underscore if cls.__name__.startswith("_"): continue # skip tasks in __main__ module in interactive sessions if cls.__module__ == "__main__": continue # skip when explicitly excluded if cls.exclude_index: continue # skip external tasks is_external_task = issubclass(cls, ExternalTask) if args.no_externals and is_external_task: continue # skip non-external tasks without run implementation run_is_callable = callable(getattr(cls, "run", None)) run_is_abstract = getattr(cls.run, "__isabstractmethod__", False) if not is_external_task and (not run_is_callable or run_is_abstract): continue # show an error when there is a "-" in the task family as the luigi command line parser will # automatically map it to "_", i.e., it will fail to lookup the actual task class # skip the task task_family = cls.get_task_family() if "-" in task_family: logger.critical( f"skipping task '{cls}' as its family '{task_family}' contains a '-' which cannot " "be interpreted by luigi's command line parser, please use '_' or alike", ) continue # show an error when there is a "_" after a "." in the task family, i.e., when there is a # "_" in the class name (which is bad python practice anyway), as the shell autocompletion # is not able to decide whether it should complete the task family or a task-level parameter # skip the task if "_" in task_family.rsplit(".", 1)[-1]: logger.error( f"skipping task '{cls}' as its family '{task_family}' contains a '_' after the " "namespace definition which would lead to ambiguities between task families and " "task-level parameters in the law shell autocompletion", ) continue # skip already seen task families and warn when the class is not added yet in the classes to # index as this is usually a sign of multiple definitions of the same task, e.g. through # imports of the same physical file via different module ids if task_family in seen_families: if cls not in task_classes: logger.error( f"skipping task '{cls}' as a task with the same family '{task_family}' but a " "different different address was already seen; this is likely due to multiple " "imports of the same physical file through different module ids and since it " "is no longer unique, luigi's task lookup will probably fail", ) continue seen_families.append(task_family) task_classes.append(cls) def get_task_params(cls) -> list[str]: params = [] for attr in dir(cls): member = getattr(cls, attr) if isinstance(member, luigi.Parameter): exclude: set[str] = getattr(cls, "exclude_params_index", set()) if not multi_match(attr, exclude, any): params.append(attr.replace("_", "-")) return params def index_line(cls, params): return f"{cls.__module__}:{cls.get_task_family()}:{' '.join(params)}" stats: dict[str, list[tuple[str, list[str]]]] = dict() # write the index file makedirs(os.path.dirname(index_file)) with open(index_file, "w") as f: for cls in task_classes: # get prams params = get_task_params(cls) # fill stats if cls.__module__ not in stats: stats[cls.__module__] = [] stats[cls.__module__].append((cls.get_task_family(), params)) f.write(index_line(cls, params) + "\n") # print stats if args.verbose: for mod, data in stats.items(): print(f"\nmodule '{colored(mod, style='bright')}', {len(data)} task(s):") for task_family, _ in data: print(f" - {colored(task_family, 'green')}") print("") if not args.quiet: print(f"written {len(task_classes)} task(s) to index file '{index_file}'") return exit_code
[docs] def get_global_parameters( config_names: Sequence[str] = ("core", "scheduler", "worker", "retcode"), ) -> list[tuple[type, luigi.Parameter, str, str]]: """ Returns a list of global, luigi-internal configuration parameters. Each list item is a 4-tuple containing the configuration class, the parameter instance, the parameter name, and the full parameter name in the cli. When *config_names* is set, it should be a list of configuration class names that are exclusively taken into account. """ params = [] for cls in luigi.task.Config.__subclasses__(): if config_names and cls.__name__ not in config_names: continue for attr in dir(cls): param = getattr(cls, attr) if not isinstance(param, luigi.Parameter): continue full_name = attr.replace("_", "-") if getattr(cls, "use_cmdline_section", True): full_name = f"{cls.__name__.replace('_', '-')}-{full_name}" params.append((cls, param, attr, full_name)) return params # type: ignore[return-value]