# coding: utf-8
"""
HTCondor job manager. See https://research.cs.wisc.edu/htcondor.
"""
from __future__ import annotations
__all__ = ["HTCondorJobManager", "HTCondorJobFileFactory"]
import os
import stat
import time
import re
import pathlib
import tempfile
import shlex
import subprocess
from law.config import Config
from law.job.base import BaseJobManager, BaseJobFileFactory, JobInputFile
from law.target.file import get_path
from law.util import interruptable_popen, make_list, make_unique, quote_cmd, parse_duration
from law.logger import get_logger
from law._types import Any, Sequence
from law.contrib.htcondor.util import get_htcondor_version
logger = get_logger(__name__)
_cfg = Config.instance()
[docs]
class HTCondorJobManager(BaseJobManager):
    """
    Job manager for HTCondor. The class-level settings below are read from the "job" section of
    the law config when the class body is evaluated.
    """

    # submission mode switch: job grouping when true, batched submission otherwise
    job_grouping_submit = _cfg.get_expanded_bool("job", "htcondor_job_grouping_submit")

    # merging of job files only applies to batched submission and is off when grouping is used
    merge_job_files = (
        False
        if job_grouping_submit
        else _cfg.get_expanded_bool("job", "htcondor_merge_job_files")
    )

    # chunked submission is only configured when job files are merged, 0 otherwise
    chunk_size_submit = (
        _cfg.get_expanded_int("job", "htcondor_chunk_size_submit")
        if merge_job_files
        else 0
    )

    # chunk sizes for cancel and query operations
    chunk_size_cancel = _cfg.get_expanded_int("job", "htcondor_chunk_size_cancel")
    chunk_size_query = _cfg.get_expanded_int("job", "htcondor_chunk_size_query")

    # matches the submission output line, capturing the number of jobs and the cluster id
    submission_job_id_cre = re.compile(r"^(\d+) job\(s\) submitted to cluster (\d+)\.$")

    # extracts "key = value" pairs (with optionally quoted values) from long-format query output
    long_block_cre = re.compile(r"(\w+) \= \"?([^\"\n]*)\"?\n")
def __init__(
    self,
    pool: str | None = None,
    scheduler: str | None = None,
    user: str | None = None,
    threads: int = 1,
) -> None:
    """
    Stores *pool*, *scheduler*, *user* and *threads* on the instance and determines the htcondor
    version, from which two version flags are derived.
    """
    super().__init__()

    # values that methods fall back to when called without explicit arguments
    self.pool, self.scheduler, self.user = pool, scheduler, user
    self.threads = threads

    # the version lookup is done a single time here
    version = get_htcondor_version()
    self.htcondor_version = version

    # flags for versions with some important changes
    # (they take the falsy version value itself when no version is available)
    self.htcondor_ge_v833 = version and version >= (8, 3, 3)
    self.htcondor_ge_v856 = version and version >= (8, 5, 6)
[docs]
def cleanup(self, *args, **kwargs) -> None:  # type: ignore[override]
    """
    Not available for this job manager, always raises a *NotImplementedError*.
    """
    msg = "HTCondorJobManager.cleanup is not implemented"
    raise NotImplementedError(msg)
[docs]
def cleanup_batch(self, *args, **kwargs) -> None:  # type: ignore[override]
    """
    Not available for this job manager, always raises a *NotImplementedError*.
    """
    msg = "HTCondorJobManager.cleanup_batch is not implemented"
    raise NotImplementedError(msg)
[docs]
def submit(  # type: ignore[override]
    self,
    job_file: str | pathlib.Path | Sequence[str | pathlib.Path],
    job_files: Sequence[str | pathlib.Path] | None = None,
    pool: str | None = None,
    scheduler: str | None = None,
    spool: bool = False,
    retries: int = 0,
    retry_delay: float | int = 3,
    silent: bool = False,
    _processes: list | None = None,
) -> str | Sequence[str] | None:
    """
    Entry point for job submission whose signature covers both batched and grouped submission.
    When *job_files* is *None*, *job_file* is forwarded to the batched implementation. Otherwise
    the grouped implementation is invoked, in which case *job_file* must refer to exactly one
    file and a *ValueError* is raised if a sequence of different length is passed. All remaining
    arguments are passed through unchanged and the result of the selected implementation is
    returned.
    """
    # options understood by both implementations
    common = dict(
        pool=pool,
        scheduler=scheduler,
        spool=spool,
        retries=retries,
        retry_delay=retry_delay,
        silent=silent,
        _processes=_processes,
    )

    # the absence of job_files selects batched submission
    if job_files is None:
        return self._submit_impl_batched(job_file, **common)  # type: ignore[arg-type]

    # grouped submission, unwrap a sequence holding the single job file
    _job_file = job_file
    if isinstance(job_file, (list, tuple)):
        if len(job_file) != 1:
            raise ValueError(
                f"job_file must be a single file when job_files is given, got {job_file}",
            )
        _job_file = job_file[0]

    return self._submit_impl_grouped(_job_file, job_files=job_files, **common)  # type: ignore[arg-type] # noqa
def _submit_impl_batched(  # type: ignore[override]
    self,
    job_file: str | pathlib.Path | Sequence[str | pathlib.Path],
    pool: str | None = None,
    scheduler: str | None = None,
    spool: bool = False,
    retries: int = 0,
    retry_delay: float | int = 3,
    silent: bool = False,
    _processes: list | None = None,
) -> str | Sequence[str] | None:
    """
    Submits one or more job files with the command configured as "htcondor_cmd_submit" and
    returns the parsed job id(s) in the format "<cluster>.<process>". A list of ids is returned
    when *job_file* is a list or tuple, and a single id otherwise.

    *pool* and *scheduler* default to the instance attributes and are added as "-pool" and
    "-name" options, and "-spool" is added when *spool* is set. When *merge_job_files* is
    enabled and more than one file is given, their contents are concatenated into a single,
    temporary job file which is then submitted instead.

    On failure (non-zero exit code or no parsable job id), the submission is repeated up to
    *retries* times with a pause of *retry_delay* seconds. Once retries are exhausted, *None* is
    returned when *silent* is set and an exception is raised otherwise. An exception is also
    raised when job files live in different directories and do not define an "initialdir".
    """
    # default arguments
    if pool is None:
        pool = self.pool
    if scheduler is None:
        scheduler = self.scheduler

    # when job_file is a sequence of files, merge them all into one and submit it
    # however, this only for job files being located in the same directory or if they have an
    # "initialdir" defined
    def has_initialdir(path):
        # stream the file line by line and stop at the first match
        with open(path, "r") as f:
            return any(
                line.lower().strip().replace(" ", "").startswith("initialdir=")
                for line in f
            )

    chunking = isinstance(job_file, (list, tuple))
    job_files = list(map(get_path, make_list(job_file)))

    # determine the common job file directory and validate files located elsewhere
    job_file_dir = None
    last_index = len(job_files) - 1
    for i, path in enumerate(job_files):
        dirname = os.path.dirname(path)
        if job_file_dir is None:
            # the last file always defines the directory if no earlier file did
            if i == last_index or not has_initialdir(path):
                job_file_dir = dirname
        elif dirname != job_file_dir and not has_initialdir(path):
            raise Exception(
                f"cannot performed chunked submission as job file '{path}' is not "
                f"located in a previously seen directory '{job_file_dir}' and has no "
                "initialdir",
            )

    # define a single, merged job file if necessary
    if self.merge_job_files and len(job_files) > 1:
        # mkstemp returns an already open file descriptor that must be closed by the caller,
        # so wrap it in a file object rather than discarding it and reopening the path
        fd, _job_file = tempfile.mkstemp(
            prefix="merged_job_",
            suffix=".jdl",
            dir=job_file_dir,
        )
        with os.fdopen(fd, "w") as f:
            for path in job_files:
                with open(path, "r") as _f:
                    f.write(f"{_f.read()}\n")
        job_files = [_job_file]

    # build the command
    cmd = shlex.split(_cfg.get_expanded("job", "htcondor_cmd_submit"))
    if pool:
        cmd += ["-pool", pool]
    if scheduler:
        cmd += ["-name", scheduler]
    if spool:
        cmd.append("-spool")
    # job files are referred to by base name as the command runs in their directory
    cmd += list(map(os.path.basename, job_files))
    cmd_str = quote_cmd(cmd)

    # define the actual submission in a loop to simplify retries
    while True:
        # run the command
        logger.debug(f"submit htcondor job with command '{cmd_str}'")
        out: str
        err: str
        code, out, err = interruptable_popen(  # type: ignore[assignment]
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=os.path.dirname(job_files[0]),
            kill_timeout=2,
            processes=_processes,
        )

        # get the job id(s)
        job_ids: list[str] = []
        if code == 0:
            # loop through all lines and try to match the expected pattern
            for line in out.strip().split("\n"):
                m = self.submission_job_id_cre.match(line.strip())
                if m:
                    job_ids.extend(f"{m.group(2)}.{i}" for i in range(int(m.group(1))))
            # a successful command without any parsable id is treated as a failure
            if not job_ids:
                code = 1
                err = f"cannot parse htcondor job id(s) from output:\n{out}"

        # retry or done?
        if code == 0:
            return job_ids if chunking else job_ids[0]

        job_files_repr = ",".join(map(os.path.basename, job_files))
        logger.debug(
            f"submission of htcondor job(s) '{job_files_repr}' failed with code {code}:\n{err}",
        )

        if retries > 0:
            retries -= 1
            time.sleep(retry_delay)
            continue

        if silent:
            return None

        raise Exception(f"submission of htcondor job(s) '{job_files_repr}' failed:\n{err}")
def _submit_impl_grouped(  # type: ignore[override]
    self,
    job_file: str | pathlib.Path,
    job_files: Sequence[str | pathlib.Path] | None = None,
    pool: str | None = None,
    scheduler: str | None = None,
    spool: bool = False,
    retries: int = 0,
    retry_delay: float | int = 3,
    silent: bool = False,
    _processes: list | None = None,
) -> Sequence[str] | None:
    """
    Submits the single *job_file* with the command configured as "htcondor_cmd_submit" and
    returns the list of parsed job ids in the format "<cluster>.<process>". The command is run
    in the directory of *job_file*, and *job_files* is accepted but not used here.

    *pool* and *scheduler* fall back to the instance attributes. A failed attempt (non-zero exit
    code or no parsable job id) is repeated up to *retries* times, sleeping *retry_delay* seconds
    in between. After that, *None* is returned when *silent* is set, otherwise an exception is
    raised.
    """
    # fall back to instance-level defaults
    pool = self.pool if pool is None else pool
    scheduler = self.scheduler if scheduler is None else scheduler

    # assemble the submission command
    cmd = shlex.split(_cfg.get_expanded("job", "htcondor_cmd_submit"))
    if pool:
        cmd.extend(["-pool", pool])
    if scheduler:
        cmd.extend(["-name", scheduler])
    if spool:
        cmd.append("-spool")
    cmd.append(os.path.basename(job_file))
    cmd_str = quote_cmd(cmd)

    # attempt the submission until it succeeds or no retries are left
    attempts_left = retries
    while True:
        logger.debug(f"submit htcondor job with command '{cmd_str}'")
        out: str
        err: str
        code, out, err = interruptable_popen(  # type: ignore[assignment]
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=os.path.dirname(job_file),
            kill_timeout=2,
            processes=_processes,
        )

        if code == 0:
            # collect ids from every line matching the expected submission pattern
            job_ids = []
            for raw_line in out.strip().split("\n"):
                match = self.submission_job_id_cre.match(raw_line.strip())
                if not match:
                    continue
                cluster_id = match.group(2)
                job_ids += [f"{cluster_id}.{proc}" for proc in range(int(match.group(1)))]

            # done when at least one id was found
            if job_ids:
                return job_ids

            # otherwise, treat the attempt as failed
            code = 1
            err = f"cannot parse htcondor job id(s) from output:\n{out}"

        logger.debug(
            f"submission of htcondor job(s) '{job_file}' failed with code {code}:\n{err}",
        )

        if attempts_left > 0:
            attempts_left -= 1
            time.sleep(retry_delay)
            continue

        if silent:
            return None

        raise Exception(f"submission of htcondor job(s) '{job_file}' failed:\n{err}")
[docs]
def cancel(  # type: ignore[override]
    self,
    job_id: str | Sequence[str],
    pool: str | None = None,
    scheduler: str | None = None,
    silent: bool = False,
    _processes: list | None = None,
) -> dict[str, None] | None:
    """
    Cancels one or more jobs given by *job_id* with the command configured as "htcondor_cmd_rm".
    *pool* and *scheduler* fall back to the instance attributes. When the command exits with a
    non-zero code, an exception is raised unless *silent* is set. Returns a dictionary mapping
    each job id to *None* when *job_id* is a list or tuple, and *None* otherwise.
    """
    # fall back to instance-level defaults
    pool = self.pool if pool is None else pool
    scheduler = self.scheduler if scheduler is None else scheduler

    job_ids = make_list(job_id)

    # assemble the removal command
    cmd = shlex.split(_cfg.get_expanded("job", "htcondor_cmd_rm"))
    if pool:
        cmd.extend(["-pool", pool])
    if scheduler:
        cmd.extend(["-name", scheduler])
    cmd.extend(job_ids)
    cmd_str = quote_cmd(cmd)

    # execute it
    logger.debug(f"cancel htcondor job(s) with command '{cmd_str}'")
    out: str
    err: str
    code, out, err = interruptable_popen(  # type: ignore[assignment]
        cmd_str,
        shell=True,
        executable="/bin/bash",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        kill_timeout=2,
        processes=_processes,
    )

    # failures are only reported when not silent
    failed = code != 0
    if failed and not silent:
        raise Exception(
            f"cancellation of htcondor job(s) '{job_id}' failed with code {code}:\n{err}",
        )

    # the return shape follows the input shape
    if not isinstance(job_id, (list, tuple)):
        return None
    return dict.fromkeys(job_ids)
[docs]
def query(  # type: ignore[override]
    self,
    job_id: str | Sequence[str],
    pool: str | None = None,
    scheduler: str | None = None,
    user: str | None = None,
    silent: bool = False,
    _processes: list | None = None,
) -> dict[int, dict[str, Any]] | dict[str, Any] | None:
    """
    Queries the status of one or more jobs given by *job_id*. The command configured as
    "htcondor_cmd_q" is run first and, for all job ids not contained in its output, the command
    configured as "htcondor_cmd_history" is run in addition. Both outputs are parsed with
    :py:meth:`parse_long_output`.

    *pool*, *scheduler* and *user* fall back to the instance attributes. When a query timeout is
    configured, both commands are prefixed with a timeout command.

    When *job_id* is a list or tuple, a dictionary mapping job ids to status dictionaries is
    returned, where jobs that could not be found are marked as failed. Otherwise, the status
    dictionary of the single job is returned. If a command fails or the single job is not
    found, *None* is returned when *silent* is set and an exception is raised otherwise.
    """
    # default arguments
    if pool is None:
        pool = self.pool
    if scheduler is None:
        scheduler = self.scheduler
    if user is None:
        user = self.user

    chunking = isinstance(job_id, (list, tuple))
    job_ids = make_list(job_id)

    # condor_q ClassAds to get
    q_ads = "ClusterId ProcId JobStatus ExitCode ExitStatus HoldReason RemoveReason MemoryUsage RemoteHost"

    # build the condor_q command
    cmd = shlex.split(_cfg.get_expanded("job", "htcondor_cmd_q"))
    cmd += job_ids
    if pool:
        cmd += ["-pool", pool]
    if scheduler:
        cmd += ["-name", scheduler]
    cmd += ["-af:lng"] + q_ads.split()

    # since v8.3.3 one can limit the number of jobs to query
    if self.htcondor_ge_v833:
        cmd += ["-limit", str(len(job_ids))]

    # optionally prepend timeout
    query_timeout = _cfg.get_expanded(
        "job",
        _cfg.find_option("job", "htcondor_job_query_timeout", "job_query_timeout"),
    )
    if query_timeout:
        query_timeout_sec = parse_duration(query_timeout, input_unit="s")
        cmd = self.prepend_timeout_command(cmd, query_timeout_sec)

    # run it
    cmd_str = quote_cmd(cmd)
    logger.debug(f"query htcondor job(s) with command '{cmd_str}'")
    out: str
    err: str
    code, out, err = interruptable_popen(  # type: ignore[assignment]
        cmd_str,
        shell=True,
        executable="/bin/bash",
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        kill_timeout=2,
        processes=_processes,
    )

    # handle errors
    if code != 0:
        if silent:
            return None
        raise Exception(
            f"queue query of htcondor job(s) '{job_id}' failed with code {code}:\n{err}",
        )

    # parse the output and extract the status per job
    query_data = self.parse_long_output(out)

    # some jobs might already be in the condor history, so query for missing job ids
    missing_ids = [_job_id for _job_id in job_ids if _job_id not in query_data]
    if missing_ids:
        # condor_history ClassAds to get
        h_ads = "ClusterId ProcId JobStatus ExitCode ExitStatus HoldReason RemoveReason MemoryUsage LastRemoteHost"

        # build the condor_history command, which is fairly similar to the condor_q command
        cmd = shlex.split(_cfg.get_expanded("job", "htcondor_cmd_history"))
        cmd += missing_ids
        if pool:
            cmd += ["-pool", pool]
        if scheduler:
            cmd += ["-name", scheduler]
        cmd += ["-af:lng"] + h_ads.split()

        # since v8.3.3 one can limit the number of jobs to query
        if self.htcondor_ge_v833:
            cmd += ["-limit", str(len(missing_ids))]

        # optionally prepend timeout
        if query_timeout:
            cmd = self.prepend_timeout_command(cmd, query_timeout_sec)

        # run it, passing the quoted command string since the command is executed through a
        # shell (with a list, the shell would only execute its first item)
        cmd_str = quote_cmd(cmd)
        logger.debug(f"query htcondor job history with command '{cmd_str}'")
        code, out, err = interruptable_popen(  # type: ignore[assignment]
            cmd_str,
            shell=True,
            executable="/bin/bash",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            kill_timeout=2,
            processes=_processes,
        )

        # handle errors
        if code != 0:
            if silent:
                return None
            raise Exception(
                f"history query of htcondor job(s) '{job_id}' failed with code {code}:\n{err}",
            )

        # parse the output and update query data
        query_data.update(self.parse_long_output(out))

    # compare to the requested job ids and perform some checks
    for _job_id in job_ids:
        if _job_id in query_data:
            continue

        # a single, missing job is an error
        if not chunking:
            if silent:
                return None
            raise Exception(f"htcondor job(s) '{job_id}' not found in query response")

        # in chunked mode, missing jobs are reported as failed
        query_data[_job_id] = self.job_status_dict(
            job_id=_job_id,
            status=self.FAILED,
            error="job not found in query response",
        )

    return query_data if chunking else query_data[job_id]  # type: ignore[index]
@classmethod
def parse_long_output(cls, out: str) -> dict[str, dict[str, Any]]:
    """
    Parses the long-format output *out* of a query command and returns a dictionary mapping job
    ids in the format "<ClusterId>.<ProcId>" to job status dictionaries created by
    :py:meth:`job_status_dict`. The output is split into blocks separated by empty lines, and
    blocks that lack either the "ClusterId" or the "ProcId" field are skipped.
    """
    # retrieve information per block mapped to the job id
    query_data = {}
    for block in out.strip().split("\n\n"):
        data = dict(cls.long_block_cre.findall(block + "\n"))
        if not data:
            continue

        # both fields are required to build the job id, so skip the block if either is missing
        if "ClusterId" not in data or "ProcId" not in data:
            continue
        job_id = "{ClusterId}.{ProcId}".format(**data)

        # get the job status code
        status = cls.map_status(data.get("JobStatus"))

        # get the exit code, taken from the first field that holds a number
        code = 0
        for key in ["ExitCode", "ExitStatus"]:
            if data.get(key, "undefined").isdigit():
                code = int(data[key])
                break

        # get the error message, undefined counts as None
        error = data.get("HoldReason", "undefined")
        if error.lower() == "undefined":
            error = None
        remove_error = data.get("RemoveReason", "undefined")
        if remove_error.lower() == "undefined":
            remove_error = None

        # prefer remove error
        if remove_error:
            error = remove_error

        # handle inconsistencies between status, code and the presence of an error message
        if code != 0:
            if status != cls.FAILED:
                status = cls.FAILED
                if not error:
                    error = f"job status set to '{cls.FAILED}' due to non-zero exit code {code}"

        # extra info
        extra = {}
        if "MemoryUsage" in data:
            # non-numeric values such as "undefined" are stored as None
            mem = float(data["MemoryUsage"]) if data.get("MemoryUsage", "undefined").isdigit() else None
            extra["mem_peak_mb"] = mem
        if "RemoteHost" in data:
            extra["remote_host"] = data["RemoteHost"]
        elif "LastRemoteHost" in data:
            extra["remote_host"] = data["LastRemoteHost"]

        # store it
        query_data[job_id] = cls.job_status_dict(
            job_id=job_id,
            status=status,
            code=code,
            error=error,
            extra=extra or None,
        )

    return query_data
@classmethod
def map_status(cls, status_flag: str | None) -> str:
    """
    Translates the htcondor job status *status_flag*, given as a number string or a letter, into
    one of the job states defined on this class. Unknown flags are logged and mapped to the
    failed state.
    """
    # see http://pages.cs.wisc.edu/~adesmet/status.html
    known_states = (
        (("0", "1", "U", "I"), cls.PENDING),
        (("2", "R"), cls.RUNNING),
        (("4", "C"), cls.FINISHED),
        (("5", "6", "H", "E"), cls.FAILED),
    )
    for flags, state in known_states:
        if status_flag in flags:
            return state

    # anything else is unexpected and treated as a failure
    logger.debug(f"unknown htcondor job state '{status_flag}'")
    return cls.FAILED
[docs]
class HTCondorJobFileFactory(BaseJobFileFactory):
    """
    Factory that creates HTCondor job description files.
    """

    # names of attributes that are configurable, on top of those declared by the base class
    config_attrs = [
        *BaseJobFileFactory.config_attrs,
        "file_name",
        "command",
        "executable",
        "arguments",
        "input_files",
        "output_files",
        "log",
        "stdout",
        "stderr",
        "postfix_output_files",
        "postfix",
        "universe",
        "notification",
        "custom_content",
        "absolute_paths",
    ]
def __init__(
    self,
    file_name: str = "htcondor_job.jdl",
    command: str | Sequence[str] | None = None,
    executable: str | None = None,
    arguments: str | Sequence[str] | None = None,
    input_files: dict[str, str | pathlib.Path | JobInputFile] | None = None,
    output_files: dict[str | pathlib.Path, str | pathlib.Path] | None = None,
    log: str = "log.txt",
    stdout: str = "stdout.txt",
    stderr: str = "stderr.txt",
    postfix_output_files: bool = True,
    postfix: str | None = None,
    universe: str = "vanilla",
    notification: str = "Never",
    custom_content: str | Sequence[str] | None = None,
    absolute_paths: bool = False,
    **kwargs,
) -> None:
    """
    Stores all job file settings as instance attributes. The *kwargs* "dir", "mkdtemp" and
    "cleanup" are filled from the law config when they are missing or *None*, and all *kwargs*
    are forwarded to the base class constructor. Empty *input_files* and *output_files* are
    replaced by empty dictionaries.
    """
    # fill unset base class options with values from the "job" config section
    if kwargs.get("dir") is None:
        dir_option = _cfg.find_option("job", "htcondor_job_file_dir", "job_file_dir")
        kwargs["dir"] = _cfg.get_expanded("job", dir_option)

    if kwargs.get("mkdtemp") is None:
        mkdtemp_option = _cfg.find_option(
            "job",
            "htcondor_job_file_dir_mkdtemp",
            "job_file_dir_mkdtemp",
        )
        kwargs["mkdtemp"] = _cfg.get_expanded_bool("job", mkdtemp_option, force_type=False)

    if kwargs.get("cleanup") is None:
        cleanup_option = _cfg.find_option(
            "job",
            "htcondor_job_file_dir_cleanup",
            "job_file_dir_cleanup",
        )
        kwargs["cleanup"] = _cfg.get_expanded_bool("job", cleanup_option)

    super().__init__(**kwargs)

    # job file name and what to execute
    self.file_name = file_name
    self.command = command
    self.executable = executable
    self.arguments = arguments

    # files going in and out of the job
    self.input_files = input_files or {}
    self.output_files = output_files or {}

    # log and output stream files
    self.log = log
    self.stdout = stdout
    self.stderr = stderr

    # postfix handling
    self.postfix_output_files = postfix_output_files
    self.postfix = postfix

    # remaining job description settings
    self.universe = universe
    self.notification = notification
    self.custom_content = custom_content
    self.absolute_paths = absolute_paths
[docs]
def create(
    self,
    grouped_submission: bool = False,
    **kwargs,
) -> tuple[str, HTCondorJobFileFactory.Config]:
    """
    Creates the job description file and returns a 2-tuple containing its path and the config
    object that results from merging *kwargs* with the instance attributes.

    Input files are prepared (optionally copied and rendered), output, log and stream files are
    optionally postfixed, and the job file content is assembled and written. When
    *grouped_submission* is set, output files are postfixed with "$(law_job_postfix)", the
    arguments are injected through the "htcondor_job_arguments_map" render variable, and a
    single "queue ... from" statement with one entry per argument is written. Otherwise, one
    "arguments" line followed by a "queue" statement is written per argument.

    A *ValueError* is raised when *file_name*, *arguments* or *universe* are empty, when neither
    *command* nor *executable* is set, when *postfix* is empty although *postfix_output_files*
    is set, or when the number of postfixes does not match the number of arguments in a grouped
    submission.
    """
    # merge kwargs and instance attributes
    c = self.get_config(**kwargs)

    # some sanity checks
    if not c.file_name:
        raise ValueError("file_name must not be empty")
    if not c.arguments:
        raise ValueError("arguments must not be empty")
    c.arguments = make_list(c.arguments)
    if grouped_submission and c.postfix:
        c.postfix = make_list(c.postfix)
        if len(c.postfix) != len(c.arguments):
            raise ValueError("number of postfixes does not match the number of arguments")
    if c.postfix_output_files and not c.postfix:
        raise ValueError("postfix must not be empty when postfix_output_files is set")
    if not c.command and not c.executable:
        raise ValueError("either command or executable must not be empty")
    if not c.universe:
        raise ValueError("universe must not be empty")

    # ensure that output_files is a dict mapping remote paths on the job node
    # to local paths on the submission node
    # (relative local paths will be resolved relative to the initial dir)
    c.output_files = {
        str(k): str(v)
        for k, v in (
            c.output_files.items()
            if isinstance(c.output_files, dict)
            else zip(c.output_files, c.output_files)
        )
    }

    # ensure that the custom log file is an output file
    if c.custom_log_file:
        c.custom_log_file = str(c.custom_log_file)
        custom_log_file_base = os.path.basename(c.custom_log_file)
        if custom_log_file_base not in c.output_files:
            c.output_files[custom_log_file_base] = c.custom_log_file
        c.custom_log_file = custom_log_file_base

    # postfix certain output files
    postfix = "$(law_job_postfix)" if grouped_submission else c.postfix
    if c.postfix_output_files:
        skip_postfix_cre = re.compile(r"^(/dev/).*$")

        def add_postfix(s):
            # paths below /dev/ are left untouched
            if skip_postfix_cre.match(str(s)):
                return s
            return self.postfix_output_file(s, postfix)

        c.output_files = {add_postfix(k): add_postfix(v) for k, v in c.output_files.items()}
        for attr in ["log", "stdout", "stderr", "custom_log_file"]:
            if c[attr]:
                c[attr] = add_postfix(c[attr])

    # ensure that all input files are JobInputFile objects
    c.input_files = {
        key: JobInputFile(f)
        for key, f in c.input_files.items()
    }

    # ensure that the executable is an input file, remember the key to access it
    if c.executable:
        executable_keys = [
            k
            for k, v in c.input_files.items()
            if get_path(v) == get_path(c.executable)
        ]
        if executable_keys:
            executable_key = executable_keys[0]
        else:
            executable_key = "executable_file"
            c.input_files[executable_key] = JobInputFile(c.executable)

    # prepare input files
    def prepare_input(f):
        # when not copied or forwarded, just return the absolute, original path
        abs_path = os.path.abspath(f.path)
        if not f.copy or f.forward:
            return abs_path
        # copy the file
        abs_path = self.provide_input(
            src=abs_path,
            postfix=c.postfix if f.postfix and not f.share and not grouped_submission else None,
            dir=c.dir,
            skip_existing=f.share,
            increment_existing=f.increment and not f.share and grouped_submission,
        )
        return abs_path

    # absolute input paths
    for key, f in c.input_files.items():
        f.path_sub_abs = prepare_input(f)

    # input paths relative to the submission or initial dir
    # forwarded files are skipped as they are not treated as normal inputs
    for key, f in c.input_files.items():
        if f.forward:
            continue
        f.path_sub_rel = (
            os.path.basename(f.path_sub_abs)
            if f.copy and not c.absolute_paths else
            f.path_sub_abs
        )

    # input paths as seen by the job, before and after potential rendering
    for key, f in c.input_files.items():
        f.path_job_pre_render = (
            f.path_sub_abs
            if f.forward else
            os.path.basename(f.path_sub_abs)
        )
        f.path_job_post_render = (
            f.path_sub_abs
            if f.forward and not f.render_job else
            os.path.basename(f.path_sub_abs)
        )

    # update files in render variables with version after potential rendering
    c.render_variables.update({
        key: f.path_job_post_render
        for key, f in c.input_files.items()
    })

    # add space separated input files before potential rendering to render variables
    c.render_variables["input_files"] = " ".join(
        f.path_job_pre_render
        for f in c.input_files.values()
    )

    # add space separated list of input files for rendering
    c.render_variables["input_files_render"] = " ".join(
        f.path_job_pre_render
        for f in c.input_files.values()
        if f.render_job
    )

    # add the custom log file to render variables
    if c.custom_log_file:
        c.render_variables["log_file"] = c.custom_log_file

    # add the file postfix to render variables
    # (this is done in the wrapper script for grouped submission)
    if not grouped_submission and c.postfix and "file_postfix" not in c.render_variables:
        c.render_variables["file_postfix"] = c.postfix

    # inject arguments into the htcondor wrapper via render variables
    if grouped_submission:
        c.render_variables["htcondor_job_arguments_map"] = ("\n" + 8 * " ").join(
            f"['{job_num}']=\"{args}\""
            for job_num, args in enumerate(c.arguments, 1)
        )

    # linearize render variables
    render_variables = self.linearize_render_variables(
        c.render_variables,
        drop_base64_keys=["htcondor_job_arguments_map"],
    )

    # prepare the job description file
    job_file = os.path.join(c.dir, str(c.file_name))
    if not grouped_submission:
        job_file = self.postfix_input_file(job_file, c.postfix)

    # render copied, non-forwarded input files
    for key, f in c.input_files.items():
        if not f.copy or f.forward or not f.render_local:
            continue
        self.render_file(
            f.path_sub_abs,
            f.path_sub_abs,
            render_variables,
            postfix=c.postfix if not grouped_submission and f.postfix else None,
        )

    # prepare the executable when given
    if c.executable:
        c.executable = get_path(c.input_files[executable_key].path_job_post_render)
        # make the file executable for the user and group
        path = os.path.join(c.dir, os.path.basename(c.executable))
        if os.path.exists(path):
            os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR | stat.S_IXGRP)

    # helper to encode lists
    def encode_list(items: Any, sep: str = " ", quote: bool = True) -> str:
        items = make_list(items)
        s = sep.join(map(str, items))
        if quote:
            # must be an f-string, otherwise the literal text "{s}" is written
            s = f"\"{s}\""  # noqa: Q003
        return s

    # helper to encode dicts
    def encode_dict(d: dict, sep: str = " ; ", quote: bool = True) -> str:
        s = sep.join(f"{k} = {v}" for k, v in d.items())
        if quote:
            s = f"\"{s}\""  # noqa: Q003
        return s

    # job file content
    content: list[str | tuple[str, Any]] = []
    content.append(("universe", c.universe))
    output_remaps = {}
    if c.command:
        cmd = quote_cmd(c.command) if isinstance(c.command, (list, tuple)) else c.command
        content.append(("executable", cmd))
    else:
        content.append(("executable", c.executable))
    if c.log:
        content.append(("log", c.log))
    if c.stdout:
        c.stdout = str(c.stdout)
        stdout_base = os.path.basename(c.stdout)
        content.append(("output", stdout_base))
        if stdout_base != c.stdout:
            output_remaps[stdout_base] = c.stdout
    if c.stderr:
        c.stderr = str(c.stderr)
        stderr_base = os.path.basename(c.stderr)
        content.append(("error", stderr_base))
        if stderr_base != c.stderr:
            output_remaps[stderr_base] = c.stderr
    if c.input_files or c.output_files:
        content.append(("should_transfer_files", "YES"))
    if c.input_files:
        content.append(("transfer_input_files", encode_list(
            make_unique(
                f.path_sub_rel
                for f in c.input_files.values()
                if f.path_sub_rel
            ),
            sep=",",
            quote=False,
        )))
    if c.output_files:
        content.append(("transfer_output_files", encode_list(
            c.output_files.keys(),
            sep=",",
            quote=False,
        )))
        # add mapping to local paths when different
        output_remaps.update({
            remote_path: local_path
            for remote_path, local_path in c.output_files.items()
            if remote_path != local_path
        })
        content.append(("when_to_transfer_output", "ON_EXIT"))
    if output_remaps:
        content.append(("transfer_output_remaps", encode_dict(output_remaps)))
    if c.notification:
        content.append(("notification", c.notification))

    # add custom content
    if c.custom_content:
        if isinstance(c.custom_content, str):
            # a plain string is a single line and must not be split into characters
            content.append(c.custom_content)
        else:
            content += c.custom_content

    # add htcondor specific env variables
    # existing "environment" entries are collected and removed from the content first
    env_vars = []
    _content = []
    for obj in content:
        if isinstance(obj, tuple) and len(obj) == 2 and obj[0].lower() == "environment":
            env_vars.append(obj[1].strip("\""))  # noqa: Q003
        else:
            _content.append(obj)
    content = _content

    # add new ones and add back to content
    env_vars.append("LAW_HTCONDOR_JOB_CLUSTER=$(Cluster)")
    env_vars.append("LAW_HTCONDOR_JOB_PROCESS=$(Process)")
    content.append(("environment", encode_list(env_vars, sep=" ", quote=True)))

    # queue
    if grouped_submission:
        content.append("queue law_job_postfix, arguments from (")
        for i in range(len(c.arguments)):
            # empty placeholders when no postfix or log file is used
            pf = log = "''"
            if c.postfix_output_files:
                pf = c.postfix[i]
            if c.custom_log_file:
                log = c.custom_log_file
            content.append(f" {pf}, {pf} {log}")
        content.append(")")
    elif c.arguments:
        for _arguments in c.arguments:
            content.append(("arguments", _arguments))
            content.append("queue")
    else:
        content.append("queue")

    # write the job file
    with open(job_file, "w") as f:
        for obj in content:
            line = self.create_line(*make_list(obj))
            f.write(f"{line}\n")

    logger.debug(f"created htcondor job file at '{job_file}'")

    return job_file, c
@classmethod
def create_line(cls, key: str, value: Any | None = None) -> str:
if value is None:
return str(key)
return f"{key} = {value}"