Source code for law.contrib.cms.job

# coding: utf-8

"""
Crab job manager and CMS-related job helpers.
"""

from __future__ import annotations

__all__ = ["CrabJobManager", "CrabJobFileFactory", "CMSJobDashboard"]

import os
import stat
import time
import pathlib
import socket
import threading
import queue
import re
import json
import shlex
import subprocess
import shutil
import collections

import law
from law.config import Config
from law.task.base import Task
from law.sandbox.base import Sandbox
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile
from law.job.dashboard import BaseJobDashboard
from law.workflow.remote import JobData
from law.target.file import get_path
from law.util import (
    DotDict, interruptable_popen, make_list, make_unique, quote_cmd, no_value, rel_path, parse_duration,
)
from law.logger import get_logger
from law._types import Any, MutableMapping, Callable, Type, Hashable, T, Sequence

import law.contrib.cms.sandbox

law.contrib.load("wlcg")


logger = get_logger(__name__)

_cfg = Config.instance()


[docs] class CrabJobManager(BaseJobManager): submission_task_name_cre = re.compile(r"^Task\s+name\s*\:\s+([^\s]+)\s*$") submission_log_file_cre = re.compile(r"^Log\s+file\s+is\s+([^\s]+\.log)\s*$") query_server_status_cre = re.compile(r"^Status\s+on\s+the\s+CRAB\s+server\s*\:\s+([^\s].*)$") query_server_failure_cre = re.compile(r"^Failure\s+message\s+from\s+server\s*\:\s+([^\s].*)$") query_user_cre = re.compile(r"^Task\s+name\s*:\s+\d+_\d+\:([^_]+)_.+$") query_scheduler_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+([^\s]+).+$") query_scheduler_id_cre = re.compile(r"^Grid\s+scheduler\s+-\s+Task\s+Worker\s*\:\s+crab.*\@.+[^\d](\d+)\..+$") # noqa query_scheduler_status_cre = re.compile(r"^Status\s+on\s+the\s+scheduler\s*\:\s+([^\s].*)$") query_monitoring_url_cre = re.compile(r"^Dashboard\s+monitoring\s+URL\s*\:\s+([^\s].*)$") query_json_line_cre = re.compile(r"^\s*(\{.+\})\s*$") log_n_jobs_cre = re.compile(r"^config\.Data\.totalUnits\s+\=\s+(\d+)\s*$") log_disable_output_collection_cre = re.compile(r"^config\.JobType\.disableAutomaticOutputCollection\s+\=\s+([^\s]+)\s*$") # noqa log_task_name_cre = re.compile(r"^.+\s+Task\s+name\s*\:\s+([^\s]+)\s*$") log_file_pattern = "https://cmsweb.cern.ch:8443/scheddmon/{scheduler_id}/{user}/{task_name}/job_out.{crab_num}.{attempt}.txt" # noqa job_grouping_submit = True job_grouping_query = True job_grouping_cancel = True job_grouping_cleanup = True JobId = collections.namedtuple("JobId", ["crab_num", "task_name", "proj_dir"]) def __init__( self, sandbox_name: str | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, threads: int = 1, ) -> None: super().__init__() # default sandbox name if sandbox_name is None: sandbox_name = _cfg.get_expanded("job", "crab_sandbox_name") # create the cmssw sandbox self.cmssw_sandbox = Sandbox.new( sandbox_name if sandbox_name.startswith("cmssw::") else f"cmssw::{sandbox_name}", ) # store attributes self.proxy_file = proxy_file self.myproxy_username = myproxy_username self.instance = instance self.threads = threads
[docs] @classmethod def cast_job_id(cls, job_id: tuple[str]) -> CrabJobManager.JobId: """ Converts a *job_id*, for instance after json deserialization, into a :py:class:`JobId` object. """ if isinstance(job_id, cls.JobId): return job_id if isinstance(job_id, (list, tuple)): return cls.JobId(*job_id) # type: ignore[call-arg] raise ValueError(f"cannot cast to {cls.JobId.__name__}: '{job_id!r}'")
@property def cmssw_env(self) -> MutableMapping[str, Any]: return self.cmssw_sandbox.env
[docs] def group_job_ids(self, job_ids: list[JobId]) -> dict[str, list[JobId]]: # type: ignore[override] # noqa groups: dict[str, list[CrabJobManager.JobId]] = {} # group by project directory for job_id in job_ids: if job_id.proj_dir not in groups: groups[job_id.proj_dir] = [] groups[job_id.proj_dir].append(job_id) return groups
def _apply_group( self, func: Callable, result_type: Type[T], group_func: Callable[[list[Any]], dict[Hashable, list[Any]]], job_objs: list[Any], threads: int | None = None, callback: Callable[[int, Any], Any] | None = None, **kwargs, ) -> T: # when job_objs is a string or a sequence of strings, interpret them as project dirs, read # their log files to extract task names, build actual job ids and forward them if func != self.submit: job_ids = [] for i, job_id in enumerate(make_list(job_objs)): if not isinstance(job_id, (str, pathlib.Path)): job_ids.append(job_id) continue # get n_jobs and task_name from log file proj_dir = job_id log_file = os.path.join(proj_dir, "crab.log") if not os.path.exists(log_file): job_ids.append(job_id) continue log_data = self._parse_log_file(log_file) if "n_jobs" not in log_data or "task_name" not in log_data: job_ids.append(job_id) continue # expand ids log_data: dict[str, str | None] for crab_num in range(1, int(log_data["n_jobs"]) + 1): job_ids.append(self.JobId(crab_num, log_data["task_name"], proj_dir)) job_objs = job_ids return super()._apply_group( func, result_type, group_func, job_objs, threads=threads, callback=callback, **kwargs, ) def _check_proj_dir(self, proj_dir: str | pathlib.Path) -> None: if not os.path.isdir(str(proj_dir)): raise Exception(f"project directory '{proj_dir}' does not exist")
[docs] def submit( # type: ignore[override] self, job_file: str | pathlib.Path, *, job_files: Sequence[str | pathlib.Path] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, retries: int = 0, retry_delay: int | float = 3, silent: bool = False, _processes: list | None = None, ) -> list[JobId] | None: # default arguments if proxy_file is None: proxy_file = self.proxy_file if myproxy_username is None: myproxy_username = self.myproxy_username if instance is None: instance = self.instance # get the job file location as the submission command is run it the same directory job_file_dir, job_file_name = os.path.split(os.path.abspath(get_path(job_file))) # define the actual submission in a loop to simplify retries while True: # build the command cmd = shlex.split(_cfg.get_expanded("job", "crab_cmd_crab")) cmd += ["submit", "--config", job_file_name] if proxy_file: cmd += ["--proxy", proxy_file] if instance: cmd += ["--instance", instance] cmd_str = quote_cmd(cmd) # run the command # crab prints everything to stdout logger.debug(f"submit crab jobs with command '{cmd_str}'") out: str code, out, _ = interruptable_popen( # type: ignore[assignment] cmd_str, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stdin=None, cwd=job_file_dir, env=self.cmssw_env, kill_timeout=2, processes=_processes, ) # handle errors if code != 0: logger.debug(f"submission of crab job '{job_file}' failed with code {code}:\n{out}") # remove the project directory proj_dir = self._proj_dir_from_job_file(job_file, self.cmssw_env) if proj_dir and os.path.isdir(proj_dir): logger.debug(f"removing crab project '{proj_dir}' from previous attempt") shutil.rmtree(proj_dir) if retries > 0: retries -= 1 time.sleep(retry_delay) continue if silent: return None raise Exception( f"submission of crab job '{job_file}' failed with code {code}:\n{out}", ) # parse outputs task_name, log_file = None, None for line in out.replace("\r", "").split("\n"): if not task_name: m_task_name = self.submission_task_name_cre.match(line) if m_task_name: task_name = m_task_name.group(1) if not log_file: m_log_file = self.submission_log_file_cre.match(line) if m_log_file: log_file = m_log_file.group(1) if task_name and log_file: break if not task_name: raise Exception(f"no valid task name found in submission output:\n\n{out}") if not log_file: raise Exception(f"no valid log file found in submission output:\n\n{out}") # create job ids with log data proj_dir = os.path.dirname(log_file) job_ids = self._job_ids_from_proj_dir(proj_dir) # checks if job_files is not None and len(job_files) != len(job_ids): raise Exception( f"number of submited jobs ({len(job_ids)}) does not match number of job files " f"({len(job_files)})", ) return job_ids
[docs] def cancel( # type: ignore[override] self, proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, silent: bool = False, _processes: list | None = None, ) -> dict[JobId, None]: self._check_proj_dir(proj_dir) # default arguments if job_ids is None: job_ids = self._job_ids_from_proj_dir(proj_dir) if proxy_file is None: proxy_file = self.proxy_file if myproxy_username is None: myproxy_username = self.myproxy_username if instance is None: instance = self.instance # build the command cmd = shlex.split(_cfg.get_expanded("job", "crab_cmd_crab")) cmd += ["kill", "--dir", str(proj_dir)] if proxy_file: cmd += ["--proxy", proxy_file] if instance: cmd += ["--instance", instance] cmd_str = quote_cmd(cmd) # run it logger.debug(f"cancel crab job(s) with command '{cmd_str}'") code, out, _ = interruptable_popen( cmd_str, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stdin=None, env=self.cmssw_env, kill_timeout=2, processes=_processes, ) # check success if code != 0 and not silent: # crab prints everything to stdout raise Exception( f"cancellation of crab jobs from project '{proj_dir}' failed with code {code}:\n" f"{out}", ) return {job_id: None for job_id in job_ids}
[docs] def cleanup( # type: ignore[override] self, proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, silent: bool = False, _processes: list | None = None, ) -> dict[JobId, None]: if job_ids is None: job_ids = self._job_ids_from_proj_dir(proj_dir) # just delete the project directory proj_dir = str(proj_dir) if os.path.isdir(proj_dir): shutil.rmtree(proj_dir) return {job_id: None for job_id in job_ids}
[docs] def query( # type: ignore[override] self, proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, skip_transfers: bool | None = None, silent: bool = False, _processes: list | None = None, ) -> dict[JobId, dict[str, Any]] | None: self._check_proj_dir(proj_dir) # default arguments proj_dir = str(proj_dir) log_data = self._parse_log_file(os.path.join(proj_dir, "crab.log")) if job_ids is None: job_ids = self._job_ids_from_proj_dir(proj_dir, log_data=log_data) if proxy_file is None: proxy_file = self.proxy_file if myproxy_username is None: myproxy_username = self.myproxy_username if instance is None: instance = self.instance # when output collection is disabled, we can consider all "transferring" states as finished if skip_transfers is None: skip_transfers = str(log_data.get("disable_output_collection")).lower() == "true" # build the command cmd = shlex.split(_cfg.get_expanded("job", "crab_cmd_crab")) cmd += ["status", "--dir", proj_dir, "--json"] if proxy_file: cmd += ["--proxy", proxy_file] if instance: cmd += ["--instance", instance] # optionally prepend timeout query_timeout = _cfg.get_expanded("job", _cfg.find_option("job", "crab_job_query_timeout", "job_query_timeout")) if query_timeout: query_timeout_sec = parse_duration(query_timeout, input_unit="s") cmd = self.prepend_timeout_command(cmd, query_timeout_sec) # run it cmd_str = quote_cmd(cmd) logger.debug(f"query crab job(s) with command '{cmd_str}'") out: str code, out, _ = interruptable_popen( # type: ignore[assignment] cmd_str, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=None, env=self.cmssw_env, kill_timeout=2, processes=_processes, ) # handle errors if code != 0: if silent: return None # crab prints everything to stdout raise Exception( f"status query of crab jobs from project '{proj_dir}' failed with code {code}:\n" f"{out}", ) # parse the output and extract the status per job query_data = self.parse_query_output(out, proj_dir, job_ids, skip_transfers=skip_transfers) # compare to the requested job ids and perform some checks for job_id in job_ids: if job_id not in query_data: query_data[job_id] = self.job_status_dict( job_id=job_id, status=self.FAILED, error="job not found in query response", ) return query_data
@classmethod def parse_query_output( cls, out: str, proj_dir: str | pathlib.Path, job_ids: list[JobId], skip_transfers: bool = False, ) -> dict[JobId, dict[str, Any]]: # parse values using compiled regexps cres = [ cls.query_user_cre, cls.query_server_status_cre, cls.query_scheduler_id_cre, cls.query_scheduler_status_cre, cls.query_json_line_cre, cls.query_monitoring_url_cre, cls.query_server_failure_cre, ] values: list[str | None] = len(cres) * [None] # type: ignore[assignment] for line in out.replace("\r", "").split("\n"): for i, (cre, value) in enumerate(zip(cres, values)): if value: continue m = cre.match(line) if m: values[i] = m.group(1) if all(values): break # unpack ( username, server_status, scheduler_id, scheduler_status, json_line, monitoring_url, server_failure, ) = values # helper to build extra info def extra( job_id: CrabJobManager.JobId, job_data: dict[str, Any] | None = None, ) -> dict[str, Any]: extra = {} if username and scheduler_id and job_data: extra["log_file"] = cls.log_file_pattern.format( scheduler_id=scheduler_id, user=username, task_name=job_id.task_name, crab_num=job_id.crab_num, attempt=job_data.get("Retries", 0), ) if monitoring_url: extra["tracking_url"] = monitoring_url return extra # in case scheduler status or the json line is missing, the submission could be too new or # it failed entirely if not scheduler_status or not json_line: pending_server_states = { "HOLDING on command SUBMIT", "NEW on command SUBMIT", "QUEUED on command SUBMIT", "WAITING on command SUBMIT", "SUBMITTED", } failed_server_states = {"SUBMITFAILED"} error = None if server_status in pending_server_states: status = cls.PENDING elif server_status in failed_server_states: status = cls.FAILED error = server_failure or "submission failed" else: s = ",".join(map("'{}'".format, pending_server_states | failed_server_states)) raise Exception( "no per-job information available (yet?), which is only accepted if the crab " f"server status is any of {s}, but got '{server_status}'", ) return { job_id: cls.job_status_dict( job_id=job_id, status=status, error=error, extra=extra(job_id), ) for job_id in job_ids } # parse json data if not json_line: raise Exception( "no per-job information available in status response, crab server " f"status '{server_status}', scheduler status '{scheduler_status}'", ) # map of crab job numbers to full ids for faster lookup num_to_id_map = {job_id.crab_num: job_id for job_id in job_ids} # build query data query_data = {} for crab_num_str, data in json.loads(json_line).items(): crab_num = int(crab_num_str) if crab_num not in num_to_id_map: continue job_id = num_to_id_map[crab_num] # parse error info code = None error = data.get("Error") if isinstance(error, list) and len(error) >= 2: code = error[0] error = str(error[1]).strip() # extra info _extra = extra(job_id, data) or {} _extra["site_history"] = data.get("SiteHistory") # fill query data query_data[job_id] = cls.job_status_dict( job_id=job_id, status=cls.map_status(data["State"], skip_transfers=skip_transfers), code=code, error=error, extra=_extra, ) return query_data @classmethod def _proj_dir_from_job_file( cls, job_file: str | pathlib.Path, cmssw_env: MutableMapping[str, Any], ) -> str | None: work_area = None request_name = None job_file = str(job_file) with open(job_file, "r") as f: # fast approach: parse the job file for line in f.readlines(): if work_area and request_name: break if not work_area: m = re.match(r"^.+[^\w]workArea\s*=\s(\'|\")(.+)(\'|\").*$", line.strip()) if m: work_area = m.group(2) continue if not request_name: m = re.match(r"^.+[^\w]requestName\s*=\s(\'|\")(.+)(\'|\").*$", line.strip()) if m: request_name = m.group(2) continue # when the combination is correct, return if work_area and request_name and 0: path = os.path.join(work_area, "crab_{}".format(request_name)) path = os.path.expandvars(os.path.expanduser(path)) if os.path.isdir(path): return path # long approach: read the file in the cmssw env and manually print m = re.match(r"^CMSSW(_.+|)_(\d)+_\d+_\d+.*$", cmssw_env["CMSSW_VERSION"]) cmssw_major = int(m.group(2)) if m else None py_exec = "python3" if cmssw_major is None or cmssw_major >= 11 else "python" cmd = f"""{py_exec} -c ' from os.path import join with open("{job_file}", "r") as f: mod = dict() exec(f.read(), mod) cfg = mod["cfg"] print(join(cfg.General.workArea, "crab_" + cfg.General.requestName))'""" out: str code, out, _ = interruptable_popen( # type: ignore[assignment] cmd, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, env=cmssw_env, ) if code == 0: path = out.strip().replace("\r\n", "\n").split("\n")[-1] path = os.path.expandvars(os.path.expanduser(path)) if os.path.isdir(path): return path return None @classmethod def _parse_log_file(cls, log_file: str | pathlib.Path) -> dict[str, str | int]: log_file = os.path.expandvars(os.path.expanduser(str(log_file))) if not os.path.exists(log_file): raise FileNotFoundError(f"log file '{log_file}' does not exist") cres = [cls.log_n_jobs_cre, cls.log_task_name_cre, cls.log_disable_output_collection_cre] names = ["n_jobs", "task_name", "disable_output_collection"] values: list[str | int | None] = len(cres) * [None] # type: ignore[assignment] types = [int, str, bool] with open(log_file, "r") as f: for line in f.readlines(): for i, (cre, value, t) in enumerate(zip(cres, values, types)): if value is not None: continue m = cre.match(line) if m: values[i] = t(m.group(1)) if all(values): break return {n: v for n, v in zip(names, values) if v is not None} @classmethod def _job_ids_from_proj_dir( cls, proj_dir: str | pathlib.Path, log_data: dict[str, str | int] | None = None, ) -> list[JobId]: # read log data proj_dir = str(proj_dir) if log_data is None: log_data = cls._parse_log_file(os.path.join(proj_dir, "crab.log")) if "n_jobs" not in log_data or "task_name" not in log_data: raise ValueError(f"log data does not contain 'n_jobs' or 'task_name': {log_data}") # build and return ids return [ cls.JobId(crab_num, log_data["task_name"], proj_dir) for crab_num in range(1, int(log_data["n_jobs"]) + 1) ] @classmethod def map_status(cls, status: str | None, skip_transfers: bool = False) -> str: # see https://twiki.cern.ch/twiki/bin/view/CMSPublic/Crab3HtcondorStates if status in ("cooloff", "unsubmitted", "idle"): return cls.PENDING if status in ("running",): return cls.RUNNING if status in ("transferring", "transferred"): return cls.FINISHED if skip_transfers else cls.RUNNING if status in ("finished",): return cls.FINISHED if status in ("killing", "failed", "held"): return cls.FAILED logger.debug(f"unknown crab job state '{status}'") return cls.FAILED
[docs] class CrabJobFileFactory(BaseJobFileFactory): config_attrs = BaseJobFileFactory.config_attrs + [ "file_name", "executable", "arguments", "work_area", "request_name", "input_files", "output_files", "storage_site", "output_lfn_base", "vo_group", "vo_role", "crab", "custom_content", "absolute_paths", ] def __init__( self, *, file_name: str = "crab_job.py", executable: str | None = None, arguments: Sequence[str] | None = None, work_area: str | None = None, request_name: str | None = None, input_files: dict[str, str | pathlib.Path | JobInputFile] | None = None, output_files: list[str] | None = None, storage_site: str | None = None, output_lfn_base: str | None = None, vo_group: str | None = None, vo_role: str | None = None, custom_content: str | Sequence[str] | None = None, absolute_paths: bool = False, **kwargs, ) -> None: # get some default kwargs from the config default_dir = _cfg.get_expanded( "job", _cfg.find_option("job", "crab_job_file_dir", "job_file_dir"), ) if kwargs.get("dir") is None: kwargs["dir"] = _cfg.get_expanded( "job", _cfg.find_option("job", "crab_job_file_dir", "job_file_dir"), ) if kwargs.get("cleanup") is None: kwargs["cleanup"] = _cfg.get_expanded_bool( "job", _cfg.find_option("job", "crab_job_file_dir_cleanup", "job_file_dir_cleanup"), ) if kwargs.get("mkdtemp") is None: kwargs["mkdtemp"] = _cfg.get_expanded_bool( "job", _cfg.find_option("job", "crab_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"), force_type=False, ) super().__init__(**kwargs) self.file_name = file_name self.executable = executable self.arguments = arguments self.work_area = work_area self.request_name = request_name self.input_files = input_files or {} self.output_files = output_files or [] self.storage_site = storage_site self.output_lfn_base = output_lfn_base self.vo_group = vo_group self.vo_role = vo_role self.custom_content = custom_content self.absolute_paths = absolute_paths # defaults if not self.work_area: self.work_area = default_dir # crab config # "no_value" marks required settings, None marks optional settings self.crab = DotDict([ ("General", DotDict([ ("requestName", no_value), ("workArea", no_value), ("transferLogs", False), ("transferOutputs", no_value), ])), ("JobType", DotDict([ ("pluginName", "Analysis"), ("psetName", rel_path(__file__, "crab", "PSet.py")), ("scriptExe", no_value), ("maxMemoryMB", 2048), ("allowUndistributedCMSSW", True), ("disableAutomaticOutputCollection", True), ("inputFiles", no_value), ("outputFiles", no_value), ])), ("Data", DotDict([ ("inputDBS", "global"), ("splitting", "FileBased"), ("unitsPerJob", 1), ("totalUnits", no_value), ("inputDataset", None), ("userInputFiles", None), ("allowNonValidInputDataset", True), ("outLFNDirBase", no_value), ("publication", False), ("ignoreLocality", False), ])), ("Site", DotDict([ ("storageSite", no_value), ])), ("User", DotDict([ ("voGroup", None), ("voRole", None), ])), ])
[docs] def create(self, **kwargs) -> tuple[str, CrabJobFileFactory.Config]: # merge kwargs and instance attributes c = self.get_config(**kwargs) # some sanity checks if not c.file_name: raise ValueError("file_name must not be empty") if not c.executable: raise ValueError("executable must not be empty") if not c.request_name: raise ValueError("request_name must not be empty") if "." in c.request_name: raise ValueError(f"request_name should not contain '.', got {c.request_name}") if len(c.request_name) > 100: raise ValueError( f"request_name must be less then 100 characters long, got {len(c.request_name)}: " f"{c.request_name}", ) if not c.output_lfn_base: raise ValueError("output_lfn_base must not be empty") if not c.storage_site: raise ValueError("storage_site must not be empty") if not isinstance(c.arguments, (list, tuple)): raise ValueError(f"arguments must be a list, got '{c.arguments}'") if "job_file" not in c.input_files: raise ValueError("an input file with key 'job_file' is required") # ensure that all log files are output files for attr in ["custom_log_file"]: if c[attr] and c[attr] not in c.output_files: c.output_files.append(c[attr]) c.output_files = list(map(str, c.output_files)) # ensure that all input files are JobInputFile's c.input_files = { key: JobInputFile(f) for key, f in c.input_files.items() } # ensure that the executable is an input file, remember the key to access it if c.executable: executable_keys = [ k for k, v in c.input_files.items() if get_path(v) == get_path(c.executable) ] if executable_keys: executable_key = executable_keys[0] else: executable_key = "executable_file" c.input_files[executable_key] = JobInputFile(c.executable) # add the wrapper file to the inputs c.input_files["crab_wrapper"] = JobInputFile( path=rel_path(__file__, "crab", "crab_wrapper.sh"), copy=True, render=True, increment=True, ) # prepare input files def prepare_input(f): # when not copied, just return the absolute, original path abs_path = os.path.abspath(f.path) if f.copy: # copy the file and apply other transformations abs_path = self.provide_input( src=abs_path, dir=c.dir, skip_existing=f.share, increment_existing=f.increment and not f.share, ) return abs_path # absolute input paths for key, f in c.input_files.items(): f.path_sub_abs = prepare_input(f) # input paths relative to the submission dir for key, f in c.input_files.items(): f.path_sub_rel = ( os.path.basename(f.path_sub_abs) if f.copy and not c.absolute_paths else f.path_sub_abs ) # input paths as seen by the job, before and after potential job-side rendering for key, f in c.input_files.items(): f.path_job_pre_render = os.path.basename(f.path_sub_abs) f.path_job_post_render = f.path_job_pre_render # update files in render variables with that after potential rendering c.render_variables.update({ key: f.path_job_post_render for key, f in c.input_files.items() }) # add space separated input files before potential rendering to render variables c.render_variables["input_files"] = " ".join( f.path_job_pre_render for f in c.input_files.values() ) # add space separated list of input files for rendering c.render_variables["input_files_render"] = " ".join( f.path_job_pre_render for f in c.input_files.values() if f.render_job ) # add the custom log file to render variables if c.custom_log_file: c.render_variables["log_file"] = c.custom_log_file # inject arguments into the crab wrapper via render variables c.render_variables["crab_job_arguments_map"] = ("\n" + 8 * " ").join( f"['{i + 1}']=\"{args}\"" for i, args in enumerate(c.arguments) ) # linearize render variables render_variables = self.linearize_render_variables( c.render_variables, drop_base64_keys=["crab_job_arguments_map"], ) # prepare the job file job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name))) # render copied input files for key, f in c.input_files.items(): if not f.copy or not f.render_local: continue self.render_file( f.path_sub_abs, f.path_sub_abs, render_variables, ) # prepare the executable when given if c.executable: c.executable = get_path(c.input_files[executable_key].path_job_post_render) # make the file executable for the user and group path = os.path.join(c.dir, os.path.basename(c.executable)) if os.path.exists(path): os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP) # resolve work_area relative to self.dir if c.work_area: work_area = os.path.expandvars(os.path.expanduser(str(c.work_area))) c.work_area = os.path.join(self.dir, work_area) else: c.work_area = self.dir # General c.crab.General.requestName = c.request_name c.crab.General.workArea = c.work_area c.crab.General.transferOutputs = bool(c.output_files) # JobType c.crab.JobType.scriptExe = c.input_files["crab_wrapper"].path_sub_rel c.crab.JobType.inputFiles = make_unique([ f.path_sub_rel for f in c.input_files.values() ]) c.crab.JobType.outputFiles = make_unique(c.output_files) or None # Data c.crab.Data.totalUnits = len(c.arguments) c.crab.Data.outLFNDirBase = c.output_lfn_base # define custom input files when no inputDataset is given # note: they don't have to exist but crab requires a list of length totalUnits if not c.crab.Data.inputDataset: c.crab.Data.userInputFiles = [ f"input_{i + 1}.root" for i in range(c.crab.Data.totalUnits) ] # Site c.crab.Site.storageSite = c.storage_site # User if c.vo_group: c.crab.User.voGroup = c.vo_group if c.vo_role: c.crab.User.voRole = c.vo_role # write the job file self.write_crab_config(job_file, c.crab, custom_content=c.custom_content) logger.debug(f"created crab job file at '{job_file}'") return job_file, c
@classmethod def write_crab_config( cls, job_file: str | pathlib.Path, crab_config: DotDict, custom_content: str | Sequence[str] | None = None, ) -> None: fmt_flat = lambda s: "\"{}\"".format(s) if isinstance(s, str) else str(s) with open(job_file, "w") as f: # header f.write("# coding: utf-8\n") f.write("\n") f.write("from CRABClient.UserUtilities import config\n") f.write("\n") f.write("cfg = config()\n") f.write("\n") # sections for section, cfg in crab_config.items(): f.write(f"cfg.section_(\"{section}\")\n") # options for option, value in cfg.items(): if value == no_value: raise Exception(f"cannot assign {value} to crab config {section}.{option}") if value is None: continue value_str = ( "[\n{}\n]".format("\n".join(" {},".format(fmt_flat(v)) for v in value)) if isinstance(value, (list, tuple)) else fmt_flat(value) ) f.write(f"cfg.{section}.{option} = {value_str}\n") f.write("\n") # custom content if custom_content is not None: for line in make_list(custom_content or []): f.write(f"{line}\n")
[docs] class CMSJobDashboard(BaseJobDashboard): """ This CMS job dashboard interface requires ``apmon`` to be installed on your system. See http://monalisa.caltech.edu/monalisa__Documentation__ApMon_User_Guide__apmon_ug_py.html and https://twiki.cern.ch/twiki/bin/view/ArdaGrid/CMSJobMonitoringCollector. """ PENDING = "pending" RUNNING = "running" CANCELLED = "cancelled" POSTPROC = "postproc" SUCCESS = "success" FAILED = "failed" tracking_url = ( "http://dashb-cms-job.cern.ch/dashboard/templates/task-analysis/#" "table=Jobs&p=1&activemenu=2&refresh=60&tid={dashboard_task_id}" ) persistent_attributes = ["task_id", "cms_user", "voms_user", "init_timestamp"] def __init__( self, task: Task, cms_user: str, voms_user: str, apmon_config: dict[str, Any] | None = None, log_level: str = "WARNING", max_rate: int = 20, task_type: str = "analysis", site: str | None = None, executable: str = "law", application: str | None = None, application_version: str | int | None = None, submission_tool: str = "law", submission_type: str = "direct", submission_ui: str | None = None, init_timestamp: str | None = None, ) -> None: super().__init__(max_rate=max_rate) # setup the apmon thread try: self.apmon = Apmon(apmon_config, self.max_rate, log_level) except ImportError as e: e.args = (f"{e} (required for {self.__class__.__name__})",) + e.args[1:] raise e # get the task family for use as default application name task_family = task.get_task_family() if isinstance(task, law.Task) else task # mandatory (persistent) attributes self.task_id = task.task_id if isinstance(task, law.Task) else task self.cms_user = cms_user self.voms_user = voms_user self.init_timestamp = init_timestamp or self.create_timestamp() # optional attributes self.task_type = task_type self.site = site self.executable = executable self.application = application or task_family self.application_version = application_version or self.task_id.rsplit("_", 1)[1] self.submission_tool = submission_tool self.submission_type = submission_type self.submission_ui = submission_ui or socket.gethostname() # start the apmon thread self.apmon.daemon = True self.apmon.start() def __del__(self) -> None: if getattr(self, "apmon", None) is None or not self.apmon.is_alive(): return self.apmon.stop() self.apmon.join() @classmethod def create_timestamp(cls) -> str: return time.strftime("%y%m%d_%H%M%S") @classmethod def create_dashboard_task_id( cls, task_id: str, cms_user: str, timestamp: str | None = None, ) -> str: if timestamp is None: timestamp = cls.create_timestamp() return f"{timestamp}:{cms_user}_{task_id}" @classmethod def create_dashboard_job_id(cls, job_num: str, job_id: str, attempt: int = 0) -> str: return f"{job_num}_{job_id}_{attempt}" @classmethod def params_from_status(cls, dashboard_status: str, fail_code: int = 1) -> dict[str, Any]: if dashboard_status == cls.PENDING: return {"StatusValue": "pending", "SyncCE": None} if dashboard_status == cls.RUNNING: return {"StatusValue": "running"} if dashboard_status == cls.CANCELLED: return {"StatusValue": "cancelled", "SyncCE": None} if dashboard_status == cls.POSTPROC: return {"StatusValue": "running", "JobExitCode": 0} if dashboard_status == cls.SUCCESS: return {"StatusValue": "success", "JobExitCode": 0} if dashboard_status == cls.FAILED: return {"StatusValue": "failed", "JobExitCode": fail_code} raise ValueError(f"invalid dashboard status '{dashboard_status}'")
[docs] @classmethod def map_status(cls, job_status: str, event: str) -> str | None: # when starting with "status.", event must end with the job status if event.startswith("status.") and event.split(".", 1)[-1] != job_status: raise ValueError(f"event '{event}' does not match job status '{job_status}'") status = lambda attr: f"status.{getattr(BaseJobManager, attr)}" return { "action.submit": cls.PENDING, "action.cancel": cls.CANCELLED, "custom.running": cls.RUNNING, "custom.postproc": cls.POSTPROC, "custom.failed": cls.FAILED, status("FINISHED"): cls.SUCCESS, }.get(event)
[docs] def remote_hook_file(self) -> str: return law.util.rel_path(__file__, "scripts", "cmsdashb_hooks.sh")
[docs] def remote_hook_data(self, job_num: int, attempt: int) -> dict[str, Any]: data = { "task_id": self.task_id, "cms_user": self.cms_user, "voms_user": self.voms_user, "init_timestamp": self.init_timestamp, "job_num": job_num, "attempt": attempt, } if self.site: data["site"] = self.site return data
[docs] def create_tracking_url(self) -> str: dashboard_task_id = self.create_dashboard_task_id(self.task_id, self.cms_user, self.init_timestamp) return self.tracking_url.format(dashboard_task_id=dashboard_task_id)
def create_message( self, job_data, event, job_num, attempt=0, custom_params=None, **kwargs, ) -> tuple[str, str, dict[str, Any]] | None: # we need the voms user, which must start with "/CN=" voms_user = self.voms_user if not voms_user: return None if not voms_user.startswith("/CN="): voms_user = "/CN=" + voms_user # map to job status to a valid dashboard status dashboard_status = self.map_status(job_data.get("status"), event) if not dashboard_status: return None # build the dashboard task id dashboard_task_id = self.create_dashboard_task_id( self.task_id, self.cms_user, self.init_timestamp, ) # build the id of the particular job dashboard_job_id = self.create_dashboard_job_id( job_num, job_data["job_id"], attempt=attempt, ) # build the parameters to send params = { "TaskId": dashboard_task_id, "JobId": dashboard_job_id, "GridJobId": job_data["job_id"], "CMSUser": self.cms_user, "GridName": voms_user, "JSToolUI": kwargs.get("submission_ui", self.submission_ui), } # add optional params params.update({ "TaskType": kwargs.get("task_type", self.task_type), "SyncCE": kwargs.get("site", self.site), "Executable": kwargs.get("executable", self.executable), "Application": kwargs.get("application", self.application), "ApplicationVersion": kwargs.get("application_version", self.application_version), "JSTool": kwargs.get("submission_tool", self.submission_tool), "SubmissionType": kwargs.get("submission_type", self.submission_type), }) # add status params params.update(self.params_from_status(dashboard_status, fail_code=job_data.get("code", 1))) # add custom params if custom_params: params.update(custom_params) # finally filter None's and convert everything to strings params = {key: str(value) for key, value in params.items() if value is not None} return (dashboard_task_id, dashboard_job_id, params)
[docs] @BaseJobDashboard.cache_by_status # type: ignore[misc] def publish(self, job_data: JobData, event: str, job_num: int, *args, **kwargs) -> None: # type: ignore[override] # noqa message = self.create_message(job_data, event, job_num, *args, **kwargs) if message: self.apmon.send(*message)
apmon_lock = threading.Lock() class Apmon(threading.Thread): default_config = { "cms-jobmon.cern.ch:8884": { "sys_monitoring": 0, "general_info": 0, "job_monitoring": 0, }, } def __init__( self, config: dict[str, dict[str, Any]] | None = None, max_rate: int = 20, log_level: str = "INFO", ) -> None: super().__init__() import apmon # type: ignore[import-untyped, import-not-found] log_level = getattr(apmon.Logger, log_level.upper()) self._apmon = apmon.ApMon(config or self.default_config, log_level) self._apmon.maxMsgRate = int(max_rate * 1.5) # hotfix of a bug occurring in apmon for too large pids for key, value in self._apmon.senderRef.items(): value["INSTANCE_ID"] = value["INSTANCE_ID"] & 0x7fffffff self._max_rate = max_rate self._queue: queue.Queue = queue.Queue() self._stop_event = threading.Event() def send(self, *args, **kwargs) -> None: self._queue.put((args, kwargs)) def _send(self, *args, **kwargs) -> None: self._apmon.sendParameters(*args, **kwargs) def stop(self) -> None: self._stop_event.set() def run(self) -> None: while True: # handling stopping self._stop_event.wait(0.5) if self._stop_event.is_set(): break if self._queue.empty(): continue with apmon_lock: while not self._queue.empty(): args, kwargs = self._queue.get() self._send(*args, **kwargs) time.sleep(1.0 / self._max_rate)