law.job.base#

Base classes for implementing remote job management and job file creation.

Class BaseJobManager#

class BaseJobManager(status_names=None, status_diff_styles=None, threads=1)[source]#

Bases: object

Base class that defines how remote jobs are submitted, queried, cancelled and cleaned up. It also defines the most common job states:

  • PENDING: The job is submitted and waiting to be processed.

  • RUNNING: The job is running.

  • FINISHED: The job is completed and successfully finished.

  • RETRY: The job is completed but failed. It can be resubmitted.

  • FAILED: The job is completed but failed. It cannot or should not be recovered.

A particular job manager implementation should map its own native states to these common states.

status_names and status_diff_styles are used in status_line() and default to default_status_names and default_status_diff_styles. threads is the default number of concurrent threads that are used in submit_batch(), cancel_batch(), cleanup_batch() and query_batch().
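As an illustration, a concrete manager could look like the following sketch. The batch system, its native states and the status_map attribute are purely hypothetical and only demonstrate the intended mapping to the common states; they are not part of law itself.

from law.job.base import BaseJobManager


class MyBatchJobManager(BaseJobManager):

    # hypothetical native states of the batch system, mapped to common states
    status_map = {
        "idle": BaseJobManager.PENDING,
        "run": BaseJobManager.RUNNING,
        "done": BaseJobManager.FINISHED,
        "held": BaseJobManager.RETRY,
        "removed": BaseJobManager.FAILED,
    }

    def submit(self, job_file, **kwargs):
        # invoke the native submission tool, return a list of job ids
        ...

    def cancel(self, job_id, **kwargs):
        # cancel the job, return a dict mapping job ids to return values
        ...

    def cleanup(self, job_id, **kwargs):
        # remove job remnants, return a dict mapping job ids to return values
        ...

    def query(self, job_id, **kwargs):
        # query the native status and translate it, e.g. via
        # self.job_status_dict(job_id=job_id, status=self.status_map[native_status])
        ...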

classattribute PENDING#

type: string

Flag that represents the PENDING status.

classattribute RUNNING#

type: string

Flag that represents the RUNNING status.

classattribute FINISHED#

type: string

Flag that represents the FINISHED status.

classattribute RETRY#

type: string

Flag that represents the RETRY status.

classattribute FAILED#

type: string

Flag that represents the FAILED status.

classattribute default_status_names#

type: list

The list of all default status flags used in status_line().

classattribute default_status_diff_styles#

type: dict

A dictionary that defines the coloring style per job status, used in status_line().

classattribute job_grouping#

type: bool

Whether this manager implementation groups jobs into single interactions for submission and status queries. In general, this means that the submission of a single job file can result in multiple jobs on the remote batch system.

classattribute chunk_size_submit#

type: int

The default chunk size value when no value is given in submit_batch(). When the value evaluates to False, no chunking is allowed.

classattribute chunk_size_cancel#

type: int

The default chunk size value when no value is given in cancel_batch(). When the value evaluates to False, no chunking is allowed.

classattribute chunk_size_cleanup#

type: int

The default chunk size value when no value is given in cleanup_batch(). When the value evaluates to False, no chunking is allowed.

classattribute chunk_size_query#

type: int

The default chunk size value when no value is given in query_batch(). When the value evaluates to False, no chunking is allowed.

classmethod job_status_dict(job_id=None, status=None, code=None, error=None, extra=None)[source]#

Returns a dictionary that describes the status of a job given its job_id, status, return code, error, and additional extra data.
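For illustration (the job id is made up, and the status flags are assumed to hold the lowercase status names):

BaseJobManager.job_status_dict(job_id="1234.0", status=BaseJobManager.RUNNING)
# -> {"job_id": "1234.0", "status": "running", "code": None, "error": None, "extra": None}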

classmethod cast_job_id(job_id)[source]#

Hook for casting an input job_id, for instance, after loading serialized data from json.

abstract submit()[source]#

Abstract atomic or group job submission. Can throw exceptions. Should return a list of job ids.

abstract cancel()[source]#

Abstract atomic or group job cancellation. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.

abstract cleanup()[source]#

Abstract atomic or group job cleanup. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.

abstract query()[source]#

Abstract atomic or group job status query. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.

group_job_ids(job_ids)[source]#

Hook that needs to be implemented if the job manager supports grouping of jobs, i.e., when job_grouping is True, and that is potentially used during status queries, job cancellation and removal. If so, it should take a sequence of job_ids and return a dictionary mapping ids of group jobs (used for queries etc.) to the corresponding lists of original job ids, with an arbitrary grouping mechanism.
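A hypothetical implementation, assuming that jobs submitted as one group share an id prefix such as "1234.0" and "1234.1", could group by the part before the dot:

def group_job_ids(self, job_ids):
    groups = {}
    for job_id in job_ids:
        group_id = job_id.split(".", 1)[0]
        groups.setdefault(group_id, []).append(job_id)
    return groups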

submit_batch(job_files, threads=None, chunk_size=None, callback=None, **kwargs)[source]#

Submits a batch of jobs given by job_files via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to chunk_size_submit, is not negative, job_files are split into chunks of that size which are passed to submit().

When callback is set, it is invoked after each successful job submission with the index of the corresponding job file (starting at 0) and either the assigned job id or an exception if any occurred. All other kwargs are passed to submit().

The return value is a list containing the return values of the particular submit() calls, in an order that corresponds to job_files. When an exception was raised during a submission, this exception is added to the returned list.
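A usage sketch, assuming manager is an instance of a concrete subclass and job_files is a list of prepared job files:

def on_submit(i, result):
    # result is either the assigned job id or an exception
    state = "failed" if isinstance(result, Exception) else "submitted"
    print("job file {}: {}".format(i, state))

job_ids = manager.submit_batch(job_files, threads=4, callback=on_submit)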

cancel_batch(job_ids, threads=None, chunk_size=None, callback=None, **kwargs)[source]#

Cancels a batch of jobs given by job_ids via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to chunk_size_cancel, is not negative, job_ids are split into chunks of that size which are passed to cancel().

When callback is set, it is invoked after each successful job (or job chunk) cancellation with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to cancel().

Exceptions that occurred during job cancellation are stored in a list and returned. An empty list means that no exceptions occurred.

cleanup_batch(job_ids, threads=None, chunk_size=None, callback=None, **kwargs)[source]#

Cleans up a batch of jobs given by job_ids via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to chunk_size_cleanup, is not negative, job_ids are split into chunks of that size which are passed to cleanup().

When callback is set, it is invoked after each successful job (or job chunk) cleanup with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to cleanup().

Exceptions that occurred during job cleanup are stored in a list and returned. An empty list means that no exceptions occurred.

query_batch(job_ids, threads=None, chunk_size=None, callback=None, **kwargs)[source]#

Queries the status of a batch of jobs given by job_ids via a thread pool of size threads which defaults to its instance attribute. When chunk_size, which defaults to chunk_size_query, is not negative, job_ids are split into chunks of that size which are passed to query().

When callback is set, it is invoked after each successful job (or job chunk) status query with the index of the corresponding job id (starting at 0) and the obtained status query data or an exception if any occurred. All other kwargs are passed to query().

This method returns a dictionary that maps job ids to either the status query data or to an exception if any occurred.
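A sketch of how the returned dictionary might be consumed, assuming the per-job query data is a status dictionary as produced by job_status_dict():

query_data = manager.query_batch(job_ids, threads=4)
for job_id, data in query_data.items():
    if isinstance(data, Exception):
        print("query failed for job {}: {}".format(job_id, data))
    else:
        print("job {} is {}".format(job_id, data["status"]))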

submit_group(job_files, threads=None, callback=None, **kwargs)[source]#

Submits several job groups given by job_files via a thread pool of size threads which defaults to its instance attribute. As per the definition of a job group, a single job file can result in multiple jobs being processed on the remote batch system.

When callback is set, it is invoked after each successful job submission with the index of the corresponding job (starting at 0) and either the assigned job id or an exception if any occurred. All other kwargs are passed to submit().

The return value is a list containing the return values of the particular submit() calls, in an order that in general corresponds to job_files, with ids of single jobs per job file properly expanded. When an exception was raised during a submission, this exception is added to the returned list.

cancel_group(job_ids, threads=None, callback=None, **kwargs)[source]#

Takes several job_ids, groups them according to group_job_ids(), and cancels all groups simultaneously via a thread pool of size threads which defaults to its instance attribute.

When callback is set, it is invoked after each successful job cancellation with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to cancel().

Exceptions that occurred during job cancellation are stored in a list and returned. An empty list means that no exceptions occurred.

cleanup_group(job_ids, threads=None, callback=None, **kwargs)[source]#

Takes several job_ids, groups them according to group_job_ids(), and cleans up all groups simultaneously via a thread pool of size threads which defaults to its instance attribute.

When callback is set, it is invoked after each successful job cleanup with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to cleanup().

Exceptions that occurred during job cleanup are stored in a list and returned. An empty list means that no exceptions occurred.

query_group(job_ids, threads=None, callback=None, **kwargs)[source]#

Takes several job_ids, groups them according to group_job_ids(), and queries the status of all groups simultaneously via a thread pool of size threads which defaults to its instance attribute.

When callback is set, it is invoked after each successful job status query with the index of the corresponding job id (starting at 0) and the obtained status query data or an exception if any occurred. All other kwargs are passed to query().

This method returns a dictionary that maps job ids to either the status query data or to an exception if any occurred.

status_line(counts, last_counts=None, sum_counts=None, timestamp=True, align=False, color=False)[source]#

Returns a job status line containing job counts per status. When last_counts is True, the status line also contains the differences in job counts with respect to the counts from the previous call to this method. When you pass a list or tuple, those values are used instead to compute the differences.

The status line starts with the sum of jobs which is inferred from counts. When you want to use a custom value, set sum_counts. The length of counts should match the length of status_names of this instance. When timestamp is True, the status line begins with the current timestamp. When timestamp is a non-empty string, it is used as the strftime format.

align handles the alignment of the values in the status line by using a maximum width. True will result in the default width of 4. When align evaluates to False, no alignment is used. When color is True, some elements of the status line are colored.

Example:

status_line((2, 0, 0, 0, 0))
# 12:45:18: all: 2, pending: 2, running: 0, finished: 0, retry: 0, failed: 0

status_line((0, 2, 0, 0), last_counts=(2, 0, 0, 0), skip=["retry"], timestamp=False)
# all: 2, pending: 0 (-2), running: 2 (+2), finished: 0 (+0), failed: 0 (+0)

Class BaseJobFileFactory#

class BaseJobFileFactory(dir=None, render_variables=None, custom_log_file=None, mkdtemp=None, cleanup=None)[source]#

Bases: object

Base class that handles the creation of job files. It is likely that inheriting classes only need to implement the create() method as well as extend the constructor to handle additional arguments.

The general idea behind this class is as follows. An instance holds the path to a directory dir, defaulting to a new, temporary directory inside job.job_file_dir (which itself defaults to the system’s tmp path). Job input files, which are supported by almost all job / batch systems, are automatically copied into this directory. File names can optionally be postfixed with a configurable string, so that multiple job files can be created and stored within the same dir without the risk of colliding file names. A common use case is postfixing with a job number or id. Another transformation that is applied to copied files is the rendering of variables. For example, when an input file looks like

#!/usr/bin/env bash

echo "Hello, {{my_variable}}!"

the rendering mechanism can replace variables such as my_variable following a double-brace notation. Internally, the rendering is implemented in render_file(), but there is usually no need to call this method directly as implementations of this base class might use it in their create() method.

classmethod postfix_file(path, postfix=None, add_hash=False)[source]#

Adds a postfix to a file path, right before the first file extension in the base name. When add_hash is True, a hash based on the full source path is added before the postfix. Example:

postfix_file("/path/to/file.tar.gz", "_1")
# -> "/path/to/file_1.tar.gz"

postfix_file("/path/to/file.txt", "_1", add_hash=True)
# -> "/path/to/file_dacc4374d3_1.txt"

postfix might also be a dictionary that maps patterns to actual postfix strings. When a pattern matches the base name of the file, the associated postfix is applied and the path is returned. You might want to use an ordered dictionary to control the first match.
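For instance, assuming the patterns are matched in a shell-glob fashion, an ordered dictionary controls which pattern is tested first:

from collections import OrderedDict

postfix = OrderedDict([("*.txt", "_txt"), ("*", "_other")])
postfix_file("/path/to/file.txt", postfix)
# -> "/path/to/file_txt.txt"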

classmethod postfix_input_file(path, postfix=None)[source]#

Shorthand for postfix_file() with add_hash set to True.

classmethod postfix_output_file(path, postfix=None)[source]#

Shorthand for postfix_file() with add_hash set to False.

classmethod render_string(s, key, value)[source]#

Renders a string s by replacing {{key}} with value and returns it.
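Example:

render_string("echo 'Hello, {{name}}!'", "name", "Tom")
# -> "echo 'Hello, Tom!'"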

classmethod linearize_render_variables(render_variables)[source]#

Linearizes variables contained in the dictionary render_variables. In some use cases, variables may contain render expressions pointing to other variables, e.g.:

render_variables = {
    "variable_a": "Tom",
    "variable_b": "Hello, {{variable_a}}!",
}

Situations like this can be simplified by linearizing the variables:

linearize_render_variables(render_variables)
# ->
# {
#     "variable_a": "Tom",
#     "variable_b": "Hello, Tom!",
# }

classmethod render_file(src, dst, render_variables, postfix=None, silent=True)[source]#

Renders a source file src with render_variables and copies it to a new location dst. In some cases, a render variable value might contain a path that should be subject to file postfixing (see postfix_file()). When postfix is not None, this method replaces substrings in the format postfix:<path> with the postfixed path. In the following example, the variable my_command in src will be rendered with a string that contains a postfixed path:

render_file(src, dst, {"my_command": "echo postfix:some/path.txt"}, postfix="_1")
# replaces "{{my_command}}" in src with "echo some/path_1.txt" in dst

In case the file content is not readable, the method returns without an error, unless silent is False, in which case an exception is raised.

provide_input(src, postfix=None, dir=None, render_variables=None, skip_existing=False)[source]#

Convenience method that copies an input file src to a target directory dir, which defaults to the dir attribute of this instance. The provided file keeps its basename, which is optionally postfixed with postfix. Essentially, this method calls render_file() when render_variables is set, or simply shutil.copy2 otherwise. If the target file already exists, it is overwritten unless skip_existing is True.
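A usage sketch, assuming factory is an instance of a concrete subclass and the input file contains the double-brace variable from the example above:

path = factory.provide_input(
    "/path/to/script.sh",
    postfix="_0",
    render_variables={"my_variable": "World"},
)
# path points to a copy such as "<factory.dir>/script_0.sh" in which
# "{{my_variable}}" was replaced by "World"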

get_config(**kwargs)[source]#

The create() method potentially takes a lot of keyword arguments for configuring the content of job files. It is useful if some of these configuration values default to attributes that can be set via constructor arguments of this class.

This method merges keyword arguments kwargs (e.g. passed to create()) with default values obtained from instance attributes given in config_attrs. It returns the merged values in a dictionary that can be accessed via dot-notation (attribute notation). Example:

class MyJobFileFactory(BaseJobFileFactory):

    config_attrs = ["stdout", "stderr"]

    def __init__(self, stdout="stdout.txt", stderr="stderr.txt", **kwargs):
        super(MyJobFileFactory, self).__init__(**kwargs)

        self.stdout = stdout
        self.stderr = stderr

    def create(self, **kwargs):
        config = self.get_config(**kwargs)

        # when called as create(stdout="log.txt"):
        # config.stderr is "stderr.txt"
        # config.stdout is "log.txt"

        ...

cleanup_dir(force=True)[source]#

Removes the directory that is held by this instance. When force is False, the directory is only removed when cleanup is True.

abstract create(**kwargs)[source]#

Abstract job file creation method that must be implemented by inheriting classes.

Class JobInputFile#

class JobInputFile(path, copy=None, share=None, forward=None, postfix=None, render=None, render_local=None, render_job=None)[source]#

Bases: object

Wrapper around a path referring to an input file of a job, accompanied by optional flags that control how the file should be handled during job submission (mostly within BaseJobFileFactory.provide_input()). See the attributes below for more info.
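An illustrative construction (the path is made up), wrapping a local script that should be copied into the submission directory, postfixed, and rendered locally:

input_file = JobInputFile(
    "/path/to/script.sh",
    copy=True,
    postfix=True,
    render_local=True,
)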

path#

type: str

The path of the input file.

copy#

type: bool

Whether this file should be copied into the job submission directory or not.

share#

type: bool

Whether the file can be shared in the job submission directory. A shared file is copied only once into the submission directory and render_local must be False.

forward#

type: bool

Whether this file should not be listed as a normal input file in the job description, but instead be passed to the list of inputs for treatment in the law job script itself. Only considered if supported by the submission system (e.g. local ones such as htcondor or slurm).

postfix#

type: bool

Whether the file path should be postfixed when copied.

render_local#

type: bool

Whether render variables should be resolved locally when copied.

render_job#

type: bool

Whether render variables should be resolved as part of the job script.

is_remote#

type: bool (read-only)

Whether the path has a non-empty protocol referring to a remote resource.

path_sub_abs#

type: str, None

Absolute file path as seen by the submission node. Set only during job file creation.

path_sub_rel#

type: str, None

File path relative to the submission directory if the submission itself is not forced to use absolute paths. Otherwise identical to path_sub_abs. Set only during job file creation.

path_job_pre_render#

type: str, None

File path as seen by the job node, prior to a potential job-side rendering. It is a full, absolute path in case forwarding is supported, and a relative basename otherwise. Set only during job file creation.

path_job_post_render#

type: str, None

File path as seen by the job node, after a potential job-side rendering. Therefore, it is identical to path_job_pre_render if rendering is disabled, and a relative basename otherwise. Set only during job file creation.

Class JobArguments#

class JobArguments(task_cls, task_params, branches, workers=1, auto_retry=False, dashboard_data=None)[source]#

Bases: object

Wrapper class for job arguments. Currently, it stores a task class task_cls, a list of task_params, a list of covered branches, a number of workers, an auto_retry flag, and custom dashboard_data. It also handles argument encoding as required by the job wrapper script at law/job/job.sh.

task_cls#

type: law.Register

The task class.

task_params#

type: list

The list of task parameters.

branches#

type: list

The list of branch numbers covered by the task.

workers#

type: int

The number of workers to use in “law run” commands.

auto_retry#

type: bool

A flag denoting if the job-internal automatic retry mechanism should be used.

dashboard_data#

type: list

If a job dashboard is used, this is a list of configuration values as returned by law.job.dashboard.BaseJobDashboard.remote_hook_data().

classmethod encode_bool(b)[source]#

Encodes a boolean b into a string ("yes" or "no").

classmethod encode_string(s)[source]#

Encodes a string s via base64 encoding.

classmethod encode_list(l)[source]#

Encodes a list l into a string via base64 encoding.
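The boolean encoding is fixed to "yes" / "no"; string and list encodings are base64-based, so their exact output is only hinted at here:

JobArguments.encode_bool(True)   # -> "yes"
JobArguments.encode_bool(False)  # -> "no"
JobArguments.encode_string("--my-param value")  # -> base64-encoded string
JobArguments.encode_list(["a", "b"])            # -> base64-encoded string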

get_args()[source]#

Returns the list of encoded job arguments. The order of this list corresponds to the arguments expected by the job wrapper script.

join()[source]#

Returns the list of job arguments from get_args(), joined into a single string using a single space character.
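A construction sketch, assuming MyTask is a registered law task class; the joined string is what the job wrapper script ultimately receives:

args = JobArguments(
    task_cls=MyTask,
    task_params=["--my-param", "value"],
    branches=[0, 1, 2],
    workers=1,
    auto_retry=False,
)
cmd_args = args.join()  # single space-separated string of encoded arguments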