# coding: utf-8
"""
CMS CRAB remote workflow implementation. See
https://twiki.cern.ch/twiki/bin/view/CMSPublic/SWGuideCrab.
"""
from __future__ import annotations
# names exported by this module on a wildcard import
__all__ = ["CrabWorkflow"]
import uuid
import abc
import contextlib
import pathlib
from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, JobData, PollData
from law.job.base import JobArguments, JobInputFile
from law.target.file import get_path, get_scheme, remove_scheme, FileSystemDirectoryTarget
from law.target.local import LocalDirectoryTarget, LocalFileTarget
from law.task.proxy import ProxyCommand
from law.util import no_value, law_src_path, merge_dicts, human_duration, DotDict, InsertableDict
from law.logger import get_logger
from law._types import Any, Type, Generator
from law.contrib.wlcg import get_vomsproxy_file, check_vomsproxy_validity, get_myproxy_info
from law.contrib.cms.job import CrabJobManager, CrabJobFileFactory
from law.contrib.cms.util import renew_vomsproxy, delegate_myproxy
# module-level logger, named after this module
logger = get_logger(__name__)
class CrabWorkflowProxy(BaseRemoteWorkflowProxy):
    """
    Workflow proxy used by :py:class:`CrabWorkflow`. The creation of the job manager and the job
    file factory, as well as most job settings, are forwarded to ``crab_*`` hooks of the attached
    task.
    """

    workflow_type: str = "crab"

    # job script error codes are not transferred, so disable them
    job_error_messages = {}

    def create_job_manager(self, **kwargs) -> CrabJobManager:
        """
        Returns the job manager created by the task's ``crab_create_job_manager`` hook. All *kwargs*
        are forwarded.
        """
        return self.task.crab_create_job_manager(**kwargs)  # type: ignore[attr-defined]

    def setup_job_manager(self) -> dict[str, Any]:
        """
        Makes sure that a valid VOMS proxy exists and that it is delegated to the myproxy server
        with at least five days of remaining lifetime. Returns a dictionary with the fields
        ``"proxy_file"`` and ``"myproxy_username"``.
        """
        cfg = Config.instance()
        password_file = cfg.get_expanded("job", "crab_password_file")

        # the proxy file is required by all following steps
        proxy_file = get_vomsproxy_file()

        # renew the VOMS proxy in case it is not valid
        if not check_vomsproxy_validity():
            renew_vomsproxy(proxy_file=proxy_file, password_file=password_file)

        # query the current state of the delegation on the myproxy server
        info = get_myproxy_info(proxy_file=proxy_file, encode_username=True, silent=True)

        # a delegation is needed when there is no info at all, when expected fields are missing,
        # or when the remaining lifetime is too short
        needs_delegation = not info
        if not needs_delegation:
            if "username" not in info:  # type: ignore[operator]
                logger.warning("field 'username' not in myproxy info")
                needs_delegation = True
            elif "timeleft" not in info:  # type: ignore[operator]
                logger.warning("field 'timeleft' not in myproxy info")
                needs_delegation = True
            elif info["timeleft"] < 5 * 86400:  # type: ignore[index, operator]
                timeleft = human_duration(seconds=info["timeleft"])  # type: ignore[index]
                logger.warning(f"myproxy lifetime below 5 days ({timeleft})")
                needs_delegation = True

        # either delegate now, or reuse the username of the existing delegation
        if needs_delegation:
            myproxy_username = delegate_myproxy(
                proxy_file=proxy_file,
                password_file=password_file,
                encode_username=True,
            )
        else:
            myproxy_username = info["username"]  # type: ignore[index, assignment]

        return {"proxy_file": proxy_file, "myproxy_username": myproxy_username}

    def create_job_file_factory(self, **kwargs) -> CrabJobFileFactory:
        """
        Returns the job file factory created by the task's ``crab_create_job_file_factory`` hook.
        All *kwargs* are forwarded.
        """
        return self.task.crab_create_job_file_factory(**kwargs)  # type: ignore[attr-defined]

    def create_job_file_group(
        self,
        submit_jobs: dict[int, list[int]],
    ) -> dict[str, str | pathlib.Path | CrabJobFileFactory.Config | None]:
        """
        Builds a single job file for all jobs in *submit_jobs*, a mapping of job numbers to the
        lists of branches they process. The factory config is filled from the various ``crab_*``
        hooks of the task and handed to the job file factory. Returns a dictionary with the fields
        ``"job"`` (the created job file), ``"config"`` (the config returned by the factory) and
        ``"log"`` (always *None* here).

        Raises a *ValueError* when ``crab_stageout_location()`` does not return a list or tuple
        with exactly two items.
        """
        task: CrabWorkflow = self.task  # type: ignore[assignment]

        # start from a fresh config with empty containers
        c = self.job_file_factory.get_config()  # type: ignore[union-attr]
        c.input_files = {}
        c.output_files = []
        c.render_variables = {}
        c.custom_content = []

        # the job file is the executable and is rendered remotely, so do not copy it
        job_file_path = str(JobInputFile(task.crab_job_file()).path)
        c.executable = JobInputFile(job_file_path, copy=False, render_job=True)  # type: ignore[has-type] # noqa
        c.input_files["job_file"] = c.executable

        # parameters that must not show up in the command executed on job nodes
        exclude_args = (
            task.exclude_params_branch
            | task.exclude_params_workflow
            | task.exclude_params_remote_workflow
            | task.exclude_params_crab_workflow
            | {"workflow", "effective_workflow"}
        )

        # command of the branch task with optional adjustments
        proxy_cmd = ProxyCommand(
            task.as_branch(),
            exclude_task_args=list(exclude_args),
            exclude_global_args=["workers", f"{task.task_family}-*"],
        )
        if task.crab_use_local_scheduler():
            proxy_cmd.add_arg("--local-scheduler", "True", overwrite=True)
        extra_args = dict(task.crab_cmdline_args())
        for arg_name, arg_value in extra_args.items():
            proxy_cmd.add_arg(arg_name, arg_value, overwrite=True)

        def build_job_arguments(job_num: int, branches: list[int]):
            # joined job script arguments for a single job
            hook_data = None
            if self.dashboard:
                hook_data = self.dashboard.remote_hook_data(
                    job_num,
                    self.job_data.attempts.get(job_num, 0),
                )
            arguments = JobArguments(
                task_cls=task.__class__,
                task_params=proxy_cmd.build(skip_run=True),
                branches=branches,
                workers=task.job_workers,  # type: ignore[arg-type]
                auto_retry=False,
                dashboard_data=hook_data,
            )
            return arguments.join()

        # one entry of job script arguments per job number
        c.arguments = [
            build_job_arguments(job_num, branches)
            for job_num, branches in submit_jobs.items()
        ]

        # work area and request name
        c.work_area = get_path(task.crab_work_area())
        c.request_name = task.crab_request_name(submit_jobs).replace(".", "_")

        # storage site and output base, which must come as a pair
        stageout_location = task.crab_stageout_location()
        is_pair = isinstance(stageout_location, (list, tuple)) and len(stageout_location) == 2
        if not is_pair:
            raise ValueError(
                "the return value of crab_stageout_location() is expected to be a 2-tuple, got "
                f"'{stageout_location}'",
            )
        c.storage_site, c.output_lfn_base = stageout_location

        # optional input files: bootstrap file, stageout file and dashboard hook file,
        # each only added when its getter returns something truthy
        optional_files = (
            ("bootstrap_file", task.crab_bootstrap_file),
            ("stageout_file", task.crab_stageout_file),
            (
                "dashboard_file",
                lambda: self.dashboard.remote_hook_file() if self.dashboard else None,
            ),
        )
        for file_key, get_file in optional_files:
            input_file = get_file()
            if input_file:
                c.input_files[file_key] = input_file

        # log file
        if task.transfer_logs:
            c.custom_log_file = "stdall.txt"

        # let the task adjust the config
        job_nums = list(submit_jobs.keys())
        branch_lists = list(submit_jobs.values())
        c = task.crab_job_config(c, job_nums, branch_lists)  # type: ignore[call-arg, arg-type]

        # build the job file and get the sanitized config
        job_file, c = self.job_file_factory(**c.__dict__)  # type: ignore[misc]

        # the log file entry is None but will be synced from query data
        return {"job": job_file, "config": c, "log": None}

    def _status_error_pairs(self, job_num: int, job_data: JobData) -> InsertableDict:
        """
        Extends the pairs created by the base class with a ``"site history"`` entry that is placed
        before the ``"log"`` entry. Its value is taken from the ``"site_history"`` field of the
        extra job data, or *no_value* when missing.
        """
        pairs = super()._status_error_pairs(job_num, job_data)

        site_history = job_data["extra"].get("site_history", no_value)
        pairs.insert_before("log", "site history", site_history)

        return pairs

    def destination_info(self) -> InsertableDict:
        """
        Returns the destination info of the base class after passing it through the task's
        ``crab_destination_info`` hook.
        """
        base_info = super().destination_info()
        return self.task.crab_destination_info(base_info)  # type: ignore[attr-defined]
class CrabWorkflow(BaseRemoteWorkflow):
    """
    Remote workflow whose jobs are handled through CMS CRAB, using :py:class:`CrabWorkflowProxy` as
    its workflow proxy. Subclasses have to implement :py:meth:`crab_stageout_location` and
    :py:meth:`crab_output_directory`. All other ``crab_*`` hooks come with defaults.
    """

    workflow_proxy_cls = CrabWorkflowProxy

    crab_workflow_run_decorators: list | None = None
    crab_job_manager_defaults: dict | None = None
    crab_job_file_factory_defaults: dict | None = None

    crab_job_kwargs: list[str] = []
    crab_job_kwargs_submit: dict | None = None
    crab_job_kwargs_cancel: dict | None = None
    crab_job_kwargs_cleanup: dict | None = None
    crab_job_kwargs_query: dict | None = None

    exclude_params_branch = set()
    exclude_params_crab_workflow: set[str] = set()

    exclude_index = True

    @abc.abstractmethod
    def crab_stageout_location(self) -> tuple[str, str]:
        """
        Hook to define both the "Site.storageSite" and "Data.outLFNDirBase" settings in a 2-tuple,
        i.e., the name of the storage site to use and the base directory for crab's own output
        staging. An example would be ``("T2_DE_DESY", "/store/user/...")``.

        In case crab's own staging is not used, the choice of the output base has no effect, but it
        is still required for crab's job submission to work.
        """
        ...

    @abc.abstractmethod
    def crab_output_directory(self) -> FileSystemDirectoryTarget:
        """
        Hook to define the location of submission output files, such as the json files containing
        job data. This method should return a :py:class:`FileSystemDirectoryTarget`.
        """
        ...

    def crab_request_name(self, submit_jobs: dict[int, list[int]]) -> str:
        """
        Returns a random name for a request, i.e., the project directory inside the crab job
        working area. The name consists of the live task id, followed by an underscore and the
        first eight characters of a random uuid. *submit_jobs* is not used by this default.
        """
        task_id = self.live_task_id
        random_suffix = str(uuid.uuid4())[:8]
        return f"{task_id}_{random_suffix}"

    def crab_work_area(self) -> str | LocalDirectoryTarget:
        """
        Returns the location of the crab working area. When the job file factory of the workflow
        proxy cleans up its job files, the value of :py:meth:`crab_output_directory` is used if it
        refers to a local directory. This is the case when it is a
        :py:class:`LocalDirectoryTarget`, which is returned as is, or when it is not a file system
        target and has no or a ``"file"`` scheme, in which case the path without the scheme is
        returned. In all other cases, an empty string is returned, denoting a location relative to
        the job file directory.
        """
        relative_to_job_file_dir = ""

        # the output directory is only considered when job files are cleaned
        factory = self.workflow_proxy.job_file_factory  # type: ignore[attr-defined]
        if not (factory and factory.cleanup):
            return relative_to_job_file_dir

        out_dir = self.crab_output_directory()

        # local targets are returned directly
        if isinstance(out_dir, LocalDirectoryTarget):
            return out_dir

        # any other file system target is not considered local
        if isinstance(out_dir, FileSystemDirectoryTarget):
            return relative_to_job_file_dir

        # plain locations are used when they have no remote scheme
        if get_scheme(out_dir) in (None, "file"):
            return remove_scheme(out_dir)

        return relative_to_job_file_dir

    @contextlib.contextmanager
    def crab_workflow_run_context(self) -> Generator[None, None, None]:
        """
        Hook to provide a context manager in which the workflow run implementation is placed. This
        can be helpful in situations where resources should be acquired before and released after
        running a workflow. The default does nothing.
        """
        yield

    def crab_workflow_requires(self) -> DotDict:
        """
        Hook to define requirements for the workflow itself that need to be resolved before any
        submission can happen. The default is an empty :py:class:`DotDict`.
        """
        return DotDict()

    def crab_job_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile:
        """
        Hook to return the location of the job file that is executed on job nodes. The default is
        the "law_job.sh" file in the "job" directory of the law sources.
        """
        default_job_file = law_src_path("job", "law_job.sh")
        return JobInputFile(default_job_file)

    def crab_bootstrap_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile | None:
        """
        Hook to define the location of an optional, so-called bootstrap file that is sent alongside
        jobs and called prior to the actual job payload. It is meant to run a custom setup routine
        in order for the payload to run successfully (e.g. software setup, data retrieval). The
        default is *None*.
        """
        return None

    def crab_stageout_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile | None:
        """
        Hook to define the location of an optional, so-called stageout file that is sent alongside
        jobs and called after the actual job payload. It is meant to run a custom output stageout
        routine if required by your workflow or target storage element. The default is *None*.
        """
        return None

    def crab_output_postfix(self) -> str:
        """
        Hook to define the postfix of outputs, for instance such that workflows with different
        parameters do not write their intermediate job status information into the same json file.
        The default is an empty string.
        """
        return ""

    def crab_output_uri(self) -> str:
        """
        Hook to return the URI of the remote crab output directory, obtained from the target
        returned by :py:meth:`crab_output_directory`.
        """
        out_dir = self.crab_output_directory()
        return out_dir.uri(return_all=False)  # type: ignore[return-value]

    def crab_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]:
        """
        Hook to define resources for a specific job with number *job_num*, processing *branches*.
        This method should return a dictionary. The default is an empty one.
        """
        return {}

    def crab_job_manager_cls(self) -> Type[CrabJobManager]:
        """
        Hook to define a custom job manager class to use. The default is
        :py:class:`CrabJobManager`.
        """
        return CrabJobManager

    def crab_create_job_manager(self, **kwargs) -> CrabJobManager:
        """
        Hook to configure how the underlying job manager is instantiated and configured. The
        *crab_job_manager_defaults* are merged with *kwargs* and passed to the class returned by
        :py:meth:`crab_job_manager_cls`.
        """
        manager_kwargs = merge_dicts(self.crab_job_manager_defaults, kwargs)
        manager_cls = self.crab_job_manager_cls()
        return manager_cls(**manager_kwargs)

    def crab_job_file_factory_cls(self) -> Type[CrabJobFileFactory]:
        """
        Hook to define a custom job file factory class to use. The default is
        :py:class:`CrabJobFileFactory`.
        """
        return CrabJobFileFactory

    def crab_create_job_file_factory(self, **kwargs) -> CrabJobFileFactory:
        """
        Hook to configure how the underlying job file factory is instantiated and configured. The
        *crab_job_file_factory_defaults* are merged with *kwargs* and passed to the class returned
        by :py:meth:`crab_job_file_factory_cls`. When no ``"mkdtemp"`` value is set this way, it is
        looked up in the "job" section of the law config. It is only used if it is a string other
        than "true" or "false", after expanding it as a template path with the variables
        ``task_id`` and ``task_family``.
        """
        # get the file factory cls
        factory_cls = self.crab_job_file_factory_cls()

        # job file factory config priority: kwargs > class defaults
        factory_kwargs = merge_dicts({}, self.crab_job_file_factory_defaults, kwargs)

        # nothing else to do when mkdtemp is already configured
        if factory_kwargs.get("mkdtemp") is not None:
            return factory_cls(**factory_kwargs)

        # default mkdtemp value which might require task-level info
        cfg = Config.instance()
        option = cfg.find_option("job", "crab_job_file_dir_mkdtemp", "job_file_dir_mkdtemp")
        mkdtemp = cfg.get_expanded("job", option)
        if isinstance(mkdtemp, str) and mkdtemp.lower() not in {"true", "false"}:
            template_variables = {
                "task_id": self.live_task_id,
                "task_family": self.task_family,
            }
            factory_kwargs["mkdtemp"] = factory_cls._expand_template_path(
                mkdtemp,
                variables=template_variables,
            )

        return factory_cls(**factory_kwargs)

    def crab_job_config(
        self,
        config: CrabJobFileFactory.Config,
        job_num: list[int],
        branches: list[list[int]],
    ) -> CrabJobFileFactory.Config:
        """
        Hook to inject custom settings into the job *config*, which is an instance of the
        :py:attr:`Config` class defined inside the job file factory. *job_num* is the list of
        numbers of all jobs to submit and *branches* the list of branches of each of these jobs.
        The default returns *config* unchanged.
        """
        return config

    def crab_use_local_scheduler(self) -> bool:
        """
        Whether remote jobs should use a local scheduler. The default is *True*.
        """
        return True

    def crab_post_submit_delay(self) -> float | int:
        """
        Configurable delay in seconds to wait after submitting jobs and before starting the status
        polling. The default is the value of *poll_interval* multiplied by 60.
        """
        return self.poll_interval * 60  # type: ignore[operator]

    def crab_check_job_completeness(self) -> bool:
        """
        Hook to define whether, after jobs report successful completion, the job manager should
        check the completion status of the branch tasks run by the finished jobs. The default is
        *False*.
        """
        return False

    def crab_check_job_completeness_delay(self) -> float | int:
        """
        Grace period before :py:meth:`crab_check_job_completeness` is called to ensure that output
        files are accessible. Especially useful on distributed file systems with possibly
        asynchronous behavior. The default is zero.
        """
        return 0.0

    def crab_poll_callback(self, poll_data: PollData) -> None:
        """
        Configurable callback that is called after each job status query and before potential
        resubmission. It receives the variable polling attributes *poll_data*
        (:py:class:`PollData`) that can be changed within this method.

        If *False* is returned, the polling loop is gracefully terminated. Returning any other
        value does not have any effect. The default returns *None*.
        """
        return None

    def crab_post_poll_callback(self, success: bool, duration: float | int) -> None:
        """
        Configurable callback that is called after the polling loop has ended. It receives a
        boolean *success* that indicates whether the job polling was successful, and the *duration*
        of the job polling in seconds. The default does nothing.
        """
        return None

    def crab_cmdline_args(self) -> dict[str, str]:
        """
        Hook to add additional cli parameters to "law run" commands executed on job nodes. The
        default is an empty dictionary.
        """
        return {}

    def crab_destination_info(self, info: InsertableDict) -> InsertableDict:
        """
        Hook to add additional information behind each job status query line by extending an *info*
        dictionary whose values will be shown separated by comma. The default returns *info*
        unchanged.
        """
        return info