Source code for law.contrib.lsf.job

# coding: utf-8

"""
LSF job manager. See https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.3.
"""

from __future__ import annotations

__all__ = ["LSFJobManager", "LSFJobFileFactory"]

import os
import stat
import time
import re
import pathlib
import shlex
import subprocess

from law.config import Config
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile
from law.target.file import get_path
from law.target.local import LocalDirectoryTarget
from law.util import interruptable_popen, make_list, make_unique, quote_cmd, parse_duration
from law.logger import get_logger
from law._types import Any, Sequence

from law.contrib.lsf.util import get_lsf_version


logger = get_logger(__name__)

_cfg = Config.instance()


class LSFJobManager(BaseJobManager):
    """
    Job manager that submits, cancels and queries LSF jobs by running the shell commands configured
    in the ``job`` config section (options ``lsf_cmd_bsub``, ``lsf_cmd_bkill`` and
    ``lsf_cmd_bjobs``).

    *queue* and *emails* are stored as instance-level defaults that are used by :py:meth:`submit`,
    :py:meth:`cancel` and :py:meth:`query` when the corresponding argument is *None*. *threads* is
    stored on the instance but is not read by any method defined in this class.
    """

    # chunking settings
    # NOTE(review): the meaning of a chunk size of 0 is defined by the base class - confirm there
    chunk_size_submit = 0
    chunk_size_cancel = _cfg.get_expanded_int("job", "lsf_chunk_size_cancel")
    chunk_size_query = _cfg.get_expanded_int("job", "lsf_chunk_size_query")

    # pattern to extract the job id from the output of the submission command
    submission_job_id_cre = re.compile(r"^Job <(\d+)> is submitted.+$")

    def __init__(self, queue: str | None = None, emails: bool = False, threads: int = 1) -> None:
        super().__init__()

        self.queue = queue
        self.emails = emails
        self.threads = threads

        # determine the LSF version once
        self.lsf_version = get_lsf_version()

        # flags for versions with some important changes
        # (falsy when the version could not be determined)
        self.lsf_v912 = self.lsf_version and self.lsf_version >= (9, 1, 2)

    def cleanup(self, *args, **kwargs) -> None:  # type: ignore[override]
        """
        Not supported for LSF, always raises a *NotImplementedError*.
        """
        raise NotImplementedError("LSFJobManager.cleanup is not implemented")

    def cleanup_batch(self, *args, **kwargs) -> None:  # type: ignore[override]
        """
        Not supported for LSF, always raises a *NotImplementedError*.
        """
        raise NotImplementedError("LSFJobManager.cleanup_batch is not implemented")

    def submit(  # type: ignore[override]
        self,
        job_file: str | pathlib.Path,
        queue: str | None = None,
        emails: bool | None = None,
        retries: int = 0,
        retry_delay: float | int = 3,
        silent: bool = False,
        _processes: list | None = None,
    ) -> str | None:
        """
        Submits the job described by *job_file* and returns the job id parsed from the output of
        the submission command.

        *queue* and *emails* default to the instance attributes of the same name when *None*. On
        failure (non-zero exit code or unparsable output), the submission is repeated up to
        *retries* times with a pause of *retry_delay* seconds in between. When all attempts fail,
        *None* is returned if *silent* is *True*, and an *Exception* is raised otherwise.
        *_processes* is forwarded to :py:func:`interruptable_popen`.
        """
        # default arguments
        if queue is None:
            queue = self.queue
        if emails is None:
            emails = self.emails

        # get the job file location as the submission command is run in the same directory
        job_file_dir, job_file_name = os.path.split(os.path.abspath(str(job_file)))

        # build the command, the job file is passed via stdin redirection
        # queue and file name are quoted as the command is interpreted by a shell
        cmd = _cfg.get_expanded("job", "lsf_cmd_bsub")
        cmd = f"LSB_JOB_REPORT_MAIL={'Y' if emails else 'N'} {cmd}"
        if queue:
            cmd += f" -q {shlex.quote(queue)}"
        cmd += f" < {shlex.quote(job_file_name)}"

        # define the actual submission in a loop to simplify retries
        while True:
            # run the command
            logger.debug(f"submit lsf job with command '{cmd}'")
            out: str
            err: str
            code, out, err = interruptable_popen(  # type: ignore[assignment]
                cmd,
                shell=True,
                executable="/bin/bash",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=job_file_dir,
                kill_timeout=2,
                processes=_processes,
            )

            # get the job id, treat unparsable output as a failure
            if code == 0:
                m = self.submission_job_id_cre.match(out.strip())
                if m:
                    job_id = m.group(1)
                else:
                    code = 1
                    err = f"cannot parse job id from output:\n{out}"

            # retry or done?
            if code == 0:
                return job_id

            logger.debug(f"submission of lsf job '{job_file}' failed with code {code}:\n{err}")

            if retries > 0:
                retries -= 1
                time.sleep(retry_delay)
                continue

            if silent:
                return None

            raise Exception(f"submission of lsf job '{job_file}' failed: \n{err}")

    def cancel(  # type: ignore[override]
        self,
        job_id: str | Sequence[str],
        queue: str | None = None,
        silent: bool = False,
        _processes: list | None = None,
    ) -> dict[str, None] | None:
        """
        Cancels one or multiple jobs given by *job_id*.

        *queue* defaults to the instance attribute when *None*. When *job_id* is a list or tuple, a
        dictionary mapping each job id to *None* is returned, and *None* otherwise. An *Exception*
        is raised when the command exits with a non-zero code, unless *silent* is *True*.
        *_processes* is forwarded to :py:func:`interruptable_popen`.
        """
        # default arguments
        if queue is None:
            queue = self.queue

        chunking = isinstance(job_id, (list, tuple))
        job_ids = make_list(job_id)

        # build the command
        cmd = shlex.split(_cfg.get_expanded("job", "lsf_cmd_bkill"))
        if queue:
            cmd += ["-q", queue]
        cmd += job_ids
        cmd_str = quote_cmd(cmd)

        # run it
        logger.debug(f"cancel lsf job(s) with command '{cmd_str}'")
        code, out, err = interruptable_popen(
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            kill_timeout=2,
            processes=_processes,
        )

        # check success
        if code != 0 and not silent:
            raise Exception(f"cancellation of lsf job(s) '{job_id}' failed with code {code}:\n{err}")

        return {job_id: None for job_id in job_ids} if chunking else None

    def query(  # type: ignore[override]
        self,
        job_id: str | Sequence[str],
        queue: str | None = None,
        silent: bool = False,
        _processes: list | None = None,
    ) -> dict[str, dict[str, Any]] | dict[str, Any] | None:
        """
        Queries the status of one or multiple jobs given by *job_id*.

        *queue* defaults to the instance attribute when *None*. When *job_id* is a list or tuple, a
        dictionary mapping job ids to status dictionaries is returned, where jobs missing in the
        query response are marked as failed. For a single job id, only its status dictionary is
        returned. When the command fails or a single requested job is not found, *None* is returned
        if *silent* is *True*, and an *Exception* is raised otherwise. *_processes* is forwarded to
        :py:func:`interruptable_popen`.
        """
        # default arguments
        if queue is None:
            queue = self.queue

        chunking = isinstance(job_id, (list, tuple))
        job_ids = make_list(job_id)

        # build the command
        cmd = shlex.split(_cfg.get_expanded("job", "lsf_cmd_bjobs"))
        if self.lsf_v912:
            cmd.append("-noheader")
        if queue:
            cmd += ["-q", queue]
        cmd += job_ids

        # optionally prepend timeout
        query_timeout = _cfg.get_expanded(
            "job",
            _cfg.find_option("job", "lsf_job_query_timeout", "job_query_timeout"),
        )
        if query_timeout:
            query_timeout_sec = parse_duration(query_timeout, input_unit="s")
            cmd = self.prepend_timeout_command(cmd, query_timeout_sec)

        # run it
        # the quoted command string must be passed since, with shell=True, only the first item of
        # a sequence is executed and all arguments (job ids, queue, timeout) would be dropped
        cmd_str = quote_cmd(cmd)
        logger.debug(f"query lsf job(s) with command '{cmd_str}'")
        out: str
        err: str
        code, out, err = interruptable_popen(  # type: ignore[assignment]
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            kill_timeout=2,
            processes=_processes,
        )

        # handle errors
        if code != 0:
            if silent:
                return None
            raise Exception(f"status query of lsf job(s) '{job_id}' failed with code {code}:\n{err}")

        # parse the output and extract the status per job
        query_data = self.parse_query_output(out)

        # compare to the requested job ids and perform some checks
        for _job_id in job_ids:
            if _job_id not in query_data:
                if not chunking:
                    if silent:
                        return None
                    raise Exception(f"lsf job(s) '{job_id}' not found in query response")
                else:
                    query_data[_job_id] = self.job_status_dict(
                        job_id=_job_id,
                        status=self.FAILED,
                        error="job not found in query response",
                    )

        return query_data if chunking else query_data[job_id]  # type: ignore[index]

    @classmethod
    def parse_query_output(cls, out: str) -> dict[str, dict[str, Any]]:
        """
        Parses the output *out* of the query command and returns a dictionary mapping job ids to
        status dictionaries. Lines with less than six whitespace-separated fields as well as the
        table header are skipped.

        Example output to parse:
        141914132 user_name DONE queue_name exec_host b63cee711a job_name Feb 8 14:54
        """
        query_data = {}
        for line in out.strip().split("\n"):
            parts = line.split()
            if len(parts) < 6:
                continue

            job_id = parts[0]
            status_flag = parts[2]

            # skip the table header which is present when "-noheader" is not used, as it would
            # otherwise be recorded as a failed job named "JOBID"
            if job_id == "JOBID":
                continue

            # map the status
            status = cls.map_status(status_flag)

            # save the result
            query_data[job_id] = cls.job_status_dict(job_id=job_id, status=status)

        return query_data

    @classmethod
    def map_status(cls, status_flag: str | None) -> str:
        """
        Maps the LSF *status_flag* to one of the status constants of this class. Unknown flags are
        logged on debug level and mapped to the failed status.
        """
        # https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
        if status_flag in ("PEND", "PROV", "PSUSP", "USUSP", "SSUSP", "WAIT"):
            return cls.PENDING
        if status_flag in ("RUN",):
            return cls.RUNNING
        if status_flag in ("DONE",):
            return cls.FINISHED
        if status_flag in ("EXIT", "UNKWN", "ZOMBI"):
            return cls.FAILED

        logger.debug(f"unknown lsf job state '{status_flag}'")
        return cls.FAILED
class LSFJobFileFactory(BaseJobFileFactory):
    """
    Factory that writes LSF job files. A job file consists of a shebang line, ``#BSUB`` directive
    lines, optional copy commands for manual stage-in and stage-out, and the command to execute.

    All constructor arguments are stored as instance attributes of the same name and can be
    overwritten per call via keyword arguments to :py:meth:`create`. When *dir*, *mkdtemp* or
    *cleanup* are not given (or *None*) in *kwargs*, their values are read from the ``job`` config
    section before being passed to the base class.
    """

    config_attrs = BaseJobFileFactory.config_attrs + [
        "file_name", "command", "executable", "arguments", "queue", "cwd", "input_files",
        "output_files", "postfix_output_files", "manual_stagein", "manual_stageout", "job_name",
        "stdout", "stderr", "shell", "emails", "custom_content", "absolute_paths",
    ]

    def __init__(
        self,
        *,
        file_name: str = "lsf_job.job",
        command: str | Sequence[str] | None = None,
        executable: str | None = None,
        arguments: str | Sequence[str] | None = None,
        queue: str | None = None,
        cwd: str | pathlib.Path | LocalDirectoryTarget | None = None,
        input_files: dict[str, str | pathlib.Path | JobInputFile] | None = None,
        output_files: list[str] | None = None,
        postfix_output_files: bool = True,
        manual_stagein: bool = False,
        manual_stageout: bool = False,
        job_name: str | None = None,
        stdout: str = "stdout.txt",
        stderr: str = "stderr.txt",
        shell: str = "bash",
        emails: bool = False,
        custom_content: str | Sequence[str] | None = None,
        absolute_paths: bool = False,
        **kwargs,
    ) -> None:
        # get some default kwargs from the config
        if kwargs.get("dir") is None:
            kwargs["dir"] = _cfg.get_expanded(
                "job",
                _cfg.find_option("job", "lsf_job_file_dir", "job_file_dir"),
            )
        if kwargs.get("mkdtemp") is None:
            kwargs["mkdtemp"] = _cfg.get_expanded_bool(
                "job",
                _cfg.find_option("job", "lsf_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
                force_type=False,
            )
        if kwargs.get("cleanup") is None:
            kwargs["cleanup"] = _cfg.get_expanded_bool(
                "job",
                _cfg.find_option("job", "lsf_job_file_dir_cleanup", "job_file_dir_cleanup"),
            )

        super().__init__(**kwargs)

        self.file_name = file_name
        self.command = command
        self.executable = executable
        self.arguments = arguments
        self.queue = queue
        self.cwd = cwd
        # use fresh containers so that no default is shared between instances
        self.input_files = input_files or {}
        self.output_files = output_files or []
        self.postfix_output_files = postfix_output_files
        self.manual_stagein = manual_stagein
        self.manual_stageout = manual_stageout
        self.job_name = job_name
        self.stdout = stdout
        self.stderr = stderr
        self.shell = shell
        self.emails = emails
        self.custom_content = custom_content
        self.absolute_paths = absolute_paths

    def create(
        self,
        postfix: str | None = None,
        **kwargs,
    ) -> tuple[str, LSFJobFileFactory.Config]:
        """
        Creates the job file and returns a 2-tuple containing its path and the merged config
        object that was used to build it.

        *kwargs* are merged with the instance attributes via :py:meth:`get_config`. *postfix* is
        applied to the job file name, to copied input files that request it and, when
        *postfix_output_files* is set, to output and log files (paths starting with ``/dev/``
        excluded). A *ValueError* is raised when *file_name* or *shell* is empty, or when neither
        *command* nor *executable* is set.
        """
        # merge kwargs and instance attributes
        c = self.get_config(**kwargs)

        # some sanity checks
        if not c.file_name:
            raise ValueError("file_name must not be empty")
        if not c.command and not c.executable:
            raise ValueError("either command or executable must not be empty")
        if not c.shell:
            raise ValueError("shell must not be empty")

        # ensure that the custom log file is an output file
        if c.custom_log_file and c.custom_log_file not in c.output_files:
            c.output_files.append(c.custom_log_file)

        # postfix certain output files
        c.output_files = list(map(str, c.output_files))
        if c.postfix_output_files:
            skip_postfix_cre = re.compile(r"^(/dev/).*$")

            def skip_postfix(s: Any) -> bool:
                # device files such as /dev/null must keep their name
                return bool(skip_postfix_cre.match(str(s)))

            c.output_files = [
                path if skip_postfix(path) else self.postfix_output_file(path, postfix)
                for path in c.output_files
            ]
            for attr in ["stdout", "stderr", "custom_log_file"]:
                if c[attr] and not skip_postfix(c[attr]):
                    c[attr] = self.postfix_output_file(c[attr], postfix)

        # ensure that all input files are JobInputFile objects
        c.input_files = {
            key: JobInputFile(f)
            for key, f in c.input_files.items()
        }

        # ensure that the executable is an input file, remember the key to access it
        if c.executable:
            executable_keys = [
                k
                for k, v in c.input_files.items()
                if get_path(v) == get_path(c.executable)
            ]
            if executable_keys:
                executable_key = executable_keys[0]
            else:
                executable_key = "executable_file"
                c.input_files[executable_key] = JobInputFile(c.executable)

        # prepare input files
        def prepare_input(f):
            # when not copied, just return the absolute, original path
            abs_path = os.path.abspath(f.path)
            if not f.copy or f.forward:
                return abs_path
            # copy the file
            abs_path = self.provide_input(
                src=abs_path,
                postfix=postfix if f.postfix and not f.share else None,
                dir=c.dir,
                skip_existing=f.share,
            )
            return abs_path

        # absolute input paths
        for f in c.input_files.values():
            f.path_sub_abs = prepare_input(f)

        # input paths relative to the submission or initial dir
        # forwarded files are skipped as they are not treated as normal inputs
        for f in c.input_files.values():
            if f.forward:
                continue
            f.path_sub_rel = (
                os.path.basename(f.path_sub_abs)
                if f.copy and not c.absolute_paths
                else f.path_sub_abs
            )

        # input paths as seen by the job, before and after potential rendering
        for f in c.input_files.values():
            f.path_job_pre_render = (
                f.path_sub_abs
                if f.forward
                else os.path.basename(f.path_sub_abs)
            )
            f.path_job_post_render = (
                f.path_sub_abs
                if f.forward and not f.render_job
                else os.path.basename(f.path_sub_abs)
            )

        # update files in render variables with version after potential rendering
        c.render_variables.update({
            key: f.path_job_post_render
            for key, f in c.input_files.items()
        })

        # add space separated input files before potential rendering to render variables
        c.render_variables["input_files"] = " ".join(
            f.path_job_pre_render
            for f in c.input_files.values()
        )

        # add space separated list of input files for rendering
        c.render_variables["input_files_render"] = " ".join(
            f.path_job_pre_render
            for f in c.input_files.values()
            if f.render_job
        )

        # add the custom log file to render variables
        if c.custom_log_file:
            c.render_variables["log_file"] = c.custom_log_file

        # add the file postfix to render variables
        if postfix and "file_postfix" not in c.render_variables:
            c.render_variables["file_postfix"] = postfix

        # linearize render variables
        render_variables = self.linearize_render_variables(c.render_variables)

        # prepare the job description file
        job_file = self.postfix_input_file(os.path.join(c.dir, str(c.file_name)), postfix)

        # render copied, non-forwarded input files
        for f in c.input_files.values():
            if not f.copy or f.forward or not f.render_local:
                continue
            self.render_file(
                f.path_sub_abs,
                f.path_sub_abs,
                render_variables,
                postfix=postfix if f.postfix else None,
            )

        # prepare the executable when given
        if c.executable:
            c.executable = get_path(c.input_files[executable_key].path_job_post_render)
            # make the file executable for the user and group
            path = os.path.join(c.dir, os.path.basename(c.executable))
            if os.path.exists(path):
                os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP)

        # job file content
        # plain strings are written verbatim, tuples are converted to "#BSUB" directive lines
        content: list[str | tuple[str] | tuple[str, Any]] = []
        content.append(f"#!/usr/bin/env {c.shell}")

        if c.job_name:
            content.append(("-J", c.job_name))
        if c.queue:
            content.append(("-q", c.queue))
        if c.cwd:
            content.append(("-cwd", get_path(c.cwd)))
        if c.stdout:
            content.append(("-o", c.stdout))
        if c.stderr:
            content.append(("-e", c.stderr))
        if c.emails:
            content.append(("-N",))
        if c.custom_content:
            # make_list keeps a single string as one line instead of splitting it into characters
            content += make_list(c.custom_content)

        # input and output file directives, unless staging is done manually
        if not c.manual_stagein:
            paths = [f.path_sub_rel for f in c.input_files.values() if f.path_sub_rel]
            for path in make_unique(paths):
                content.append(("-f", f"\"{path} > {os.path.basename(path)}\""))

        if not c.manual_stageout:
            for path in make_unique(c.output_files):
                content.append(("-f", f"\"{path} < {os.path.basename(path)}\""))

        # manual stage-in via copy commands that run before the actual command
        if c.manual_stagein:
            tmpl = "cp " + ("{}" if c.absolute_paths else "$LS_EXECCWD/{}") + " $PWD/{}"
            paths = [f.path_sub_rel for f in c.input_files.values() if f.path_sub_rel]
            for path in make_unique(paths):
                content.append(tmpl.format(path, os.path.basename(path)))

        # the command to run, a sequence is joined into a single, quoted command line so that it
        # is written verbatim rather than being mistaken for a "#BSUB" directive
        if c.command:
            command = c.command
            if isinstance(command, (list, tuple)):
                command = quote_cmd(command)
            content.append(command)
        else:
            content.append("./" + c.executable)
        if c.arguments:
            args = quote_cmd(c.arguments) if isinstance(c.arguments, (list, tuple)) else c.arguments
            content[-1] += f" {args}"  # type: ignore[operator]

        # manual stage-out via copy commands that run after the actual command
        if c.manual_stageout:
            tmpl = "cp $PWD/{} $LS_EXECCWD/{}"
            for path in c.output_files:
                content.append(tmpl.format(path, path))

        # write the job file
        with open(job_file, "w") as f:
            for line in content:
                if not isinstance(line, str):
                    line = self.create_line(*make_list(line))
                f.write(f"{line}\n")

        logger.debug(f"created lsf job file at '{job_file}'")

        return job_file, c

    @classmethod
    def create_line(cls, key: str, value: Any | None = None) -> str:
        """
        Returns a ``#BSUB`` directive line for the option *key*, followed by *value* unless it is
        *None*.
        """
        if value is None:
            return f"#BSUB {key}"
        return f"#BSUB {key} {value}"