law.workflow.remote#
Base definition of remote workflows based on job submission and status polling.
Class BaseRemoteWorkflow#
- class BaseRemoteWorkflow(*args, **kwargs)[source]#
Bases: BaseWorkflow
Opinionated base class for remote workflows that works in 2 phases:
1. Create and submit m jobs that process n tasks. Submission information (mostly job ids) is stored in the so-called jobs file, which is an output target of this workflow.
2. Use the job data and start status polling. When done, status data is stored alongside the submission information in the same jobs file.
- classattribute check_unreachable_acceptance#
type: bool
When True, stop the job status polling early if the minimum number of finished jobs as defined by
acceptance becomes unreachable. Otherwise, keep polling until all jobs are either finished or failed. Defaults to False.
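The early-stop condition can be illustrated with a small sketch. It assumes that acceptance has already been resolved to an absolute number of jobs; the function name and its arguments are hypothetical and not part of law.

```python
def acceptance_unreachable(n_jobs: int, n_failed: int, min_finished: int) -> bool:
    """Return True when even a success of all remaining jobs cannot reach min_finished."""
    # jobs that can still end up finished: everything that has not failed for good
    max_finishable = n_jobs - n_failed
    return max_finishable < min_finished


# 10 jobs, at least 8 must finish: the third failure makes the goal unreachable
print(acceptance_unreachable(10, 2, 8))  # False
print(acceptance_unreachable(10, 3, 8))  # True
```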
- classattribute align_polling_status_line#
type: int, bool
Alignment value that is passed to
law.job.base.BaseJobManager.status_line() to print the status line during job status polling. Defaults to False.
- classattribute append_retry_jobs#
type: bool
When True, jobs to retry are added to the end of the jobs to submit, giving priority to new ones. However, when shuffle_jobs is True, they might be submitted again earlier. Defaults to False.
- classattribute include_member_resources#
type: bool
When True, the task resources defined in
process_resources() will contain the ones defined in the resources instance or class attribute. Defaults to False.
- classattribute retries#
type: luigi.IntParameter
Maximum number of automatic resubmission attempts per job before considering it failed. Defaults to 5.
- classattribute tasks_per_job#
type: luigi.IntParameter
Number of tasks to be processed per job. Defaults to 1.
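As a rough illustration of how n tasks could be distributed over m jobs for a given tasks_per_job, the sketch below splits branch numbers into consecutive chunks. The helper group_branches is hypothetical, and both the consecutive chunking and the 1-based job numbers are assumptions made for the example, not a statement about law's internals.

```python
from math import ceil


def group_branches(branches: list[int], tasks_per_job: int = 1) -> dict[int, list[int]]:
    """Split branch numbers into consecutive chunks, keyed by a 1-based job number."""
    if tasks_per_job < 1:
        raise ValueError("tasks_per_job must be positive")
    # m = ceil(n / tasks_per_job) jobs are needed to cover n tasks
    n_jobs = ceil(len(branches) / tasks_per_job)
    return {
        job_num + 1: branches[job_num * tasks_per_job:(job_num + 1) * tasks_per_job]
        for job_num in range(n_jobs)
    }


jobs = group_branches(list(range(5)), tasks_per_job=2)
print(jobs)  # {1: [0, 1], 2: [2, 3], 3: [4]}
```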
- classattribute parallel_jobs#
type: luigi.IntParameter
Maximum number of parallel running jobs, e.g. to protect a very busy queue of a batch system. Empty default value (infinity).
- classattribute no_poll#
type: luigi.BoolParameter
When True, only submit jobs and skip status polling. Defaults to False.
- classattribute submission_threads#
type: luigi.IntParameter
Number of threads to use for both job submission and job status polling. Defaults to 4.
- classattribute walltime#
type: law.DurationParameter
Maximum job walltime after which a job will be considered failed. Empty default value. The default unit is hours when a plain number is passed.
- classattribute job_workers#
type: luigi.IntParameter
Number of cores to use within jobs to process multiple tasks in parallel (via adding '--workers' to the remote job command). Defaults to 1.
- classattribute poll_interval#
type: law.DurationParameter
Interval between two job status polls. Defaults to 1 minute. The default unit is minutes when a plain number is passed.
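The different default units of walltime (hours) and poll_interval (minutes) matter when plain numbers are passed. The sketch below only shows the arithmetic for plain numbers; UNIT_SECONDS and plain_number_to_seconds are hypothetical names and do not reproduce how law.DurationParameter parses its input.

```python
# seconds per unit, limited to the units needed for this illustration
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def plain_number_to_seconds(value: float, default_unit: str) -> float:
    """Interpret a plain number in the given default unit and convert it to seconds."""
    return float(value) * UNIT_SECONDS[default_unit]


print(plain_number_to_seconds(2, "h"))  # walltime of 2 -> 7200.0
print(plain_number_to_seconds(2, "m"))  # poll_interval of 2 -> 120.0
```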
- classattribute poll_fails#
type: luigi.IntParameter
Maximum number of consecutive errors during status polling after which a job is considered failed. This can occur due to networking problems. Defaults to 5.
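The word "consecutive" is the important part: a single successful poll resets the count. A minimal sketch, assuming the job counts as failed once the error streak exceeds poll_fails (the exact boundary is an assumption, and failed_by_polling is a hypothetical helper):

```python
def failed_by_polling(poll_ok: list[bool], poll_fails: int = 5) -> bool:
    """Return True if the streak of failed polls ever exceeds poll_fails."""
    consecutive = 0
    for ok in poll_ok:
        # a successful poll resets the streak, an error extends it
        consecutive = 0 if ok else consecutive + 1
        if consecutive > poll_fails:
            return True
    return False


# ten errors in total, but never more than five in a row
print(failed_by_polling([False] * 5 + [True] + [False] * 5))  # False
print(failed_by_polling([False] * 6))                         # True
```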
- classattribute shuffle_jobs#
type: luigi.BoolParameter
When True, the order of jobs is shuffled before submission. Defaults to False.
- classattribute cancel_jobs#
type: luigi.BoolParameter
When True, already running jobs are cancelled and no new ones are submitted. The job ids are read from the jobs file if existing. Defaults to False.
- classattribute cleanup_jobs#
type: luigi.BoolParameter
When True, already running jobs are cleaned up and no new ones are submitted. The job ids are read from the jobs file if existing. Defaults to False.
- classattribute transfer_logs#
type: luigi.BoolParameter
Transfer the combined log file back to the output directory. Defaults to False.
- process_resources() dict[str, int][source]#
Method used by luigi to define the resources required when running this task, which are included in scheduling rules when using the central scheduler.
- is_controlling_remote_jobs() bool[source]#
Returns True if the remote workflow is only controlling remote jobs instead of handling new ones. This is the case when either cancel_jobs or cleanup_jobs is True.
- control_output_postfix() str[source]#
Hook that should return a string that is inserted into the names of control output files.
- create_job_dashboard() BaseJobDashboard | None[source]#
Hook method to return a configured
law.job.BaseJobDashboard instance that will be used by the workflow.
- forward_dashboard_event(dashboard: BaseJobDashboard | None, job_data: dict, event: str, job_num: int) None[source]#
Hook to preprocess and publish dashboard events. By default, every event is passed to the dashboard's
law.job.dashboard.BaseJobDashboard.publish() method unchanged.
- modify_polling_status_line(status_line: str) str[source]#
Hook to modify the status line that is printed during polling.
- property accepts_messages: bool#
For configuring which scheduler messages can be received. When falsy, this task does not accept any message. When True, all messages are accepted.
- handle_scheduler_message(msg)[source]#
Hook that is called when a scheduler message msg is received. Returns True when the message was handled, and False otherwise.
Handled messages in addition to those defined in
law.workflow.base.BaseWorkflow.handle_scheduler_message():
    - parallel_jobs = <int>
    - walltime = <str/int/float>
    - poll_fails = <int>
    - poll_interval = <str/int/float>
    - retries = <int>
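The message formats listed for handle_scheduler_message() all follow a "name = value" pattern. The sketch below shows one way such messages could be recognized and converted. HANDLED and parse_message are hypothetical names, duration values are kept as raw strings, and none of this is meant to mirror law's actual parsing code.

```python
import re

# parameters named in the list of handled messages, each mapped to a converter
# (durations stay strings here, real duration parsing is out of scope)
HANDLED = {
    "parallel_jobs": int,
    "walltime": str,
    "poll_fails": int,
    "poll_interval": str,
    "retries": int,
}


def parse_message(msg: str):
    """Return (name, value) for a handled "name = value" message, else None."""
    m = re.match(r"^\s*(\w+)\s*=\s*(\S+)\s*$", msg)
    if not m or m.group(1) not in HANDLED:
        return None
    name, raw = m.groups()
    try:
        return name, HANDLED[name](raw)
    except ValueError:
        # e.g. "retries = many" cannot be converted to an integer
        return None


print(parse_message("parallel_jobs = 10"))  # ('parallel_jobs', 10)
print(parse_message("walltime = 2h"))       # ('walltime', '2h')
print(parse_message("unknown = 1"))         # None
```

A hook built this way would return True whenever parse_message yields a result and False otherwise, which matches the return convention described for handle_scheduler_message().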
Class BaseRemoteWorkflowProxy#
- class BaseRemoteWorkflowProxy(*args, **kwargs)[source]#
Bases: BaseWorkflowProxy
Workflow proxy base class for remote workflows.
- classattribute job_error_messages#
type: dict
A dictionary containing short error messages mapped to job exit codes as defined in the remote job execution script.
- show_errors#
type: int
Number of errors to explicitly show during job submission and status polling.
- summarize_status_errors#
type: bool, int
During status polling, when the number of errors exceeds
show_errors, a summary of errors is shown when this flag is true. When a number is given, the summary is printed if the number of errors exceeds this value.
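Because summarize_status_errors accepts either a flag or a threshold, the decision has two branches. The sketch below is one literal reading of the description; should_summarize is a hypothetical helper, not a method of the proxy class.

```python
def should_summarize(n_errors: int, show_errors: int, summarize_status_errors) -> bool:
    """Decide whether an error summary is printed during status polling."""
    # bool must be checked before int, since bool is a subclass of int in Python
    if isinstance(summarize_status_errors, bool):
        # flag: summarize only if there are more errors than are shown explicitly
        return summarize_status_errors and n_errors > show_errors
    # number: summarize once the error count exceeds that number
    return n_errors > summarize_status_errors


print(should_summarize(6, 5, True))   # True
print(should_summarize(6, 5, False))  # False
print(should_summarize(6, 5, 10))     # False
print(should_summarize(11, 5, 10))    # True
```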
- job_manager#
type: law.job.base.BaseJobManager
Reference to the job manager object that handles the actual job submission, status queries, etc. The instance is created and configured by
create_job_manager().
- job_file_factory#
type: law.job.base.BaseJobFileFactory
Reference to a job file factory. The instance is created and configured by
create_job_file_factory().
- dashboard#
type: law.job.dashboard.BaseJobDashboard
Reference to the dashboard instance that is used by the workflow.
- abstractmethod create_job_manager(**kwargs) BaseJobManager[source]#
Hook to instantiate and return an instance of a class derived from
law.job.base.BaseJobManager. This method must be implemented by inheriting classes and should update and forward all kwargs to the constructor of the respective job manager.
- setup_job_manager() dict[str, Any][source]#
Hook invoked externally to further set up the job manager or perform batch system related preparations, e.g. before jobs can be submitted. The returned keyword arguments will be forwarded to the submit, cancel, cleanup and query methods of the job manager.
- abstractmethod create_job_file_factory(**kwargs) BaseJobFileFactory[source]#
Hook to instantiate and return an instance of a class derived from
law.job.base.BaseJobFileFactory. This method must be implemented by inheriting classes and should update and forward all kwargs to the constructor of the respective job file factory.
- create_job_file(job_num: int, branches: list[int]) dict[str, str | Path | Config | None][source]#
Creates a job file using the
job_file_factory. The expected arguments depend on whether the job manager supports job grouping during submission (BaseJobManager.job_grouping_submit). If it does not, two arguments are expected: the job number (job_num) and the list of branch numbers (branches) covered by the job. If job grouping is supported, a single dictionary mapping job numbers to covered branch values must be passed. In any case, the path(s) of job files are returned.
This method must be implemented by inheriting classes.
- create_job_file_group(submit_jobs: dict[int, list[int]]) dict[str, str | Path | Config | None][source]#
Creates a job file using the
job_file_factory based on a group of submit_jobs. This method should be implemented in case the corresponding job manager supports job grouping (BaseJobManager.job_grouping). The path(s) of job files are returned.
This method must be implemented by inheriting classes.
- destination_info() InsertableDict[source]#
Hook that can return a string containing information on the location that jobs are submitted to. The string is appended to submission and status messages.
- get_extra_submission_data(job_file: str | Path, job_id: int, config: Config, log: str | Path | None = None) dict[str, Any][source]#
Hook that is called after job submission with the job_file, the returned job_id, the submission config and an optional log file to return extra data that is saved in the central job data.
- process_resources(force: bool = False) dict[str, int][source]#
Override in “template” tasks which provide common resource functionality but allow subclasses to specify additional resources while preserving the name for consistent end-user experience.
- complete() bool[source]#
Custom completion check that invokes the task's workflow_complete method. If that method returns anything other than NotImplemented, its value is returned. Otherwise, the default completion check is performed.
- requires() Any[source]#
Returns the default workflow requirements in an ordered dictionary, which is updated with the return value of the task’s workflow_requires method.
- output() Any[source]#
Returns the default workflow outputs in an ordered dictionary. At the moment, this is the collection of outputs of the branch tasks (key
"collection"), the submission file (key "submission"), and the status file (key "status"). These two control outputs are optional, i.e., they are not considered when checking the task's completeness.
- cancel() None[source]#
Cancels running jobs. The job ids are read from the submission file which has to exist for obvious reasons.
- cleanup() None[source]#
Cleans up jobs on the remote run location. The job ids are read from the submission file which has to exist for obvious reasons.
Class JobData#
- class JobData(**kwargs)[source]#
Bases: ShorthandDict
Subclass of
law.util.ShorthandDict that adds shorthands for the attributes jobs, unsubmitted_jobs, tasks_per_job, and dashboard_config. This container object is used to store and keep track of per job information in BaseRemoteWorkflow.
- classattribute dummy_job_id#
type: string
A unique, dummy job id (
"dummy_job_id").
- classmethod job_data(job_id='dummy_job_id', branches: list[int] | None = None, status: str | None = None, code: int | None = None, error: str | None = None, extra: Any | None = None, **kwargs) dict[source]#
Returns a dictionary containing default job submission information such as the job_id, task branches covered by the job, a job status string, a job return code, an error message, and extra data. Additional kwargs are accepted but _not_ stored.
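The shape of such a per-job entry can be pictured with a standalone function that mirrors the documented signature. This is a sketch only: the module-level job_data and DUMMY_JOB_ID below are stand-ins for the classmethod and class attribute, and the fallbacks to an empty list for branches and an empty dict for extra are assumptions.

```python
from typing import Any, Optional

DUMMY_JOB_ID = "dummy_job_id"


def job_data(
    job_id: Any = DUMMY_JOB_ID,
    branches: Optional[list] = None,
    status: Optional[str] = None,
    code: Optional[int] = None,
    error: Optional[str] = None,
    extra: Any = None,
    **kwargs: Any,
) -> dict:
    """Build a dictionary with default job submission information."""
    # kwargs are accepted for call compatibility but deliberately not stored
    return {
        "job_id": job_id,
        "branches": branches or [],
        "status": status,
        "code": code,
        "error": error,
        "extra": extra or {},
    }


entry = job_data(job_id=1234, branches=[0, 1], status="submitted", note="ignored")
print(entry["branches"], "note" in entry)  # [0, 1] False
```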