law.workflow.remote

Base definition of remote workflows based on job submission and status polling.

Class BaseRemoteWorkflow

class BaseRemoteWorkflow(*args, **kwargs)

Bases: BaseWorkflow

Opinionated base class for remote workflows that operates in two phases:

1. Create and submit m jobs that process n tasks. Submission information (mostly job ids) is stored in the so-called jobs file, which is an output target of this workflow.

2. Use the job data and start status polling. When done, status data is stored alongside the submission information in the same jobs file.

classattribute check_unreachable_acceptance

type: bool

When True, stop the job status polling early if the minimum number of finished jobs as defined by acceptance becomes unreachable. Otherwise, keep polling until all jobs are either finished or failed. Defaults to False.
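The early-stop condition can be sketched in plain Python. The helper names below are hypothetical. The sketch assumes that acceptance values of at most 1 are fractions of all jobs, that larger values are absolute counts, and that failed jobs have no retries left.

```python
import math


def required_finished(acceptance, n_jobs):
    """Translate an acceptance value into a minimum number of finished jobs.

    Assumption of this sketch: values <= 1 are fractions, larger values are counts.
    """
    if acceptance <= 1:
        # subtract a tiny epsilon so float noise (0.7 * 10 = 7.000000000000001)
        # does not round the requirement up by one job
        return math.ceil(acceptance * n_jobs - 1e-9)
    return int(acceptance)


def acceptance_unreachable(acceptance, n_jobs, n_finished, n_failed):
    """Return True if acceptance cannot be met even if all pending jobs finish."""
    n_pending = n_jobs - n_finished - n_failed
    return n_finished + n_pending < required_finished(acceptance, n_jobs)


# 10 jobs, at least 80% must finish: with 3 final failures, 8 is out of reach
print(acceptance_unreachable(0.8, 10, n_finished=5, n_failed=3))  # True
print(acceptance_unreachable(0.8, 10, n_finished=5, n_failed=2))  # False
```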

classattribute align_polling_status_line

type: int, bool

Alignment value that is passed to law.job.base.BaseJobManager.status_line() to print the status line during job status polling. Defaults to False.

classattribute append_retry_jobs

type: bool

When True, jobs to be retried are appended to the end of the list of jobs to submit, giving priority to new ones. However, when shuffle_jobs is True, they might still be submitted earlier. Defaults to False.
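A minimal sketch of the resulting submission order. It assumes that retry jobs are placed in front of new jobs when append_retry_jobs is False. The function order_jobs is hypothetical and only illustrates the documented ordering, not law's implementation.

```python
import random


def order_jobs(new_jobs, retry_jobs, append_retry_jobs=False, shuffle_jobs=False, seed=None):
    """Return the submission order of job numbers (hypothetical helper)."""
    if append_retry_jobs:
        # retries go last, so new jobs get priority
        jobs = list(new_jobs) + list(retry_jobs)
    else:
        # assumption of this sketch: retries go first otherwise
        jobs = list(retry_jobs) + list(new_jobs)
    if shuffle_jobs:
        # shuffling can move retries forward again
        random.Random(seed).shuffle(jobs)
    return jobs


print(order_jobs([3, 4], [1], append_retry_jobs=True))   # [3, 4, 1]
print(order_jobs([3, 4], [1], append_retry_jobs=False))  # [1, 3, 4]
```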

classattribute retries

type: luigi.IntParameter

Maximum number of automatic resubmission attempts per job before considering it failed. Defaults to 5.

classattribute tasks_per_job

type: luigi.IntParameter

Number of tasks to be processed per job. Defaults to 1.
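With n tasks and tasks_per_job = k, ceil(n / k) jobs are needed. The hypothetical helper below shows one way to chunk branch numbers into jobs. Numbering jobs from 1 is an assumption of this sketch.

```python
def group_branches(branches, tasks_per_job=1):
    """Split branch numbers into consecutive chunks, one chunk per job."""
    branches = list(branches)
    return {
        job_num: branches[i:i + tasks_per_job]
        for job_num, i in enumerate(range(0, len(branches), tasks_per_job), start=1)
    }


print(group_branches(range(7), tasks_per_job=3))
# {1: [0, 1, 2], 2: [3, 4, 5], 3: [6]}
```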

classattribute parallel_jobs

type: luigi.IntParameter

Maximum number of jobs running in parallel, e.g. to protect a very busy queue of a batch system. Empty by default, meaning no limit.

classattribute no_poll

type: luigi.BoolParameter

When True, only submit jobs and skip status polling. Defaults to False.

classattribute submission_threads

type: luigi.IntParameter

Number of threads to use for both job submission and job status polling. Defaults to 4.

classattribute walltime

type: law.DurationParameter

Maximum job walltime after which a job is considered failed. Empty by default. When a plain number is passed, the unit defaults to hours.

classattribute job_workers

type: luigi.IntParameter

Number of cores to use within jobs to process multiple tasks in parallel (by adding --workers to the remote job command). Defaults to 1.

classattribute poll_interval

type: law.DurationParameter

Interval between two job status polls. Defaults to 1 minute. The default unit is minutes when a plain number is passed.
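Both walltime and poll_interval accept plain numbers as well as strings with units. The parser below is a hypothetical stand-in, not law's own duration parsing. It shows that the default unit only applies to plain numbers.

```python
import re

# Seconds per unit; a hypothetical table for this sketch.
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def parse_duration_seconds(value, default_unit):
    """Convert values like 2, "90", "45s" or "1h30m" to seconds.

    Plain numbers are interpreted in default_unit, mirroring the documented
    defaults of walltime (hours) and poll_interval (minutes).
    """
    if isinstance(value, (int, float)):
        return float(value) * UNIT_SECONDS[default_unit]
    text = str(value).strip()
    if re.fullmatch(r"\d+(?:\.\d+)?", text):
        return float(text) * UNIT_SECONDS[default_unit]
    # reject anything that is not a sequence of <number><unit> pairs
    if not re.fullmatch(r"(?:\d+(?:\.\d+)?\s*[smhd]\s*)+", text):
        raise ValueError(f"cannot parse duration: {value!r}")
    pairs = re.findall(r"(\d+(?:\.\d+)?)\s*([smhd])", text)
    return sum(float(num) * UNIT_SECONDS[unit] for num, unit in pairs)


print(parse_duration_seconds(2, "h"))        # 7200.0 (walltime=2)
print(parse_duration_seconds("1", "m"))      # 60.0 (poll_interval=1)
print(parse_duration_seconds("1h30m", "h"))  # 5400.0
```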

classattribute poll_fails

type: luigi.IntParameter

Maximum number of consecutive errors during status polling after which a job is considered failed. This can occur due to networking problems. Defaults to 5.
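The consecutive-error rule can be sketched as a per-job counter that resets after every successful query. The helper is hypothetical. Treating a job as failed once the streak exceeds poll_fails, rather than when it reaches poll_fails, is an assumption.

```python
def record_poll_result(fail_streaks, job_num, query_ok, poll_fails=5):
    """Update the consecutive-error streak of one job (hypothetical helper).

    Returns True once the job should be treated as failed; this sketch
    assumes the streak must exceed poll_fails.
    """
    if query_ok:
        fail_streaks[job_num] = 0  # any successful query resets the streak
        return False
    fail_streaks[job_num] = fail_streaks.get(job_num, 0) + 1
    return fail_streaks[job_num] > poll_fails


streaks = {}
results = [record_poll_result(streaks, 1, ok, poll_fails=2)
           for ok in (False, False, True, False, False, False)]
print(results)  # [False, False, False, False, False, True]
```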

classattribute shuffle_jobs

type: luigi.BoolParameter

When True, the order of jobs is shuffled before submission. Defaults to False.

classattribute cancel_jobs

type: luigi.BoolParameter

When True, already running jobs are cancelled and no new ones are submitted. The job ids are read from the jobs file, if it exists. Defaults to False.

classattribute cleanup_jobs

type: luigi.BoolParameter

When True, already running jobs are cleaned up and no new ones are submitted. The job ids are read from the jobs file, if it exists. Defaults to False.

classattribute transfer_logs

type: luigi.BoolParameter

Transfer the combined log file back to the output directory. Defaults to False.

workflow_run_context()

Hook to provide a context manager in which the workflow run implementation is placed. This can be helpful in situations where resources should be acquired before and released after running a workflow.
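A minimal sketch of such a hook, assuming it is implemented as a generator-based context manager. MyRemoteWorkflow and run_workflow are hypothetical stand-ins that only mimic how the run implementation is wrapped. They do not import law.

```python
import contextlib


class MyRemoteWorkflow:  # hypothetical; a real task would inherit from a law workflow
    def __init__(self):
        self.events = []

    @contextlib.contextmanager
    def workflow_run_context(self):
        # acquire a resource (e.g. a lock or a credential) before the run ...
        self.events.append("acquire")
        try:
            yield
        finally:
            # ... and always release it afterwards, even if the run raised
            self.events.append("release")

    def run_workflow(self):
        # simplified caller: the run implementation is placed inside the context
        with self.workflow_run_context():
            self.events.append("run")


wf = MyRemoteWorkflow()
wf.run_workflow()
print(wf.events)  # ['acquire', 'run', 'release']
```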

is_controlling_remote_jobs()

Returns True if the remote workflow is only controlling remote jobs instead of handling new ones. This is the case when either cancel_jobs or cleanup_jobs is True.

control_output_postfix()

Hook that should return a string that is inserted into the names of control output files.

poll_callback(poll_data)

Configurable callback that is called after each job status query and before potential resubmission. It receives the mutable polling attributes poll_data (PollData), which can be changed within this method.

If False is returned, the polling loop is gracefully terminated. Returning any other value does not have any effect.
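The following stand-alone sketch illustrates the documented return-value semantics. FakePollData and its fields are hypothetical, and the real PollData attributes may differ. polling_loop is a simplified stand-in for the actual polling loop.

```python
from dataclasses import dataclass


@dataclass
class FakePollData:
    # hypothetical stand-in for PollData; field names are illustrative only
    n_parallel: int
    n_polls: int = 0


def poll_callback(poll_data):
    # user hook: double parallelism at the third poll, stop after the fifth
    poll_data.n_polls += 1
    if poll_data.n_polls == 3:
        poll_data.n_parallel *= 2
    if poll_data.n_polls >= 5:
        return False  # only an explicit False ends polling
    return None       # any other value has no effect


def polling_loop(poll_data, max_iterations=100):
    for _ in range(max_iterations):
        # ... query job states here, then call the hook before any resubmission
        if poll_callback(poll_data) is False:
            break
    return poll_data


print(polling_loop(FakePollData(n_parallel=10)))
# FakePollData(n_parallel=20, n_polls=5)
```

Checking with `is False` rather than truthiness is what makes returning None, 0 or an empty string harmless, matching the statement that any other return value has no effect.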

post_submit_delay()

Configurable delay in seconds to wait after submitting jobs and before starting the status polling.

create_job_dashboard()

Hook method to return a configured law.job.BaseJobDashboard instance that will be used by the workflow.

forward_dashboard_event(dashboard, job_data, event, job_num)

Hook to preprocess and publish dashboard events. By default, every event is passed to the dashboard’s law.job.dashboard.BaseJobDashboard.publish() method unchanged.

modify_polling_status_line(status_line)

Hook to modify the status line that is printed during polling.

property accepts_messages

For configuring which scheduler messages can be received. When falsy, this task does not accept any message. When True, all messages are accepted.

handle_scheduler_message(msg)

Hook that is called when a scheduler message msg is received. Returns True when the message was handled, and False otherwise.

Handled messages in addition to those defined in law.workflow.base.BaseWorkflow.handle_scheduler_message():

  • parallel_jobs = <int>

  • walltime = <str/int/float>

  • poll_fails = <int>

  • poll_interval = <str/int/float>

  • retries = <int>
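Below is a sketch of how "key = value" messages from the list above could be parsed and applied. RemoteSettings and its converter table are hypothetical. A real implementation would presumably fall back to the base class handler for keys it does not know.

```python
class RemoteSettings:
    """Hypothetical holder for values that scheduler messages may change."""

    # message key -> converter; walltime and poll_interval are kept as raw strings
    converters = {
        "parallel_jobs": int,
        "walltime": str,
        "poll_fails": int,
        "poll_interval": str,
        "retries": int,
    }

    def __init__(self):
        self.values = {}

    def handle_scheduler_message(self, msg):
        """Parse "key = value" and return True if the message was handled."""
        key, sep, raw = msg.partition("=")
        key, raw = key.strip(), raw.strip()
        if not sep or key not in self.converters:
            return False
        try:
            self.values[key] = self.converters[key](raw)
        except ValueError:
            return False  # malformed value, e.g. "retries = many"
        return True


s = RemoteSettings()
print(s.handle_scheduler_message("parallel_jobs = 20"))  # True
print(s.handle_scheduler_message("retries = many"))      # False
print(s.handle_scheduler_message("unknown = 1"))         # False
print(s.values)  # {'parallel_jobs': 20}
```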

Class BaseRemoteWorkflowProxy

class BaseRemoteWorkflowProxy(*args, **kwargs)

Bases: BaseWorkflowProxy

Workflow proxy base class for remote workflows.

classattribute job_error_messages

type: dict

A dictionary containing short error messages mapped to job exit codes as defined in the remote job execution script.

show_errors

type: int

Number of errors to show explicitly during job submission and status polling.

summarize_status_errors

type: bool, int

During status polling, when the number of errors exceeds show_errors, a summary of errors is shown if this flag is True. When a number is given, the summary is printed if the number of errors exceeds this value.
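The documented rule can be written as a small decision function. The sketch assumes that the numeric threshold, like the boolean flag, only matters once the number of errors exceeds show_errors. should_summarize is a hypothetical name.

```python
def should_summarize(n_errors, show_errors, summarize_status_errors):
    """Decide whether to print an error summary during polling (hypothetical helper)."""
    if n_errors <= show_errors:
        return False  # every error was already shown individually
    # check bool first: in Python, True and False are also instances of int
    if isinstance(summarize_status_errors, bool):
        return summarize_status_errors
    return n_errors > summarize_status_errors


print(should_summarize(8, 5, True))   # True
print(should_summarize(8, 5, 10))     # False
print(should_summarize(12, 5, 10))    # True
```

Without the bool check, False would act as a numeric threshold of 0 and enable the summary.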

job_manager

type: law.job.base.BaseJobManager

Reference to the job manager object that handles the actual job submission, status queries, etc. The instance is created and configured by create_job_manager().

job_file_factory

type: law.job.base.BaseJobFileFactory

Reference to a job file factory. The instance is created and configured by create_job_file_factory().

job_data

type: JobData

The job data object holding job submission and status information.

dashboard

type: law.job.dashboard.BaseJobDashboard

Reference to the dashboard instance that is used by the workflow.

job_data_cls

type: type (read-only)

Class for instantiating job_data.

abstract create_job_manager(**kwargs)

Hook to instantiate and return an instance of a class derived from law.job.base.BaseJobManager. This method must be implemented by inheriting classes and should update and forward all kwargs to the constructor of the respective job manager.
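A minimal sketch of the update-and-forward pattern with hypothetical DummyJobManager and MyWorkflowProxy classes. Using setdefault so that explicitly passed values win over the proxy's defaults is a design choice of this sketch.

```python
class DummyJobManager:
    """Hypothetical stand-in for a subclass of law.job.base.BaseJobManager."""

    def __init__(self, pool=None, threads=1, **kwargs):
        self.pool = pool
        self.threads = threads
        self.extra = kwargs


class MyWorkflowProxy:
    """Hypothetical proxy illustrating the update-and-forward pattern."""

    def create_job_manager(self, **kwargs):
        # update kwargs with proxy-specific settings; setdefault keeps values
        # that the caller passed explicitly
        kwargs.setdefault("pool", "default_pool")
        # forward everything to the job manager's constructor
        return DummyJobManager(**kwargs)


manager = MyWorkflowProxy().create_job_manager(threads=4)
print(manager.pool, manager.threads)  # default_pool 4
```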

setup_job_manager()

Hook invoked externally to further set up the job manager or perform batch system related preparations, e.g. before jobs can be submitted. The returned keyword arguments will be forwarded to the submit, cancel, cleanup and query methods of the job manager.
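The forwarding of the returned keyword arguments can be sketched with a hypothetical recording job manager. The account option is made up for illustration and is not a documented law setting.

```python
class RecordingJobManager:
    """Hypothetical job manager that records the keyword arguments it receives."""

    def __init__(self):
        self.calls = []

    def submit(self, job_file, **kwargs):
        self.calls.append(("submit", job_file, kwargs))
        return "job-1"

    def query(self, job_id, **kwargs):
        self.calls.append(("query", job_id, kwargs))
        return "finished"


def setup_job_manager():
    # e.g. resolve a batch system account once; the result is reused below
    return {"account": "my_group"}  # hypothetical option name


manager = RecordingJobManager()
manager_kwargs = setup_job_manager()
job_id = manager.submit("job_1.jdl", **manager_kwargs)
status = manager.query(job_id, **manager_kwargs)
print(status, manager.calls[1])
# finished ('query', 'job-1', {'account': 'my_group'})
```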

abstract create_job_file_factory(**kwargs)

Hook to instantiate and return an instance of a class derived from law.job.base.BaseJobFileFactory. This method must be implemented by inheriting classes and should update and forward all kwargs to the constructor of the respective job file factory.

abstract create_job_file(*args, **kwargs)

Creates a job file using the job_file_factory. The expected arguments depend on whether the job manager supports job grouping (BaseJobManager.job_grouping). If it does not, two arguments are passed: the job number (job_num) and the list of branch numbers (branches) covered by the job. If job grouping is supported, a single dictionary mapping job numbers to covered branch values must be passed. In either case, the path(s) of the job file(s) are returned.

This method must be implemented by inheriting classes.
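Here is a stand-alone sketch of the two calling conventions. Passing job_grouping as a keyword is a simplification made for this sketch, since in law it is an attribute of the job manager. The returned file names are fake, and no files are written.

```python
def create_job_file(*args, job_grouping=False):
    """Hypothetical stand-in illustrating the two documented calling conventions."""
    if job_grouping:
        (submit_jobs,) = args  # {job_num: [branch, ...], ...}
        # one grouped file covering all jobs
        return "jobs_{}.txt".format("_".join(str(n) for n in submit_jobs))
    job_num, branches = args
    return "job_{}_branches_{}.txt".format(job_num, "-".join(map(str, branches)))


print(create_job_file(1, [0, 1, 2]))                              # job_1_branches_0-1-2.txt
print(create_job_file({1: [0, 1], 2: [2, 3]}, job_grouping=True))  # jobs_1_2.txt
```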

destination_info()

Hook that can return a string containing information on the location that jobs are submitted to. The string is appended to submission and status messages.

get_extra_submission_data(job_file, config, log=None)

Hook that is called after job submission with the job_file, the submission config and an optional log file to return extra data that is saved in the central job data.

complete()

Custom completion check that invokes the task’s workflow_complete method. If that method returns anything other than NotImplemented, its return value is used; otherwise, the default completion check is performed.
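The NotImplemented sentinel pattern can be sketched as follows. Task and complete are hypothetical stand-ins, not law classes.

```python
class Task:
    """Hypothetical task that may or may not customize its completeness."""

    def __init__(self, custom=NotImplemented):
        self._custom = custom

    def workflow_complete(self):
        return self._custom  # NotImplemented means "no opinion"


def complete(task, default_check):
    # compare the sentinel by identity, so False or None remain valid answers
    result = task.workflow_complete()
    if result is not NotImplemented:
        return result
    return default_check()


print(complete(Task(), lambda: True))       # True (default check used)
print(complete(Task(False), lambda: True))  # False (custom result wins)
```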

requires()

Returns the default workflow requirements in an ordered dictionary, which is updated with the return value of the task’s workflow_requires method.

output()

Returns the default workflow outputs in an ordered dictionary. At the moment, this is the collection of outputs of the branch tasks (key "collection"), the submission file (key "submission"), and the status file (key "status"). These two control outputs are optional, i.e., they are not considered when checking the task’s completeness.

dump_job_data()

Dumps the current submission data to the submission file.

run()

Default run implementation that resets the branch map once if requested.

cancel()

Cancels running jobs. The job ids are read from the submission file, which therefore has to exist.

cleanup()

Cleans up jobs on the remote run location. The job ids are read from the submission file, which therefore has to exist.

submit(retry_jobs=None)

Submits all jobs. When retry_jobs is None, a new job list is built. Otherwise, previously failed jobs defined in the retry_jobs dictionary, which maps job numbers to lists of branch numbers, are used.

poll()

Initiates the job status polling loop.

Class JobData

class JobData(**kwargs)

Bases: ShorthandDict

Subclass of law.util.ShorthandDict that adds shorthands for the attributes jobs, unsubmitted_jobs, tasks_per_job, and dashboard_config. This container object is used to store and keep track of per-job information in BaseRemoteWorkflow.

classattribute dummy_job_id

type: string

A unique, dummy job id ("dummy_job_id").

classmethod job_data(job_id='dummy_job_id', branches=None, status=None, code=None, error=None, extra=None, **kwargs)

Returns a dictionary containing default job submission information such as the job_id, the task branches covered by the job, a job status string, a job return code, an error message, and extra data. Additional kwargs are accepted but not stored.
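Here is a stand-alone sketch of the documented behavior. The dictionary key names and the empty-list and empty-dict defaults for branches and extra are assumptions. Only the parameter list is taken from the signature above.

```python
def job_data(job_id="dummy_job_id", branches=None, status=None, code=None,
             error=None, extra=None, **kwargs):
    """Sketch of the documented defaults; additional kwargs are ignored."""
    return dict(
        job_id=job_id,
        branches=branches or [],  # assumption: empty list when not given
        status=status,
        code=code,
        error=error,
        extra=extra or {},        # assumption: empty dict when not given
    )


entry = job_data(branches=[0, 1], status="finished", code=0, queue="long")
print(entry["job_id"], entry["branches"], "queue" in entry)
# dummy_job_id [0, 1] False
```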