law.workflow.remote#
Base definition of remote workflows based on job submission and status polling.
Class BaseRemoteWorkflow#
- class BaseRemoteWorkflow(*args, **kwargs)[source]#
Bases: BaseWorkflow
Opinionated base class for remote workflows that works in two phases:
1. Create and submit m jobs that process n tasks. Submission information (mostly job ids) is stored in the so-called jobs file, which is an output target of this workflow.
2. Use the job data and start status polling. When done, status data is stored alongside the submission information in the same jobs file.
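The phase-1 grouping of n tasks into m jobs can be pictured with a small standalone sketch of the branch-to-job mapping; this is an illustration only, not law's actual implementation:

```python
import math

def group_tasks_into_jobs(n_tasks, tasks_per_job):
    """Group n task branches into jobs of tasks_per_job branches each,
    mimicking the submission phase described above (illustrative only)."""
    n_jobs = math.ceil(n_tasks / tasks_per_job)
    return {
        job_num: list(range(
            job_num * tasks_per_job,
            min((job_num + 1) * tasks_per_job, n_tasks),
        ))
        for job_num in range(n_jobs)
    }

# 5 task branches with 2 tasks per job yield 3 jobs
jobs = group_tasks_into_jobs(5, 2)
```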
- classattribute check_unreachable_acceptance#
type: bool
When True, stop the job status polling early once the minimum number of finished jobs, as defined by acceptance, becomes unreachable. Otherwise, keep polling until all jobs are either finished or failed. Defaults to False.
- classattribute align_polling_status_line#
type: int, bool
Alignment value that is passed to law.job.base.BaseJobManager.status_line() to print the status line during job status polling. Defaults to False.
- classattribute append_retry_jobs#
type: bool
When True, jobs to retry are appended to the end of the list of jobs to submit, giving priority to new ones. However, when shuffle_jobs is True, they might still be submitted earlier. Defaults to False.
- classattribute retries#
type: luigi.IntParameter
Maximum number of automatic resubmission attempts per job before considering it failed. Defaults to 5.
- classattribute tasks_per_job#
type: luigi.IntParameter
Number of tasks to be processed per job. Defaults to 1.
- classattribute parallel_jobs#
type: luigi.IntParameter
Maximum number of jobs running in parallel, e.g. to protect a very busy queue of a batch system. Empty default value (infinity).
- classattribute no_poll#
type: luigi.BoolParameter
When True, only submit jobs and skip status polling. Defaults to False.
- classattribute submission_threads#
type: luigi.IntParameter
Number of threads to use for both job submission and job status polling. Defaults to 4.
- classattribute walltime#
type: law.DurationParameter
Maximum job walltime after which a job will be considered failed. Empty default value. The default unit is hours when a plain number is passed.
- classattribute job_workers#
type: luigi.IntParameter
Number of cores to use within jobs to process multiple tasks in parallel (by adding --workers to the remote job command). Defaults to 1.
- classattribute poll_interval#
type: law.DurationParameter
Interval between two job status polls. Defaults to 1 minute. The default unit is minutes when a plain number is passed.
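The default-unit behavior of duration parameters such as poll_interval and walltime can be illustrated with a small stand-in parser; this mimics the documented "plain number means default unit" rule but is not law.DurationParameter itself:

```python
# Illustrative stand-in for how law.DurationParameter treats values:
# a plain number is interpreted in the parameter's default unit,
# while strings such as "90s" or "2h" carry their own unit.
UNIT_SECONDS = {"s": 1, "min": 60, "h": 3600}

def to_seconds(value, default_unit="min"):
    if isinstance(value, (int, float)):
        return value * UNIT_SECONDS[default_unit]
    # check longer unit suffixes first so "min" is not mistaken for "n"
    for unit in sorted(UNIT_SECONDS, key=len, reverse=True):
        if value.endswith(unit):
            return float(value[:-len(unit)]) * UNIT_SECONDS[unit]
    raise ValueError(f"cannot parse duration: {value!r}")
```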
- classattribute poll_fails#
type: luigi.IntParameter
Maximum number of consecutive errors during status polling after which a job is considered failed. This can occur due to networking problems. Defaults to 5.
- classattribute shuffle_jobs#
type: luigi.BoolParameter
When True, the order of jobs is shuffled before submission. Defaults to False.
- classattribute cancel_jobs#
type: luigi.BoolParameter
When True, already running jobs are cancelled and no new ones are submitted. The job ids are read from the jobs file if existing. Defaults to False.
- classattribute cleanup_jobs#
type: luigi.BoolParameter
When True, already running jobs are cleaned up and no new ones are submitted. The job ids are read from the jobs file if existing. Defaults to False.
- classattribute transfer_logs#
type: luigi.BoolParameter
Transfer the combined log file back to the output directory. Defaults to False.
- workflow_run_context()[source]#
Hook to provide a context manager in which the workflow run implementation is placed. This can be helpful in situations where resources should be acquired before and released after running a workflow.
- is_controlling_remote_jobs()[source]#
Returns True if the remote workflow is only controlling remote jobs instead of handling new ones. This is the case when either cancel_jobs or cleanup_jobs is True.
- control_output_postfix()[source]#
Hook that should return a string that is inserted into the names of control output files.
- poll_callback(poll_data)[source]#
Configurable callback that is called after each job status query and before potential resubmission. It receives the variable polling attributes poll_data (PollData) that can be changed within this method. If False is returned, the polling loop is gracefully terminated. Returning any other value has no effect.
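A possible override of this callback, stopping the loop once 90% of jobs have finished, can be sketched as follows; the PollData attribute names used here (n_jobs, n_finished) are illustrative stand-ins rather than law's actual fields:

```python
from dataclasses import dataclass

@dataclass
class FakePollData:
    # illustrative stand-in for law's PollData container
    n_jobs: int
    n_finished: int

def poll_callback(poll_data):
    # returning False gracefully terminates the polling loop;
    # any other return value (including None) has no effect
    if poll_data.n_finished >= 0.9 * poll_data.n_jobs:
        return False
```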
- post_submit_delay()[source]#
Configurable delay in seconds to wait after submitting jobs and before starting the status polling.
- create_job_dashboard()[source]#
Hook method to return a configured law.job.BaseJobDashboard instance that will be used by the workflow.
- forward_dashboard_event(dashboard, job_data, event, job_num)[source]#
Hook to preprocess and publish dashboard events. By default, every event is passed unchanged to the dashboard's law.job.dashboard.BaseJobDashboard.publish() method.
- modify_polling_status_line(status_line)[source]#
Hook to modify the status line that is printed during polling.
- property accepts_messages#
For configuring which scheduler messages can be received. When falsy, this task does not accept any messages. When True, all messages are accepted.
- handle_scheduler_message(msg)[source]#
Hook that is called when a scheduler message msg is received. Returns True when the message was handled, and False otherwise.
Handled messages in addition to those defined in law.workflow.base.BaseWorkflow.handle_scheduler_message():
- parallel_jobs = <int>
- walltime = <str/int/float>
- poll_fails = <int>
- poll_interval = <str/int/float>
- retries = <int>
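The messages listed above follow a plain "attribute = value" shape; a minimal parser of that format (an illustration, not law's actual implementation) could look like this:

```python
# Illustrative parser for the "<attr> = <value>" scheduler messages
# listed above; the set of handled attributes mirrors the docs.
HANDLED_ATTRS = {"parallel_jobs", "walltime", "poll_fails",
                 "poll_interval", "retries"}

def parse_scheduler_message(msg):
    attr, _, value = (part.strip() for part in msg.partition("="))
    # report whether this attribute would be handled at all
    return attr in HANDLED_ATTRS, attr, value
```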
Class BaseRemoteWorkflowProxy#
- class BaseRemoteWorkflowProxy(*args, **kwargs)[source]#
Bases: BaseWorkflowProxy
Workflow proxy base class for remote workflows.
- classattribute job_error_messages#
type: dict
A dictionary containing short error messages mapped to job exit codes as defined in the remote job execution script.
- show_errors#
type: int
Number of errors to explicitly show during job submission and status polling.
- summarize_status_errors#
type: bool, int
During status polling, when the number of errors exceeds show_errors, a summary of errors is shown when this flag is true. When a number is given instead, the summary is printed if the number of errors exceeds this value.
- job_manager#
type: law.job.base.BaseJobManager
Reference to the job manager object that handles the actual job submission, status queries, etc. The instance is created and configured by create_job_manager().
- job_file_factory#
type: law.job.base.BaseJobFileFactory
Reference to a job file factory. The instance is created and configured by create_job_file_factory().
- dashboard#
type: law.job.dashboard.BaseJobDashboard
Reference to the dashboard instance that is used by the workflow.
- abstract create_job_manager(**kwargs)[source]#
Hook to instantiate and return an object of a class derived from law.job.base.BaseJobManager. This method must be implemented by inheriting classes and should update and forward all kwargs to the constructor of the respective job manager.
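The "update and forward all kwargs" contract can be sketched with a stand-in manager class; FakeJobManager and MyWorkflowProxy are illustrative names, not part of law:

```python
class FakeJobManager:
    # stand-in for a concrete law.job.base.BaseJobManager subclass
    def __init__(self, **kwargs):
        self.config = kwargs

class MyWorkflowProxy:
    def create_job_manager(self, **kwargs):
        # add proxy-specific defaults, then forward everything
        # to the job manager constructor
        kwargs.setdefault("threads", 4)
        return FakeJobManager(**kwargs)
```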
setup_job_manager()[source]#
Hook invoked externally to further set up the job manager or perform batch-system-related preparations, e.g. before jobs can be submitted. The returned keyword arguments will be forwarded to the submit, cancel, cleanup and query methods of the job manager.
abstract create_job_file_factory(**kwargs)[source]#
Hook to instantiate and return an object of a class derived from law.job.base.BaseJobFileFactory. This method must be implemented by inheriting classes and should update and forward all kwargs to the constructor of the respective job file factory.
abstract create_job_file(*args, **kwargs)[source]#
Creates a job file using the job_file_factory. The expected arguments depend on whether the job manager supports job grouping (BaseJobManager.job_grouping). If it does, a single dictionary mapping job numbers to covered branch values must be passed. Otherwise, two arguments are expected: the job number (job_num) and the list of branch numbers (branches) covered by the job. In any case, the path(s) of the created job file(s) are returned. This method must be implemented by inheriting classes.
- destination_info()[source]#
Hook that can return a string containing information on the location that jobs are submitted to. The string is appended to submission and status messages.
- get_extra_submission_data(job_file, config, log=None)[source]#
Hook that is called after job submission with the job_file, the submission config and an optional log file to return extra data that is saved in the central job data.
- complete()[source]#
Custom completion check that invokes the task's workflow_complete() method: if it returns anything other than NotImplemented, that value is used; otherwise, the default completion check is performed.
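This check follows the common NotImplemented fallback pattern; a standalone sketch (class and function names are illustrative, not law's API):

```python
class FakeTask:
    # stand-in task whose workflow_complete() may opt out of the
    # custom check by returning NotImplemented
    def __init__(self, result):
        self.result = result

    def workflow_complete(self):
        return self.result

def complete(task, default_check=lambda: False):
    # prefer the task's own completion verdict ...
    ret = task.workflow_complete()
    if ret is not NotImplemented:
        return ret
    # ... and fall back to the default completion check otherwise
    return default_check()
```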
- requires()[source]#
Returns the default workflow requirements in an ordered dictionary, which is updated with the return value of the task’s workflow_requires method.
- output()[source]#
Returns the default workflow outputs in an ordered dictionary. At the moment, this is the collection of outputs of the branch tasks (key "collection"), the submission file (key "submission"), and the status file (key "status"). The latter two control outputs are optional, i.e., they are not considered when checking the task's completeness.
- cancel()[source]#
Cancels running jobs. The job ids are read from the submission file which has to exist for obvious reasons.
- cleanup()[source]#
Cleans up jobs on the remote run location. The job ids are read from the submission file which has to exist for obvious reasons.
Class JobData#
- class JobData(**kwargs)[source]#
Bases: ShorthandDict
Subclass of law.util.ShorthandDict that adds shorthands for the attributes jobs, unsubmitted_jobs, tasks_per_job, and dashboard_config. This container object is used to store and keep track of per-job information in BaseRemoteWorkflow.
- classattribute dummy_job_id#
type: string
A unique, dummy job id ("dummy_job_id").
- classmethod job_data(job_id='dummy_job_id', branches=None, status=None, code=None, error=None, extra=None, **kwargs)[source]#
Returns a dictionary containing default job submission information such as the job_id, task branches covered by the job, a job status string, a job return code, an error message, and extra data. Additional kwargs are accepted but not stored.
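The defaults described above can be pictured as a plain-dict equivalent; the field names follow the signature in the docs, while the real classmethod returns them through law's JobData machinery:

```python
# Illustrative re-implementation of the job_data defaults described
# above; additional kwargs are accepted but intentionally not stored.
def job_data(job_id="dummy_job_id", branches=None, status=None,
             code=None, error=None, extra=None, **kwargs):
    return {
        "job_id": job_id,
        "branches": branches or [],
        "status": status,
        "code": code,
        "error": error,
        "extra": extra or {},
    }
```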