law.job.base#
Base classes for implementing remote job management and job file creation.
Class BaseJobManager#
- class BaseJobManager(status_names=None, status_diff_styles=None, threads=1)[source]#
Bases:
object
Base class that defines how remote jobs are submitted, queried, cancelled and cleaned up. It also defines the most common job states:
PENDING: The job is submitted and waiting to be processed.
RUNNING: The job is running.
FINISHED: The job is completed and successfully finished.
RETRY: The job is completed but failed. It can be resubmitted.
FAILED: The job is completed but failed. It cannot or should not be recovered.
The particular job manager implementation should match its own, native states to these common states.
status_names and status_diff_styles are used in status_line() and default to default_status_names and default_status_diff_styles, respectively. threads is the default number of concurrent threads that are used in submit_batch(), cancel_batch(), cleanup_batch() and query_batch().
- classattribute PENDING#
type: string
Flag that represents the PENDING status.
- classattribute RUNNING#
type: string
Flag that represents the RUNNING status.
- classattribute FINISHED#
type: string
Flag that represents the FINISHED status.
- classattribute RETRY#
type: string
Flag that represents the RETRY status.
- classattribute FAILED#
type: string
Flag that represents the FAILED status.
- classattribute default_status_names#
type: list
The list of all default status flags that is used in status_line().
- classattribute default_status_diff_styles#
type: dict
A dictionary that defines the coloring style per job status, used in status_line().
- classattribute job_grouping#
type: bool
Whether this manager implementation groups jobs into single interactions for submission and status queries. In general, this means that the submission of a single job file can result in multiple jobs on the remote batch system.
- classattribute chunk_size_submit#
type: int
The default chunk size value when no value is given in submit_batch(). When the value evaluates to False, no chunking is allowed.
- classattribute chunk_size_cancel#
type: int
The default chunk size value when no value is given in cancel_batch(). When the value evaluates to False, no chunking is allowed.
- classattribute chunk_size_cleanup#
type: int
The default chunk size value when no value is given in cleanup_batch(). When the value evaluates to False, no chunking is allowed.
- classattribute chunk_size_query#
type: int
The default chunk size value when no value is given in query_batch(). When the value evaluates to False, no chunking is allowed.
- classmethod job_status_dict(job_id=None, status=None, code=None, error=None, extra=None)[source]#
Returns a dictionary that describes the status of a job given its job_id, status, return code, error, and additional extra data.
- classmethod cast_job_id(job_id)[source]#
Hook for casting an input job_id, for instance, after loading serialized data from JSON.
- abstract submit()[source]#
Abstract atomic or group job submission. Can throw exceptions. Should return a list of job ids.
- abstract cancel()[source]#
Abstract atomic or group job cancellation. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.
- abstract cleanup()[source]#
Abstract atomic or group job cleanup. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.
- abstract query()[source]#
Abstract atomic or group job status query. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.
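To make the contract of the four abstract methods concrete, here is a schematic, in-memory implementation. It fakes a batch system with a plain dict and uses object instead of the real base class so that the sketch runs standalone; a real manager would wrap scheduler commands instead:

```python
import itertools

class DummyJobManager(object):
    """Schematic manager backed by an in-memory dict instead of a batch system."""

    PENDING, RUNNING, FINISHED, FAILED = "pending", "running", "finished", "failed"

    def __init__(self):
        self._jobs = {}
        self._ids = itertools.count(1)

    def submit(self, job_file):
        # a real manager would invoke the scheduler and parse the returned id
        job_id = str(next(self._ids))
        self._jobs[job_id] = self.PENDING
        return [job_id]

    def cancel(self, job_id):
        self._jobs.pop(job_id, None)
        return {job_id: None}

    def cleanup(self, job_id):
        return {job_id: None}

    def query(self, job_id):
        status = self._jobs.get(job_id, self.FINISHED)
        return {job_id: {"job_id": job_id, "status": status, "code": None, "error": None}}

manager = DummyJobManager()
job_ids = manager.submit("job.jdl")
```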
- group_job_ids(job_ids)[source]#
Hook that needs to be implemented if the job mananger supports grouping of jobs, i.e., when
job_grouping
is True, and potentially used during status queries, job cancellation and removal. If so, it should take a sequence of job_ids and return a dictionary mapping ids of group jobs (used for queries etc) to the corresponding lists of original job ids, with an arbitrary grouping mechanism.
- submit_batch(job_files, threads=None, chunk_size=None, callback=None, **kwargs)[source]#
Submits a batch of jobs given by job_files via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to
chunk_size_submit
, is not negative, job_files are split into chunks of that size which are passed tosubmit()
.When callback is set, it is invoked after each successful job submission with the index of the corresponding job file (starting at 0) and either the assigned job id or an exception if any occurred. All other kwargs are passed to
submit()
.The return value is a list containing the return values of the particular
submit()
calls, in an order that corresponds to job_files. When an exception was raised during a submission, this exception is added to the returned list.
- cancel_batch(job_ids, threads=None, chunk_size=None, callback=None, **kwargs)[source]#
Cancels a batch of jobs given by job_ids via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to
chunk_size_cancel
, is not negative, job_ids are split into chunks of that size which are passed tocancel()
.When callback is set, it is invoked after each successful job (or job chunk) cancelling with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to
cancel()
.Exceptions that occured during job cancelling are stored in a list and returned. An empty list means that no exceptions occured.
- cleanup_batch(job_ids, threads=None, chunk_size=None, callback=None, **kwargs)[source]#
Cleans up a batch of jobs given by job_ids via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to
chunk_size_cleanup
, is not negative, job_ids are split into chunks of that size which are passed tocleanup()
.When callback is set, it is invoked after each successful job (or job chunk) cleaning with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to
cleanup()
.Exceptions that occured during job cleaning are stored in a list and returned. An empty list means that no exceptions occured.
- query_batch(job_ids, threads=None, chunk_size=None, callback=None, **kwargs)[source]#
Queries the status of a batch of jobs given by job_ids via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to
chunk_size_query
, is not negative, job_ids are split into chunks of that size which are passed toquery()
.When callback is set, it is invoked after each successful job (or job chunk) status query with the index of the corresponding job id (starting at 0) and the obtained status query data or an exception if any occurred. All other kwargs are passed to
query()
.This method returns a dictionary that maps job ids to either the status query data or to an exception if any occurred.
- submit_group(job_files, threads=None, callback=None, **kwargs)[source]#
Submits several job groups given by job_files via a thread pool of size threads which defaults to its instance attribute. As per the definition of a job group, a single job file can result in multiple jobs being processed on the remote batch system.
When callback is set, it is invoked after each successful job submission with the index of the corresponding job (starting at 0) and either the assigned job id or an exception if any occurred. All other kwargs are passed to
submit()
.The return value is a list containing the return values of the particular
submit()
calls, in an order that in general corresponds job_files, with ids of single jobs per job file properly expanded. When an exception was raised during a submission, this exception is added to the returned list.
- cancel_group(job_ids, threads=None, callback=None, **kwargs)[source]#
Takes several job_ids, groups them according to
group_job_ids()
, and cancels all groups simultaneously via a thread pool of size threads which defaults to its instance attribute.When callback is set, it is invoked after each successful job cancellation with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to
cancel()
.Exceptions that occured during job cancelling are stored in a list and returned. An empty list means that no exceptions occured.
- cleanup_group(job_ids, threads=None, callback=None, **kwargs)[source]#
Takes several job_ids, groups them according to
group_job_ids()
, and cleans up all groups simultaneously via a thread pool of size threads which defaults to its instance attribute.When callback is set, it is invoked after each successful job cleanup with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to
cleanup()
.Exceptions that occured during job cancelling are stored in a list and returned. An empty list means that no exceptions occured.
- query_group(job_ids, threads=None, callback=None, **kwargs)[source]#
Takes several job_ids, groups them according to
group_job_ids()
, and queries the status of all groups simultaneously via a thread pool of size threads which defaults to its instance attribute.When callback is set, it is invoked after each successful job status query with the index of the corresponding job id (starting at 0) and the obtained status query data or an exception if any occurred. All other kwargs are passed to
query()
.This method returns a dictionary that maps job ids to either the status query data or to an exception if any occurred.
- status_line(counts, last_counts=None, sum_counts=None, timestamp=True, align=False, color=False)[source]#
Returns a job status line containing job counts per status. When last_counts is True, the status line also contains the differences in job counts with respect to the counts from the previous call to this method. When you pass a list or tuple, those values are used instead to compute the differences.
The status line starts with the sum of jobs, which is inferred from counts. When you want to use a custom value, set sum_counts. The length of counts should match the length of status_names of this instance. When timestamp is True, the status line begins with the current timestamp. When timestamp is a non-empty string, it is used as the strftime format.
align handles the alignment of the values in the status line by using a maximum width. True will result in the default width of 4. When align evaluates to False, no alignment is used. By default, some elements of the status line are colored. Set color to False to disable this feature.
Example:
status_line((2, 0, 0, 0, 0))
# 12:45:18: all: 2, pending: 2, running: 0, finished: 0, retry: 0, failed: 0

status_line((0, 2, 0, 0), last_counts=(2, 0, 0, 0), skip=["retry"], timestamp=False)
# all: 2, pending: 0 (-2), running: 2 (+2), finished: 0 (+0), failed: 0 (+0)
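A stripped-down version of such a status line (no colors, no alignment, no timestamp; names and formatting only approximate the real output) could be built like this:

```python
def simple_status_line(counts, names=("pending", "running", "finished", "retry", "failed"),
                       last_counts=None):
    # start with the total, then append "name: count" per status,
    # optionally followed by the signed difference to the previous counts
    parts = ["all: {}".format(sum(counts))]
    for i, (name, count) in enumerate(zip(names, counts)):
        part = "{}: {}".format(name, count)
        if last_counts:
            part += " ({:+d})".format(count - last_counts[i])
        parts.append(part)
    return ", ".join(parts)

line = simple_status_line((2, 0, 0, 0, 0))
```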
Class BaseJobFileFactory#
- class BaseJobFileFactory(dir=None, render_variables=None, custom_log_file=None, mkdtemp=None, cleanup=None)[source]#
Bases:
object
Base class that handles the creation of job files. It is likely that inheriting classes only need to implement the create() method as well as extend the constructor to handle additional arguments.
The general idea behind this class is as follows. An instance holds the path to a directory dir, defaulting to a new, temporary directory inside job.job_file_dir (which itself defaults to the system's tmp path). Job input files, which are supported by almost all job / batch systems, are automatically copied into this directory. The file name can optionally be postfixed with a configurable string, so that multiple job files can be created and stored within the same dir without the risk of interfering file names. A common use case would be the use of a job number or id. Another transformation that is applied to copied files is the rendering of variables. For example, when an input file looks like
#!/usr/bin/env bash
echo "Hello, {{my_variable}}!"
the rendering mechanism can replace variables such as my_variable following a double-brace notation. Internally, the rendering is implemented in render_file(), but there is usually no need to call this method directly as implementations of this base class might use it in their create() method.
- classmethod postfix_file(path, postfix=None, add_hash=False)[source]#
Adds a postfix to a file path, right before the first file extension in the base name. When add_hash is True, a hash based on the full source path is added before the postfix. Example:
postfix_file("/path/to/file.tar.gz", "_1")
# -> "/path/to/file_1.tar.gz"

postfix_file("/path/to/file.txt", "_1", add_hash=True)
# -> "/path/to/file_dacc4374d3_1.txt"
postfix might also be a dictionary that maps patterns to actual postfix strings. When a pattern matches the base name of the file, the associated postfix is applied and the path is returned. You might want to use an ordered dictionary to control the first match.
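A simplified re-implementation of the postfixing rule (the postfix is inserted right before the first extension in the base name; the hash and pattern-dictionary variants are omitted) behaves like this:

```python
import os

def postfix_path(path, postfix):
    # insert the postfix right before the first file extension of the base name
    dirname, basename = os.path.split(path)
    name, sep, ext = basename.partition(".")
    return os.path.join(dirname, name + postfix + sep + ext)

p = postfix_path("/path/to/file.tar.gz", "_1")
```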
- classmethod postfix_input_file(path, postfix=None)[source]#
Shorthand for postfix_file() with add_hash set to True.
- classmethod postfix_output_file(path, postfix=None)[source]#
Shorthand for postfix_file() with add_hash set to False.
- classmethod render_string(s, key, value)[source]#
Renders a string s by replacing {{key}} with value and returns it.
- classmethod linearize_render_variables(render_variables)[source]#
Linearizes variables contained in the dictionary render_variables. In some use cases, variables may contain render expressions pointing to other variables, e.g.:
render_variables = {
    "variable_a": "Tom",
    "variable_b": "Hello, {{variable_a}}!",
}
Situations like this can be simplified by linearizing the variables:
linearize_render_variables(render_variables)
# ->
# {
#     "variable_a": "Tom",
#     "variable_b": "Hello, Tom!",
# }
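The linearization above can be approximated by repeatedly substituting double-brace expressions until the values stabilize; this is an illustrative sketch, not the actual implementation (cyclic references would loop forever in this naive version):

```python
import re

def linearize(render_variables):
    variables = dict(render_variables)
    # substitute {{key}} expressions until all values are stable;
    # unknown keys are left untouched
    changed = True
    while changed:
        changed = False
        for key, value in variables.items():
            new_value = re.sub(
                r"\{\{(\w+)\}\}",
                lambda m: variables.get(m.group(1), m.group(0)),
                value,
            )
            if new_value != value:
                variables[key] = new_value
                changed = True
    return variables

variables = linearize({"variable_a": "Tom", "variable_b": "Hello, {{variable_a}}!"})
```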
- classmethod render_file(src, dst, render_variables, postfix=None, silent=True)[source]#
Renders a source file src with render_variables and copies it to a new location dst. In some cases, a render variable value might contain a path that should be subject to file postfixing (see postfix_file()). When postfix is not None, this method will replace substrings in the format postfix:<path> with the postfixed path. In the following example, the variable my_command in src will be rendered with a string that contains a postfixed path:
render_file(src, dst, {"my_command": "echo postfix:some/path.txt"}, postfix="_1")
# replaces "{{my_command}}" in src with "echo some/path_1.txt" in dst
In case the file content is not readable, the method silently returns, unless silent is False in which case an exception is raised.
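A minimal file-rendering sketch, showing only the double-brace substitution performed while copying (the postfix: mechanism and error handling are left out):

```python
import os
import tempfile

def render_to(src, dst, render_variables):
    # copy src to dst, replacing each {{key}} occurrence with its value
    with open(src) as f:
        content = f.read()
    for key, value in render_variables.items():
        content = content.replace("{{%s}}" % key, value)
    with open(dst, "w") as f:
        f.write(content)

# usage: render a small script into a temporary directory
tmp_dir = tempfile.mkdtemp()
src_path = os.path.join(tmp_dir, "job.sh")
dst_path = os.path.join(tmp_dir, "job_rendered.sh")
with open(src_path, "w") as f:
    f.write('echo "Hello, {{my_variable}}!"\n')
render_to(src_path, dst_path, {"my_variable": "World"})
```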
- provide_input(src, postfix=None, dir=None, render_variables=None, skip_existing=False)[source]#
Convenience method that copies an input file src to a target directory dir, which defaults to the dir attribute of this instance. The provided file has the same basename, which is optionally postfixed with postfix. Essentially, this method calls render_file() when render_variables is set, or simply shutil.copy2 otherwise. If the file to create already exists, it is overwritten unless skip_existing is True.
- get_config(**kwargs)[source]#
The create() method potentially takes a lot of keyword arguments for configuring the content of job files. It is useful if some of these configuration values default to attributes that can be set via constructor arguments of this class.
This method merges keyword arguments kwargs (e.g. passed to create()) with default values obtained from instance attributes given in config_attrs. It returns the merged values in a dictionary that can be accessed via dot-notation (attribute notation). Example:
class MyJobFileFactory(BaseJobFileFactory):

    config_attrs = ["stdout", "stderr"]

    def __init__(self, stdout="stdout.txt", stderr="stderr.txt", **kwargs):
        super(MyJobFileFactory, self).__init__(**kwargs)

        self.stdout = stdout
        self.stderr = stderr

    def create(self, **kwargs):
        config = self.get_config(**kwargs)
        # when called as create(stdout="log.txt"):
        #   config.stderr is "stderr.txt"
        #   config.stdout is "log.txt"
        ...
Class JobInputFile#
- class JobInputFile(path, copy=None, share=None, forward=None, postfix=None, render=None, render_local=None, render_job=None)[source]#
Bases:
object
Wrapper around a path referring to an input file of a job, accompanied by optional flags that control how the file should be handled during job submission (mostly within BaseJobFileFactory.provide_input()). See the attributes below for more info.
- path#
type: str
The path of the input file.
- copy#
type: bool
Whether this file should be copied into the job submission directory or not.
- share#
type: bool
Whether the file can be shared in the job submission directory. A shared file is copied only once into the submission directory and render_local must be False.
- forward#
type: bool
Whether this file should actually not be listed as a normal input file in the job description but just passed to the list of inputs for treatment in the law job script itself. Only considered if supported by the submission system (e.g. local ones such as htcondor or slurm).
- postfix#
type: bool
Whether the file path should be postfixed when copied.
- render_local#
type: bool
Whether render variables should be resolved locally when copied.
- render_job#
type: bool
Whether render variables should be resolved as part of the job script.
- is_remote#
type: bool (read-only)
Whether the path has a non-empty protocol referring to a remote resource.
- path_sub_abs#
type: str, None
Absolute file path as seen by the submission node. Set only during job file creation.
- path_sub_rel#
type: str, None
File path relative to the submission directory if the submission itself is not forced to use absolute paths. Otherwise identical to path_sub_abs. Set only during job file creation.
- path_job_pre_render#
type: str, None
File path as seen by the job node, prior to a potential job-side rendering. It is a full, absolute path in case forwarding is supported, and a relative basename otherwise. Set only during job file creation.
- path_job_post_render#
type: str, None
File path as seen by the job node, after a potential job-side rendering. Therefore, it is identical to path_job_pre_render if rendering is disabled, and a relative basename otherwise. Set only during job file creation.
Class JobArguments#
- class JobArguments(task_cls, task_params, branches, workers=1, auto_retry=False, dashboard_data=None)[source]#
Bases:
object
Wrapper class for job arguments. Currently, it stores a task class task_cls, a list of task_params, a list of covered branches, a number of workers, an auto_retry flag, and custom dashboard_data. It also handles argument encoding as required by the job wrapper script at law/job/job.sh.
- task_cls#
type: law.Register
The task class.
- task_params#
type: list
The list of task parameters.
- branches#
type: list
The list of branch numbers covered by the task.
- workers#
type: int
The number of workers to use in “law run” commands.
- auto_retry#
type: bool
A flag denoting if the job-internal automatic retry mechanism should be used.
- dashboard_data#
type: list
If a job dashboard is used, this is a list of configuration values as returned by law.job.dashboard.BaseJobDashboard.remote_hook_data().
- get_args()[source]#
Returns the list of encoded job arguments. The order of this list corresponds to the arguments expected by the job wrapper script.
- join()[source]#
Returns the list of job arguments from get_args(), joined into a single string using a single space character.