Source code for law.contrib.arc.job

# coding: utf-8

"""
Simple ARC job manager. See http://www.nordugrid.org/arc and
http://www.nordugrid.org/documents/xrsl.pdf.
"""

from __future__ import annotations

__all__ = ["ARCJobManager", "ARCJobFileFactory"]

import os
import stat
import time
import re
import shlex
import random
import pathlib
import subprocess

from law.config import Config
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile
from law.target.file import get_path
from law.util import interruptable_popen, make_list, make_unique, quote_cmd, parse_duration
from law.logger import get_logger
from law._types import Any, Sequence


# module-level logger, named after this module
logger = get_logger(__name__)

# shared config object, used below to look up options of the "job" section
_cfg = Config.instance()


class ARCJobManager(BaseJobManager):
    """
    Job manager that submits, cancels, cleans up and queries ARC jobs by invoking the commands
    configured in the "job" config section (options ``arc_cmd_arcsub``, ``arc_cmd_arckill``,
    ``arc_cmd_arcclean`` and ``arc_cmd_arcstat``).

    *job_list* and *ce* are stored as instance-level defaults that are used by the methods below
    whenever the corresponding argument is *None*. *threads* is stored on the instance as well.
    """

    # chunking settings
    chunk_size_submit = _cfg.get_expanded_int("job", "arc_chunk_size_submit")
    chunk_size_cancel = _cfg.get_expanded_int("job", "arc_chunk_size_cancel")
    chunk_size_cleanup = _cfg.get_expanded_int("job", "arc_chunk_size_cleanup")
    chunk_size_query = _cfg.get_expanded_int("job", "arc_chunk_size_query")

    # expressions for parsing the output of the submission and status commands
    submission_job_id_cre = re.compile(r"^Job submitted with jobid: (.+)$")
    status_block_cre = re.compile(r"\s*([^:]+): (.*)\n")
    status_invalid_job_cre = re.compile(r"^.+: Job not found in job list: (.+)$")
    status_missing_job_cre = re.compile(
        r"^.+: Job information not found in the information system: (.+)$",
    )

    def __init__(
        self,
        job_list: str | None = None,
        ce: str | None = None,
        threads: int = 1,
    ) -> None:
        super().__init__()

        self.job_list = job_list
        self.ce = ce
        self.threads = threads

    def submit(  # type: ignore[override]
        self,
        job_file: str | pathlib.Path | Sequence[str | pathlib.Path],
        job_list: str | None = None,
        ce: str | None = None,
        retries: int = 0,
        retry_delay: float | int = 3,
        silent: bool = False,
        _processes: list | None = None,
    ) -> str | list[str] | None:
        """
        Submits one or more jobs given by *job_file* and returns the job id(s) found in the
        output of the submission command. When *job_file* is a list or tuple, a list of job ids
        is returned, and a single job id otherwise.

        *job_list* and *ce* default to the instance attributes. One entry of *ce* is picked
        randomly for every submission attempt. On failure, the submission is repeated up to
        *retries* times with *retry_delay* seconds of sleep in between. When all attempts failed,
        *None* is returned if *silent* is *True*, and an exception is raised otherwise.

        A *ValueError* is raised when *ce* or *job_file* is empty. An exception is raised
        immediately (without retry) when the number of parsed job ids does not match the number
        of job files.
        """
        # default arguments
        if job_list is None:
            job_list = self.job_list
        if ce is None:
            ce = self.ce

        # check arguments
        if not ce:
            raise ValueError("ce must not be empty")
        _ce = make_list(ce)

        # arc supports multiple jobs to be submitted with a single arcsub call,
        # so job_file can be a sequence of files
        # when this is the case, we have to make the assumption that their input files are all
        # absolute, or they are relative but all in the same directory
        chunking = isinstance(job_file, (list, tuple))
        job_files = list(map(str, make_list(job_file)))
        if not job_files:
            # fail with a clear message rather than an IndexError on the lookup below
            raise ValueError("job_file must not be empty")
        job_file_dir = os.path.dirname(os.path.abspath(job_files[0]))
        job_file_names = [os.path.basename(jf) for jf in job_files]

        # define the actual submission in a loop to simplify retries
        while True:
            # build the command, the ce is re-drawn for every attempt
            cmd = shlex.split(_cfg.get_expanded("job", "arc_cmd_arcsub"))
            cmd += ["-c", random.choice(_ce)]
            if job_list:
                cmd += ["-j", job_list]
            cmd += job_file_names
            cmd_str = quote_cmd(cmd)

            # run the command from the directory of the job files since only their
            # base names are passed
            logger.debug(f"submit arc job(s) with command '{cmd_str}'")
            out: str
            code, out, _ = interruptable_popen(  # type: ignore[assignment]
                cmd_str,
                shell=True,
                executable="/bin/bash",
                stdout=subprocess.PIPE,
                cwd=job_file_dir,
                kill_timeout=2,
                processes=_processes,
            )

            # in some cases, the return code is 0 but the ce did not respond valid job ids
            job_ids = []
            if code == 0:
                for line in out.strip().split("\n"):
                    m = self.submission_job_id_cre.match(line.strip())
                    if m:
                        job_ids.append(m.group(1))

                if not job_ids:
                    code = 1
                    out = f"cannot find job id(s) in output:\n{out}"
                elif len(job_ids) != len(job_files):
                    raise Exception(
                        f"number of job ids in output ({len(job_ids)}) does not match number of "
                        f"jobs to submit ({len(job_files)}) in output:\n{out}",
                    )

            # retry or done?
            if code == 0:
                return job_ids if chunking else job_ids[0]

            logger.debug(f"submission of arc job(s) '{job_files}' failed with code {code}:\n{out}")

            if retries > 0:
                retries -= 1
                time.sleep(retry_delay)
                continue

            if silent:
                return None

            raise Exception(f"submission of arc job(s) '{job_files}' failed:\n{out}")

    def cancel(  # type: ignore[override]
        self,
        job_id: str | Sequence[str],
        job_list: str | None = None,
        silent: bool = False,
        _processes: list | None = None,
    ) -> dict[str, None] | None:
        """
        Cancels the job(s) given by *job_id* using the command configured as ``arc_cmd_arckill``.
        *job_list* defaults to the instance attribute. When *job_id* is a list or tuple, a
        dictionary mapping each job id to *None* is returned, and *None* otherwise. Unless
        *silent* is *True*, an exception is raised when the command exits with a non-zero code.
        """
        # default arguments
        if job_list is None:
            job_list = self.job_list

        chunking = isinstance(job_id, (list, tuple))
        job_ids = make_list(job_id)

        # build the command
        cmd = shlex.split(_cfg.get_expanded("job", "arc_cmd_arckill"))
        if job_list:
            cmd += ["-j", job_list]
        cmd += job_ids
        cmd_str = quote_cmd(cmd)

        # run it
        logger.debug(f"cancel arc job(s) with command '{cmd_str}'")
        out: str
        code, out, _ = interruptable_popen(  # type: ignore[assignment]
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            kill_timeout=2,
            processes=_processes,
        )

        # check success
        if code != 0 and not silent:
            # arc prints everything to stdout
            raise Exception(
                f"cancellation of arc job(s) '{job_id}' failed with code {code}:\n{out}",
            )

        return {job_id: None for job_id in job_ids} if chunking else None

    def cleanup(  # type: ignore[override]
        self,
        job_id: str | Sequence[str],
        job_list: str | None = None,
        silent: bool = False,
        _processes: list | None = None,
    ) -> dict[str, None] | None:
        """
        Cleans up the job(s) given by *job_id* using the command configured as
        ``arc_cmd_arcclean``. *job_list* defaults to the instance attribute. When *job_id* is a
        list or tuple, a dictionary mapping each job id to *None* is returned, and *None*
        otherwise. Unless *silent* is *True*, an exception is raised when the command exits with
        a non-zero code.
        """
        # default arguments
        if job_list is None:
            job_list = self.job_list

        chunking = isinstance(job_id, (list, tuple))
        job_ids = make_list(job_id)

        # build the command
        cmd = shlex.split(_cfg.get_expanded("job", "arc_cmd_arcclean"))
        if job_list:
            cmd += ["-j", job_list]
        cmd += job_ids
        cmd_str = quote_cmd(cmd)

        # run it
        logger.debug(f"cleanup arc job(s) with command '{cmd_str}'")
        out: str
        code, out, _ = interruptable_popen(  # type: ignore[assignment]
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            kill_timeout=2,
            processes=_processes,
        )

        # check success
        if code != 0 and not silent:
            # arc prints everything to stdout
            raise Exception(f"cleanup of arc job(s) '{job_id}' failed with code {code}:\n{out}")

        return {job_id: None for job_id in job_ids} if chunking else None

    def query(  # type: ignore[override]
        self,
        job_id: str | Sequence[str],
        job_list: str | None = None,
        silent: bool = False,
        _processes: list | None = None,
    ) -> dict[str, dict[str, Any]] | dict[str, Any] | None:
        """
        Queries the status of the job(s) given by *job_id* using the command configured as
        ``arc_cmd_arcstat`` and returns the parsed status data. When *job_id* is a list or tuple,
        a dictionary mapping job ids to status dictionaries is returned, with jobs missing in the
        response being marked as failed. Otherwise, the status dictionary of the single job is
        returned.

        *job_list* defaults to the instance attribute. When a query timeout is configured, the
        command is wrapped via :py:meth:`prepend_timeout_command`. If the command fails, or a
        single requested job is not part of the response, *None* is returned when *silent* is
        *True*, and an exception is raised otherwise.
        """
        # default arguments
        if job_list is None:
            job_list = self.job_list

        chunking = isinstance(job_id, (list, tuple))
        job_ids = make_list(job_id)

        # build the command
        cmd = shlex.split(_cfg.get_expanded("job", "arc_cmd_arcstat"))
        if job_list:
            cmd += ["-j", job_list]
        cmd += job_ids

        # optionally prepend timeout
        query_timeout = _cfg.get_expanded(
            "job",
            _cfg.find_option("job", "arc_job_query_timeout", "job_query_timeout"),
        )
        if query_timeout:
            query_timeout_sec = parse_duration(query_timeout, input_unit="s")
            cmd = self.prepend_timeout_command(cmd, query_timeout_sec)

        # run it
        cmd_str = quote_cmd(cmd)
        logger.debug(f"query arc job(s) with command '{cmd_str}'")
        out: str
        code, out, _ = interruptable_popen(  # type: ignore[assignment]
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            kill_timeout=2,
            processes=_processes,
        )

        # handle errors
        if code != 0:
            if silent:
                return None
            # arc prints everything to stdout
            raise Exception(
                f"status query of arc job(s) '{job_id}' failed with code {code}:\n{out}",
            )

        # parse the output and extract the status per job
        query_data = self.parse_query_output(out)

        # compare to the requested job ids and perform some checks
        for _job_id in job_ids:
            if _job_id in query_data:
                continue
            if not chunking:
                if silent:
                    return None
                raise Exception(f"arc job(s) '{job_id}' not found in query response")
            query_data[_job_id] = self.job_status_dict(
                job_id=_job_id,
                status=self.FAILED,
                error="job not found in query response",
            )

        return query_data if chunking else query_data[job_id]  # type: ignore[index]

    @classmethod
    def parse_query_output(cls, out: str) -> dict[str, dict[str, Any]]:
        """
        Parses the output *out* of the status command and returns a dictionary mapping job ids
        to status dictionaries created with :py:meth:`job_status_dict`. Jobs reported as not
        found in the job list are marked as failed, jobs whose information is not yet found in
        the information system are marked as pending, and all other jobs are interpreted from
        their "Job: ..." status block.
        """
        query_data = {}

        # first, check for invalid and missing jobs
        for line in out.strip().split("\n"):
            line = line.strip()

            # invalid job?
            m = cls.status_invalid_job_cre.match(line)
            if m:
                job_id = m.group(1)
                query_data[job_id] = cls.job_status_dict(
                    job_id=job_id,
                    status=cls.FAILED,
                    code=1,
                    error="job not found",
                )
                continue

            # missing job? this means that the job is not yet present in the information system
            # so it can be considered pending
            m = cls.status_missing_job_cre.match(line)
            if m:
                job_id = m.group(1)
                query_data[job_id] = cls.job_status_dict(job_id=job_id, status=cls.PENDING)
                continue

        # retrieve actual job status information per block
        # remove the summary line and check if there is any valid job status
        out = out.split("\nStatus of ", 1)[0].strip()
        if "Job: " not in out:
            return query_data

        blocks = out.split("Job: ", 1)[1].strip().split("\nJob: ")
        for block in blocks:
            # the split removed the leading key of each block, so add it back for parsing
            data = dict(cls.status_block_cre.findall(f"Job: {block}\n"))
            if not data:
                continue

            # get the job id
            if "Job" not in data:
                continue
            job_id = data["Job"]

            # interpret data, a missing or empty exit code is reported as None
            status = cls.map_status(data.get("State"))
            exit_code = (data.get("Exit Code") or "").strip()
            code = int(exit_code) if exit_code else None
            error = data.get("Job Error") or None

            # special cases
            if status == cls.FAILED and code in (0, None):
                code = 1

            # store it
            query_data[job_id] = cls.job_status_dict(
                job_id=job_id,
                status=status,
                code=code,
                error=error,
            )

        return query_data

    @classmethod
    def map_status(cls, status: str | None) -> str:
        """
        Maps the arc job state *status* to one of the status flags of this class. Unknown states
        are logged and treated as failed.
        """
        # see http://www.nordugrid.org/documents/arc-ui.pdf
        if status in ("Queuing", "Accepted", "Preparing", "Submitting"):
            return cls.PENDING
        if status in ("Running", "Finishing"):
            return cls.RUNNING
        if status in ("Finished",):
            return cls.FINISHED
        if status in ("Failed", "Deleted"):
            return cls.FAILED

        logger.debug(f"unknown arc job state '{status}'")
        return cls.FAILED
class ARCJobFileFactory(BaseJobFileFactory):
    """
    Factory that writes xRSL job description files for ARC. All constructor arguments are stored
    as instance attributes and, being listed in *config_attrs*, are part of the config that is
    assembled per :py:meth:`create` call via :py:meth:`get_config`.

    When *dir*, *mkdtemp* or *cleanup* are not passed as keyword arguments (or are *None*), their
    values are read from the "job" config section.
    """

    config_attrs = BaseJobFileFactory.config_attrs + [
        "file_name", "command", "executable", "arguments", "input_files", "output_files",
        "postfix_output_files", "output_uri", "overwrite_output_files", "job_name", "log",
        "stdout", "stderr", "custom_content", "absolute_paths",
    ]

    def __init__(
        self,
        file_name: str = "arc_job.xrsl",
        command: str | Sequence[str] | None = None,
        executable: str | None = None,
        arguments: str | Sequence[str] | None = None,
        input_files: dict[str, str | pathlib.Path | JobInputFile] | None = None,
        output_files: list[str] | None = None,
        postfix_output_files: bool = True,
        output_uri: str | None = None,
        overwrite_output_files: bool = True,
        job_name: str | None = None,
        log: str = "log.txt",
        stdout: str = "stdout.txt",
        stderr: str = "stderr.txt",
        custom_content: str | Sequence[str] | None = None,
        absolute_paths: bool = True,
        **kwargs,
    ) -> None:
        # get some default kwargs from the config
        if kwargs.get("dir") is None:
            kwargs["dir"] = _cfg.get_expanded(
                "job",
                _cfg.find_option("job", "arc_job_file_dir", "job_file_dir"),
            )
        if kwargs.get("mkdtemp") is None:
            kwargs["mkdtemp"] = _cfg.get_expanded_bool(
                "job",
                _cfg.find_option("job", "arc_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
                force_type=False,
            )
        if kwargs.get("cleanup") is None:
            kwargs["cleanup"] = _cfg.get_expanded_bool(
                "job",
                _cfg.find_option("job", "arc_job_file_dir_cleanup", "job_file_dir_cleanup"),
            )

        super().__init__(**kwargs)

        self.file_name = file_name
        self.command = command
        self.executable = executable
        self.arguments = arguments
        self.input_files = input_files or {}
        self.output_files = output_files or []
        self.postfix_output_files = postfix_output_files
        self.output_uri = output_uri
        self.overwrite_output_files = overwrite_output_files
        self.job_name = job_name
        self.log = log
        self.stdout = stdout
        self.stderr = stderr
        self.absolute_paths = absolute_paths
        self.custom_content = custom_content

    def create(
        self,
        postfix: str | None = None,
        **kwargs,
    ) -> tuple[str, ARCJobFileFactory.Config]:
        """
        Creates the job file and returns a 2-tuple containing its path and the config object
        that was used to create it. The config is built from the instance attributes, updated by
        *kwargs*. *postfix* is applied to the job file name and, depending on the configuration,
        to input and output files.

        A *ValueError* is raised when *file_name* is empty, or when both *command* and
        *executable* are empty.
        """
        # merge kwargs and instance attributes
        c = self.get_config(**kwargs)

        # some sanity checks
        if not c.file_name:
            raise ValueError("file_name must not be empty")
        if not c.command and not c.executable:
            raise ValueError("either command or executable must not be empty")

        # ensure that all log files are output files
        log_attrs = ["log", "stdout", "stderr", "custom_log_file"]
        for attr in log_attrs:
            if c[attr] and c[attr] not in c.output_files:
                c.output_files.append(c[attr])

        # postfix certain output files
        c.output_files = list(map(str, c.output_files))
        if c.postfix_output_files:
            c.output_files = [self.postfix_output_file(path, postfix) for path in c.output_files]
            for attr in log_attrs:
                if c[attr]:
                    c[attr] = self.postfix_output_file(c[attr], postfix)

        # ensure that all input files are JobInputFile's
        c.input_files = {
            key: JobInputFile(f)
            for key, f in c.input_files.items()
        }

        # special case: remote input files must never be copied
        for f in c.input_files.values():
            if f.is_remote:
                f.copy = False

        # ensure that the executable is an input file, remember the key to access it
        if c.executable:
            executable_keys = [
                k
                for k, v in c.input_files.items()
                if get_path(v) == get_path(c.executable)
            ]
            if executable_keys:
                executable_key = executable_keys[0]
            else:
                executable_key = "executable_file"
                c.input_files[executable_key] = JobInputFile(c.executable)

        # prepare input files
        def prepare_input(f):
            if f.is_remote:
                return f.path
            # when not copied or forwarded, just return the absolute, original path
            abs_path = os.path.abspath(f.path)
            if not f.copy or f.forward:
                return abs_path
            # copy the file
            abs_path = self.provide_input(
                src=abs_path,
                postfix=postfix if f.postfix and not f.share else None,
                dir=c.dir,
                skip_existing=f.share,
            )
            return abs_path

        # absolute input paths
        for f in c.input_files.values():
            f.path_sub_abs = prepare_input(f)

        # input paths relative to the submission or initial dir
        # forwarded files are included but remote ones are skipped
        for f in c.input_files.values():
            if f.is_remote:
                continue
            f.path_sub_rel = (
                os.path.basename(f.path_sub_abs)
                if f.copy and not c.absolute_paths
                else f.path_sub_abs
            )

        # input paths as seen by the job, before and after potential rendering
        for f in c.input_files.values():
            f.path_job_pre_render = (
                f.path_sub_abs
                if f.is_remote
                else os.path.basename(f.path_sub_abs)
            )
            f.path_job_post_render = (
                os.path.basename(f.path_sub_abs)
                if f.render_job
                else f.path_sub_abs
            )

        # update files in render variables with version after potential rendering
        c.render_variables.update({
            key: f.path_job_post_render
            for key, f in c.input_files.items()
        })

        # add space separated input files before potential rendering to render variables
        c.render_variables["input_files"] = " ".join(
            f.path_job_pre_render
            for f in c.input_files.values()
        )

        # add space separated list of input files for rendering
        c.render_variables["input_files_render"] = " ".join(
            f.path_job_pre_render
            for f in c.input_files.values()
            if f.render_job
        )

        # add the custom log file to render variables
        if c.custom_log_file:
            c.render_variables["log_file"] = c.custom_log_file

        # add the file postfix to render variables
        if postfix and "file_postfix" not in c.render_variables:
            c.render_variables["file_postfix"] = postfix

        # add output_uri to render variables
        if c.output_uri and "output_uri" not in c.render_variables:
            c.render_variables["output_uri"] = c.output_uri

        # linearize render variables
        render_variables = self.linearize_render_variables(c.render_variables)

        # prepare the job file
        job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix)

        # render copied, non-remote input files
        for f in c.input_files.values():
            if not f.copy or f.is_remote or not f.render_local:
                continue
            self.render_file(
                f.path_sub_abs,
                f.path_sub_abs,
                render_variables,
                postfix=postfix if f.postfix else None,
            )

        # create arc-style input file pairs
        input_file_pairs = [
            (os.path.basename(f.path_sub_abs), "" if f.copy else f.path_sub_abs)
            for f in c.input_files.values()
        ]

        # prepare the executable when given
        if c.executable:
            c.executable = get_path(c.input_files[executable_key].path_job_post_render)
            # make the file executable for the user and group
            path = os.path.join(c.dir, os.path.basename(c.executable))
            if os.path.exists(path):
                os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP)

        # ensure a correct format of output files
        def prepare_output(path):
            # consider strings to be the filename and when output_uri is set, use it
            # as the URL, otherwise it's also empty
            # NOTE(review): output files were already passed through postfix_output_file above
            # when postfix_output_files is set - confirm that applying it again is intended
            if c.postfix_output_files:
                path = self.postfix_output_file(path, postfix)
            dst = os.path.join(c.output_uri, os.path.basename(path)) if c.output_uri else ""
            opts = "overwrite=yes" if c.overwrite_output_files else None
            return (path, dst, opts) if opts else (path, dst)

        c.output_files = list(map(prepare_output, c.output_files))

        # job file content
        content = []
        if c.command:
            cmd = quote_cmd(c.command) if isinstance(c.command, (list, tuple)) else c.command
            content.append(("executable", cmd))
        elif c.executable:
            content.append(("executable", c.executable))
        if c.arguments:
            args = quote_cmd(c.arguments) if isinstance(c.arguments, (list, tuple)) else c.arguments
            content.append(("arguments", args))
        if c.job_name:
            content.append(("jobName", c.job_name))
        if c.input_files:
            content.append(("inputFiles", make_unique(input_file_pairs)))
        if c.output_files:
            content.append(("outputFiles", make_unique(c.output_files)))
        if c.log:
            content.append(("gmlog", c.log))
        if c.stdout:
            content.append(("stdout", c.stdout))
        if c.stderr:
            content.append(("stderr", c.stderr))

        # add custom content, entries are unpacked as (key, value) pairs below
        if c.custom_content:
            content += c.custom_content

        # write the job file
        with open(job_file, "w") as fh:
            fh.write("&\n")
            for key, value in content:
                line = self.create_line(key, value)
                fh.write(f"{line}\n")

        logger.debug(f"created arc job file at '{job_file}'")

        return job_file, c

    @classmethod
    def create_line(cls, key: str, value: Any) -> str:
        """
        Returns a single line of the job file of the form ``(key = value)``. Lists in *value* are
        joined by spaces, tuples are joined by spaces and wrapped in parentheses, and all other
        values are converted to double-quoted strings. Double quotes inside such a value are
        escaped by doubling them so that they cannot terminate the quoted string prematurely.
        """
        def flat_value(value):
            if isinstance(value, list):
                return " ".join(flat_value(v) for v in value)
            if isinstance(value, tuple):
                return f"({' '.join(flat_value(v) for v in value)})"
            # the quoting delimiter is escaped within a quoted string by writing it twice
            escaped = str(value).replace("\"", "\"\"")
            return f"\"{escaped}\""

        return f"({key} = {flat_value(value)})"