# coding: utf-8
"""
"law index" cli subprogram.
"""
import os
import sys
import traceback
from importlib import import_module
from collections import OrderedDict
import luigi
import six
from law.config import Config
from law.task.base import Task, ExternalTask
from law.util import multi_match, colored, abort, makedirs, brace_expand
from law.logger import get_logger
# module-level logger, used below to report tasks that are skipped during indexing
logger = get_logger(__name__)
# law config instance, queried for the index file location shown in the parser description
_cfg = Config.instance()
def setup_parser(sub_parsers):
    """
    Registers the *index* subprogram on *sub_parsers* and declares all of its command line options.
    The description of the subprogram contains the expanded location of the index file as
    configured in the "core" config section.
    """
    index_file = _cfg.get_expanded("core", "index_file")
    description = (
        "Create or update the (human-readable) law task index file ({}). This is only "
        "required for the shell auto-completion.".format(index_file)
    )
    parser = sub_parsers.add_parser("index", prog="law index", description=description)

    # the only option that accepts values
    parser.add_argument("--modules", "-m", nargs="+", help="additional modules to traverse")

    # all remaining options are boolean switches, registered in the order in which they are
    # supposed to show up in the help text
    switches = (
        ("--no-externals", "-e", "skip external tasks"),
        ("--remove", "-r", "remove the index file and exit"),
        ("--show", "-s", "print the content of the index file and exit"),
        ("--location", "-l", "print the location of the index file and exit"),
        ("--quiet", "-q", "quiet mode without output"),
        ("--verbose", "-v", "verbose output, disables the quiet mode when set"),
    )
    for long_flag, short_flag, help_text in switches:
        parser.add_argument(long_flag, short_flag, action="store_true", help=help_text)
def execute(args):
    """
    Executes the *index* subprogram with parsed commandline *args*.

    When *args.location*, *args.show* or *args.remove* is set, only the location of the index file
    is printed, its content is printed, or the file is removed, respectively, and the function
    returns *None* (for *args.show*, *abort* is invoked when the file does not exist). Otherwise,
    the modules named by the options of the "modules" config section and by *args.modules* are
    imported, all subclasses of *Task* known afterwards are filtered, and one line per remaining
    task is written to the index file in the format ``module_id:task_family:param param ...``.
    In that case, the number of modules that failed to import is returned, so *0* means that
    every module could be loaded.
    """
    # update args
    if args.verbose:
        args.quiet = False

    cfg = Config.instance()
    index_file = cfg.get_expanded("core", "index_file")

    # just print the file location?
    if args.location:
        print(index_file)
        return

    # just show the file content?
    if args.show:
        if os.path.exists(index_file):
            with open(index_file, "r") as f:
                print(f.read())
            return
        else:
            abort("index file {} does not exist".format(index_file))

    # just remove the index file?
    if args.remove:
        if os.path.exists(index_file):
            os.remove(index_file)
            print("removed index file {}".format(index_file))
        return

    # get modules to lookup
    lookup = [m.strip() for m in cfg.options("modules")]
    if args.modules:
        lookup += args.modules

    # expand braces
    lookup = sum(map(brace_expand, lookup), [])

    if not args.quiet:
        print("indexing tasks in {} module(s)".format(len(lookup)))

    # counts the modules that could not be imported, used as the return value
    exit_code = 0

    # loop through modules, import everything to load tasks
    for modid in lookup:
        if not modid:
            continue

        if args.verbose:
            # the line is completed below once the import succeeded or failed, so flush it to
            # keep it in front of anything that is printed during the import itself
            sys.stdout.write("loading module '{}'".format(modid))
            sys.stdout.flush()

        try:
            import_module(modid)
        except Exception as e:
            # a broken module must not prevent indexing of the remaining ones, so any error is
            # reported and counted, but not propagated
            exit_code += 1
            if not args.verbose:
                print("error in module '{}': {}".format(colored(modid, "red"), str(e)))
            else:
                print("\n\nerror in module '{}':".format(colored(modid, "red")))
                traceback.print_exc()
            continue

        if args.verbose:
            print(", {}".format(colored("done", style="bright")))

    # determine tasks to write into the index file, walking the class tree breadth-first
    seen_families = set()
    task_classes = []
    lookup = [Task]
    while lookup:
        cls = lookup.pop(0)
        lookup.extend(cls.__subclasses__())

        # skip tasks in __main__ module in interactive sessions
        if cls.__module__ == "__main__":
            continue

        # skip when explicitly excluded
        if cls.exclude_index:
            continue

        # skip external tasks
        is_external_task = issubclass(cls, ExternalTask)
        if args.no_externals and is_external_task:
            continue

        # skip non-external tasks without run implementation, looking up the run attribute only
        # once and with a default so that a missing attribute cannot raise
        run = getattr(cls, "run", None)
        run_is_callable = callable(run)
        run_is_abstract = getattr(run, "__isabstractmethod__", False)
        if not is_external_task and (not run_is_callable or run_is_abstract):
            continue

        # show an error when there is a "-" in the task family as the luigi command line parser
        # will automatically map it to "_", i.e., it will fail to lookup the actual task class
        # skip the task
        task_family = cls.get_task_family()
        if "-" in task_family:
            logger.critical(
                "skipping task '{}' as its family '{}' contains a '-' which cannot be interpreted "
                "by luigi's command line parser, please use '_' or alike".format(cls, task_family),
            )
            continue

        # show an error when there is a "_" after a "." in the task family, i.e., when there is a
        # "_" in the class name (which is bad python practice anyway), as the shell autocompletion
        # is not able to decide whether it should complete the task family or a task-level
        # parameter
        # skip the task
        if "_" in task_family.rsplit(".", 1)[-1]:
            logger.error(
                "skipping task '{}' as its family '{}' contains a '_' after the namespace "
                "definition which would lead to ambiguities between task families and task-level "
                "parameters in the law shell autocompletion".format(cls, task_family),
            )
            continue

        # skip already seen task families and warn when the class is not added yet in the classes
        # to index as this is usually a sign of multiple definitions of the same task, e.g.
        # through imports of the same physical file via different module ids
        if task_family in seen_families:
            if cls not in task_classes:
                logger.error(
                    "skipping task '{}' as a task with the same family '{}' but a different "
                    "address was already seen; this is likely due to multiple imports of "
                    "the same physical file through different module ids and since it is no "
                    "longer unique, luigi's task lookup will probably fail".format(
                        cls, task_family),
                )
            continue
        seen_families.add(task_family)

        task_classes.append(cls)

    def get_task_params(cls):
        # names of all luigi parameters of cls in their cli form ("_" replaced by "-"), except
        # for those matched by the patterns in the optional exclude_params_index attribute
        exclude = getattr(cls, "exclude_params_index", set())
        params = []
        for attr in dir(cls):
            member = getattr(cls, attr)
            if isinstance(member, luigi.Parameter) and not multi_match(attr, exclude, any):
                params.append(attr.replace("_", "-"))
        return params

    def index_line(cls, params):
        # format: "module_id:task_family:param param ..."
        return "{}:{}:{}".format(cls.__module__, cls.get_task_family(), " ".join(params))

    # maps module ids to lists of (task_family, params) tuples, in the order of indexing
    stats = OrderedDict()

    # write the index file, creating its directory first unless the path has no directory part
    index_dir = os.path.dirname(index_file)
    if index_dir:
        makedirs(index_dir)

    with open(index_file, "w") as f:
        for cls in task_classes:
            # get params
            params = get_task_params(cls)

            # fill stats
            stats.setdefault(cls.__module__, []).append((cls.get_task_family(), params))

            f.write(index_line(cls, params) + "\n")

    # print stats
    if args.verbose:
        for mod, data in six.iteritems(stats):
            print("\nmodule '{}', {} task(s):".format(colored(mod, style="bright"), len(data)))
            for task_family, _ in data:
                print(" - {}".format(colored(task_family, "green")))
        print("")

    if not args.quiet:
        print("written {} task(s) to index file '{}'".format(len(task_classes), index_file))

    return exit_code
def get_global_parameters(config_names=("core", "scheduler", "worker", "retcode")):
    """
    Collects the parameters defined on the direct subclasses of luigi's *Config* class and returns
    them as a list of 4-tuples. Each tuple consists of the configuration class, the parameter
    instance, the name of the attribute holding the parameter, and the parameter name as used on
    the command line. The latter is the attribute name with underscores replaced by dashes,
    prefixed by the likewise converted class name unless the class sets *use_cmdline_section* to
    a false value. A non-empty *config_names* sequence restricts the result to configuration
    classes whose names it contains.
    """
    collected = []

    for config_cls in luigi.task.Config.__subclasses__():
        cls_name = config_cls.__name__

        # an empty config_names sequence disables the filter entirely
        if config_names and cls_name not in config_names:
            continue

        for attr_name in dir(config_cls):
            candidate = getattr(config_cls, attr_name)
            if isinstance(candidate, luigi.Parameter):
                cli_name = attr_name.replace("_", "-")
                # prepend the class name when parameters are grouped by section on the cli
                if getattr(config_cls, "use_cmdline_section", True):
                    cli_name = "-".join((cls_name.replace("_", "-"), cli_name))
                collected.append((config_cls, candidate, attr_name, cli_name))

    return collected