slurm

Slurm contrib functionality.

Class SlurmWorkflow

class SlurmWorkflow(*args, **kwargs)

Bases: BaseRemoteWorkflow

workflow_proxy_cls

alias of SlurmWorkflowProxy

abstractmethod slurm_output_directory() → str | Path | FileSystemDirectoryTarget

Hook to define the location of submission output files, such as the json files containing job data, and optional log files. This method should return a FileSystemDirectoryTarget.

slurm_log_directory() → str | Path | FileSystemDirectoryTarget | None

Hook to define the location of log files if any are written. When set, it has precedence over slurm_output_directory() for log files. This method should return a FileSystemDirectoryTarget or a value that evaluates to False in case no custom log directory is desired.
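The precedence rule between the two directory hooks can be sketched in plain Python. This is only an illustration of the described behavior, not the library's implementation; the helper name resolve_log_directory and the example paths are hypothetical.

```python
from pathlib import Path


def resolve_log_directory(output_dir, log_dir=None):
    """Pick the directory for log files (hypothetical helper).

    A truthy log_dir takes precedence; any value that evaluates to
    False falls back to the submission output directory.
    """
    return Path(log_dir) if log_dir else Path(output_dir)


# no custom log directory: logs land next to the submission output files
default_dir = resolve_log_directory("/data/jobs")
# custom log directory: it wins over the output directory
custom_dir = resolve_log_directory("/data/jobs", "/data/logs")
```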

slurm_workflow_run_context() → Generator[None, None, None]

Hook to provide a context manager in which the workflow run implementation is placed. This can be helpful in situations where resources should be acquired before and released after running a workflow.
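A minimal sketch of the acquire-and-release pattern this hook is meant for, assuming the hook is written as a generator wrapped with contextlib.contextmanager (suggested by the Generator return annotation, but an assumption here). The class ResourceAwareWorkflow and its run() method are hypothetical stand-ins so that the example runs without the library; a real task would derive from SlurmWorkflow.

```python
from contextlib import contextmanager


class ResourceAwareWorkflow:
    """Hypothetical stand-in for a task deriving from SlurmWorkflow."""

    def __init__(self):
        self.events = []

    @contextmanager
    def slurm_workflow_run_context(self):
        # acquire the resource before the workflow runs
        self.events.append("acquire")
        try:
            yield
        finally:
            # release it afterwards, even if the run raised
            self.events.append("release")

    def run(self):
        # stands in for the workflow run implementation
        with self.slurm_workflow_run_context():
            self.events.append("run")


wf = ResourceAwareWorkflow()
wf.run()
```

The try/finally around the yield is what guarantees the release step when the wrapped run fails.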

slurm_job_resources(job_num: int, branches: list[int]) → dict[str, int]

Hook to define resources for a specific job with number job_num, processing branches. This method should return a dictionary.
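As a rough illustration, a resources hook might scale its return value with the number of branches a job processes. The function below is written as a free function so it runs standalone, and the resource key "cpus" is a hypothetical name, not one defined by the library.

```python
def slurm_job_resources(job_num: int, branches: list[int]) -> dict[str, int]:
    # request one unit of the hypothetical "cpus" resource per branch
    return {"cpus": len(branches)}


# job 1 processes three branches
resources = slurm_job_resources(1, [0, 1, 2])
```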

slurm_dump_intermediate_job_data() → bool

Whether to dump intermediate job data to the job submission file while jobs are being submitted.

slurm_post_submit_delay() → int | float

Configurable delay in seconds to wait after submitting jobs and before starting the status polling.

slurm_poll_callback(poll_data: PollData) → None

Configurable callback that is called after each job status query and before potential resubmission. It receives the variable polling attributes poll_data (PollData) that can be changed within this method. If False is returned, the polling loop is gracefully terminated. Returning any other value does not have any effect.
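The return-value contract can be sketched with a toy polling loop. Everything here is a stand-in: polling_loop is not the library's loop, poll_data is a plain SimpleNamespace rather than a real PollData object, and the attribute n_seen is hypothetical.

```python
from types import SimpleNamespace


def poll_callback(poll_data):
    # the callback may modify the polling attributes in place
    poll_data.n_seen += 1
    # returning exactly False asks the loop to stop gracefully
    if poll_data.n_seen >= 3:
        return False
    # any other return value (including None) has no effect
    return None


def polling_loop(callback, max_iterations=10):
    """Toy loop that calls the callback after each simulated status query."""
    poll_data = SimpleNamespace(n_seen=0)
    for _ in range(max_iterations):
        if callback(poll_data) is False:
            break
    return poll_data


result = polling_loop(poll_callback)
```

The identity check against False mirrors the statement that only False terminates the loop, so a callback that returns None or 0 keeps polling.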

slurm_post_poll_callback(success: bool, duration: float | int) → None

Configurable callback that is called after the polling loop has ended. It receives a boolean success that indicates whether the job polling was successful, and the duration of the job polling in seconds.

Class SlurmJobManager

class SlurmJobManager(partition: str | None = None, threads: int = 1)

Bases: BaseJobManager

cleanup(*args, **kwargs) → None

Abstract atomic or group job cleanup. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.

cleanup_batch(*args, **kwargs) → None

Cleans up a batch of jobs given by job_ids via a thread pool of size threads, which defaults to the instance attribute of the same name. When chunk_size, which defaults to chunk_size_cleanup, is not negative, job_ids are split into chunks of that size, which are passed to cleanup().

When callback is set, it is invoked after each successful job (or job chunk) cleaning with the index of the corresponding job id (starting at 0) and either None or an exception if any occurred. All other kwargs are passed to cleanup().

Exceptions that occurred during job cleaning are stored in a list and returned. An empty list means that no exceptions occurred.
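The batching behavior described above (chunking, a thread pool, a per-chunk callback, and a returned list of exceptions) can be sketched with the standard library. This is an illustration under simplifying assumptions, not the library's implementation: cleanup_batch_sketch and fake_cleanup are hypothetical names, the callback receives the chunk index, and a non-positive chunk_size is treated as "one job id per call".

```python
from concurrent.futures import ThreadPoolExecutor


def cleanup_batch_sketch(job_ids, cleanup, threads=2, chunk_size=2, callback=None):
    # split job ids into chunks; fall back to single-id chunks otherwise
    if chunk_size > 0:
        chunks = [job_ids[i:i + chunk_size] for i in range(0, len(job_ids), chunk_size)]
    else:
        chunks = [[job_id] for job_id in job_ids]

    def run(index, chunk):
        # clean one chunk and capture an exception instead of raising it
        try:
            cleanup(chunk)
            err = None
        except Exception as e:
            err = e
        if callback is not None:
            callback(index, err)
        return err

    # process the chunks concurrently in a pool of the requested size
    with ThreadPoolExecutor(max_workers=threads) as pool:
        results = list(pool.map(run, range(len(chunks)), chunks))

    # keep only the exceptions; an empty list means everything succeeded
    return [err for err in results if err is not None]


def fake_cleanup(chunk):
    # hypothetical cleanup function that fails for job id 3
    if 3 in chunk:
        raise RuntimeError("cannot clean job 3")


errors = cleanup_batch_sketch([1, 2, 3, 4, 5], fake_cleanup)
```

Collecting exceptions rather than raising on the first one lets the remaining chunks finish, which matches the description of an exception list as the return value.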

submit(job_file: str | Path, partition: str | None = None, retries: int = 0, retry_delay: float | int = 3, silent: bool = False, _processes: list | None = None) → int | None

Abstract atomic or group job submission. Can throw exceptions. Should return a single job id or a list of ids.

cancel(job_id: int | Sequence[int], partition: str | None = None, silent: bool = False, _processes: list | None = None) → dict[int, None] | None

Abstract atomic or group job cancellation. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.

query(job_id: int | Sequence[int], partition: str | None = None, silent: bool = False, _processes: list | None = None) → dict[int, dict[str, Any]] | dict[str, Any] | None

Abstract atomic or group job status query. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.

Class SlurmJobFileFactory

class SlurmJobFileFactory(*, file_name: str = 'slurm_job.sh', command: str | Sequence[str] | None = None, executable: str | None = None, arguments: str | Sequence[str] | None = None, shell: str = 'bash', input_files: dict[str, str | Path | JobInputFile] | None = None, job_name: str | None = None, partition: str | None = None, stdout: str = 'stdout.txt', stderr: str = 'stderr.txt', postfix_output_files: bool = True, custom_content: str | Sequence[str] | None = None, absolute_paths: bool = False, **kwargs)

Bases: BaseJobFileFactory

create(postfix: str | None = None, **kwargs) → tuple[str, Config]

Abstract job file creation method that must be implemented by inheriting classes.