cms#
CMS-related contrib package. https://home.cern/about/experiments/cms
Task CrabWorkflow#
- class CrabWorkflow(*args, **kwargs)[source]#
Bases:
BaseRemoteWorkflow
- workflow_proxy_cls#
alias of
CrabWorkflowProxy
- abstractmethod crab_stageout_location() tuple[str, str][source]#
Hook to define both the “Site.storageSite” and “Data.outLFNDirBase” settings in a 2-tuple, i.e., the name of the storage site to use and the base directory for crab’s own output staging. An example would be
("T2_DE_DESY", "/store/user/..."). If crab’s own output staging is not used, the choice of the output base has no effect, but it is still required for crab’s job submission to work.
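A minimal sketch of this hook is given below. `WorkflowSketch` is a hypothetical stand-in that only mirrors the hook signature so that the snippet runs without law installed; in a real task the method would be defined on a `CrabWorkflow` subclass. The site name and the elided base path are the placeholders from the description above.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class WorkflowSketch(ABC):
    # hypothetical stand-in for CrabWorkflow, only mirroring the hook signature
    @abstractmethod
    def crab_stageout_location(self) -> tuple[str, str]:
        ...


class MyWorkflow(WorkflowSketch):
    def crab_stageout_location(self) -> tuple[str, str]:
        # (Site.storageSite, Data.outLFNDirBase); the path is kept elided as in
        # the description and has to be replaced with a real base directory
        return ("T2_DE_DESY", "/store/user/...")


storage_site, output_lfn_base = MyWorkflow().crab_stageout_location()
print(storage_site, output_lfn_base)
```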
- abstractmethod crab_output_directory() FileSystemDirectoryTarget[source]#
Hook to define the location of submission output files, such as the json files containing job data. This method should return a
FileSystemDirectoryTarget.
- crab_request_name(submit_jobs: dict[int, list[int]]) str[source]#
Returns a random name for a request, i.e., the project directory inside the crab job working area.
- crab_work_area() str | LocalDirectoryTarget[source]#
Returns the location of the crab working area, defaulting to the value of
crab_output_directory() in case it refers to a local directory. When None, the value of the “job.crab_work_area” configuration option is used.
- crab_workflow_run_context() Generator[None, None, None][source]#
Hook to provide a context manager in which the workflow run implementation is placed. This can be helpful in situations where resources should be acquired before and released after running a workflow.
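The acquire-before, release-after pattern described here can be sketched with `contextlib.contextmanager`. The example is written as a plain function rather than a method, and the `events` list is a hypothetical stand-in for a real resource; whether law expects the hook to be decorated in exactly this way is an assumption based on its generator return type.

```python
from contextlib import contextmanager
from typing import Generator

# hypothetical record of what happens, standing in for a real resource
events = []


@contextmanager
def crab_workflow_run_context() -> Generator[None, None, None]:
    events.append("acquire")  # e.g. obtain a lock or open a connection
    try:
        yield  # the workflow run implementation would execute here
    finally:
        events.append("release")  # runs even if the workflow raises


with crab_workflow_run_context():
    events.append("run")

print(events)
```

The `try`/`finally` ensures that the release step is reached even when the body of the `with` block fails.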
- crab_workflow_requires() DotDict[source]#
Hook to define requirements of the workflow itself that need to be resolved before any submission can happen.
- crab_job_file() str | Path | LocalFileTarget | JobInputFile[source]#
Hook to return the location of the job file that is executed on job nodes.
- crab_bootstrap_file() str | Path | LocalFileTarget | JobInputFile | None[source]#
Hook to define the location of an optional, so-called bootstrap file that is sent alongside jobs and called prior to the actual job payload. It is meant to run a custom setup routine in order for the payload to run successfully (e.g. software setup, data retrieval).
- crab_stageout_file() str | Path | LocalFileTarget | JobInputFile | None[source]#
Hook to define the location of an optional, so-called stageout file that is sent alongside jobs and called after the actual job payload. It is meant to run a custom output stageout routine if required by your workflow or target storage element.
- crab_output_postfix() str[source]#
Hook to define the postfix of outputs, for instance such that workflows with different parameters do not write their intermediate job status information into the same json file.
- crab_job_resources(job_num: int, branches: list[int]) dict[str, int][source]#
Hook to define resources for a specific job with number job_num, processing branches. This method should return a dictionary.
- crab_job_manager_cls() Type[CrabJobManager][source]#
Hook to define a custom job manager class to use.
- crab_create_job_manager(**kwargs) CrabJobManager[source]#
Hook to configure how the underlying job manager is instantiated and configured.
- crab_job_file_factory_cls() Type[CrabJobFileFactory][source]#
Hook to define a custom job file factory class to use.
- crab_create_job_file_factory(**kwargs) CrabJobFileFactory[source]#
Hook to configure how the underlying job file factory is instantiated and configured.
- crab_job_config(config: Config, job_num: list[int], branches: list[list[int]]) Config[source]#
Hook to inject custom settings into the job config, which is an instance of the
Config class defined inside the job manager.
- crab_dump_intermediate_job_data() bool[source]#
Whether to dump intermediate job data to the job submission file while jobs are being submitted.
- crab_post_submit_delay() float | int[source]#
Configurable delay in seconds to wait after submitting jobs and before starting the status polling.
- crab_check_job_completeness() bool[source]#
Hook to define whether, after jobs report successful completion, the job manager should check the completion status of the branch tasks run by the finished jobs.
- crab_check_job_completeness_delay() float | int[source]#
Grace period before
crab_check_job_completeness() is called to ensure that output files are accessible. Especially useful on distributed file systems with possibly asynchronous behavior.
- crab_poll_callback(poll_data: PollData) None[source]#
Configurable callback that is called after each job status query and before potential resubmission. It receives the variable polling attributes poll_data (
PollData) that can be changed within this method. If False is returned, the polling loop is gracefully terminated. Returning any other value does not have any effect.
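The termination rule (only an explicit False stops the loop) can be illustrated with a simplified sketch. `PollDataSketch`, its `n_polls` attribute and `polling_loop` are hypothetical stand-ins invented for this example; they are not the attributes of the real `PollData` class or the actual polling implementation.

```python
class PollDataSketch:
    # hypothetical stand-in for PollData with a single made-up attribute
    def __init__(self):
        self.n_polls = 0


def crab_poll_callback(poll_data):
    # the callback may modify the polling attributes in place
    poll_data.n_polls += 1
    # returning False asks the loop to stop; any other value is ignored
    if poll_data.n_polls >= 3:
        return False
    return None


def polling_loop(callback, max_iterations=10):
    # simplified stand-in for the real polling loop
    poll_data = PollDataSketch()
    for _ in range(max_iterations):
        if callback(poll_data) is False:
            break
    return poll_data


result = polling_loop(crab_poll_callback)
print(result.n_polls)
```

The identity check `is False` mirrors the documented behavior that other falsy values such as None have no effect.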
- crab_post_poll_callback(success: bool, duration: float | int) None[source]#
Configurable callback that is called after the polling loop has ended. It receives a boolean success that indicates whether the job polling was successful, and the duration of the job polling in seconds.
Task BundleCMSSW#
- class BundleCMSSW(*args, **kwargs)[source]#
Bases:
Task
- task_namespace = 'law.cms'#
This value can be overridden to set the namespace that will be used. (See Task.namespaces_famlies_and_ids) If it’s not specified and you try to read this value anyway, it will return garbage. Please use
get_task_namespace() to read the namespace. Note that setting this value with
@property will not work, because this is a class level value.
- output() FileSystemFileTarget[source]#
The output that this Task produces.
The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single
Target or a list of Target instances.
Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
Class CrabJobManager#
- class CrabJobManager(sandbox_name: str | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, threads: int = 1)[source]#
Bases:
BaseJobManager
- class JobId(crab_num, task_name, proj_dir)#
Bases:
tuple
- crab_num#
Alias for field number 0
- proj_dir#
Alias for field number 2
- task_name#
Alias for field number 1
- classmethod cast_job_id(job_id: tuple[str]) JobId[source]#
Converts a job_id, for instance after json deserialization, into a
JobId object.
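The need for such a cast can be seen in a small sketch: a named tuple loses its type in a json round trip and comes back as a plain list. The `JobId` below is rebuilt with `collections.namedtuple` using the documented field order, `cast_job_id` is written as a free function for brevity, and the field values are hypothetical; the actual implementation in law may differ.

```python
import json
from collections import namedtuple

# same field order as documented: crab_num (0), task_name (1), proj_dir (2)
JobId = namedtuple("JobId", ["crab_num", "task_name", "proj_dir"])


def cast_job_id(job_id):
    # return JobId instances unchanged, convert plain sequences otherwise
    return job_id if isinstance(job_id, JobId) else JobId(*job_id)


original = JobId(1, "task_abc", "/tmp/crab_project")  # hypothetical values

# json serializes the named tuple as an array, which is read back as a list
restored_raw = json.loads(json.dumps(original))
restored = cast_job_id(restored_raw)

print(type(restored_raw).__name__, restored)
```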
- group_job_ids(job_ids: list[JobId]) dict[str, list[JobId]][source]#
Hook that needs to be implemented if the job manager supports grouping of jobs, i.e., when
job_grouping_submit, job_grouping_query, etc. are True, and that is potentially used during status queries, job cancellation and removal. If so, it should take a sequence of job_ids and return a dictionary mapping ids of group jobs (used for queries etc.) to the corresponding lists of original job ids, with an arbitrary grouping mechanism.
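Since the grouping mechanism is arbitrary, one possible choice is sketched below: job ids are grouped by their project directory. Using `proj_dir` as the group key is an assumption made for this example (it matches the `proj_dir` argument taken by the query, cancel and cleanup methods), and the `JobId` tuple and its values are hypothetical re-creations.

```python
from collections import defaultdict, namedtuple

JobId = namedtuple("JobId", ["crab_num", "task_name", "proj_dir"])


def group_job_ids(job_ids):
    # assumption: the project directory serves as the id of the group job
    groups = defaultdict(list)
    for job_id in job_ids:
        groups[job_id.proj_dir].append(job_id)
    return dict(groups)


# hypothetical job ids spread over two project directories
job_ids = [
    JobId(1, "task_a", "/work/proj_a"),
    JobId(2, "task_a", "/work/proj_a"),
    JobId(1, "task_b", "/work/proj_b"),
]

grouped = group_job_ids(job_ids)
print({key: len(value) for key, value in grouped.items()})
```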
- submit(job_file: str | pathlib.Path, *, job_files: Sequence[str | pathlib.Path] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, retries: int = 0, retry_delay: int | float = 3, silent: bool = False, _processes: list | None = None) list[JobId] | None[source]#
Abstract atomic or group job submission. Can throw exceptions. Should return a single job id or a list of ids.
- cancel(proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, silent: bool = False, _processes: list | None = None) dict[JobId, None][source]#
Abstract atomic or group job cancellation. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.
- cleanup(proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, silent: bool = False, _processes: list | None = None) dict[JobId, None][source]#
Abstract atomic or group job cleanup. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.
- query(proj_dir: str | pathlib.Path, *, job_ids: list[JobId] | None = None, proxy_file: str | None = None, myproxy_username: str | None = None, instance: str | None = None, skip_transfers: bool | None = None, silent: bool = False, _processes: list | None = None) dict[JobId, dict[str, Any]] | None[source]#
Abstract atomic or group job status query. Can throw exceptions. Should return a dictionary mapping job ids to per-job return values.
Class CrabJobFileFactory#
- class CrabJobFileFactory(*, file_name: str = 'crab_job.py', executable: str | None = None, arguments: Sequence[str] | None = None, work_area: str | None = None, request_name: str | None = None, input_files: dict[str, str | Path | JobInputFile] | None = None, output_files: list[str] | None = None, storage_site: str | None = None, output_lfn_base: str | None = None, vo_group: str | None = None, vo_role: str | None = None, custom_content: str | Sequence[str] | None = None, absolute_paths: bool = False, **kwargs)[source]#
Bases:
BaseJobFileFactory
Class CMSSWSandbox#
- class CMSSWSandbox(*args, **kwargs)[source]#
Bases:
BashSandbox
Class CMSJobDashboard#
- class CMSJobDashboard(task: Task, cms_user: str, voms_user: str, apmon_config: dict[str, Any] | None = None, log_level: str = 'WARNING', max_rate: int = 20, task_type: str = 'analysis', site: str | None = None, executable: str = 'law', application: str | None = None, application_version: str | int | None = None, submission_tool: str = 'law', submission_type: str = 'direct', submission_ui: str | None = None, init_timestamp: str | None = None)[source]#
Bases:
BaseJobDashboard
This CMS job dashboard interface requires
apmon to be installed on your system. See http://monalisa.caltech.edu/monalisa__Documentation__ApMon_User_Guide__apmon_ug_py.html and https://twiki.cern.ch/twiki/bin/view/ArdaGrid/CMSJobMonitoringCollector.
- classmethod map_status(job_status: str, event: str) str | None[source]#
Maps the job_status (see
law.job.base.BaseJobManager) for a particular event to the status name that is accepted by the implemented job dashboard. Possible events are:
action.submit
action.cancel
status.pending
status.running
status.finished
status.retry
status.failed
- remote_hook_file() str[source]#
This method can return the path to a file that is considered as an input file to remote jobs. This file can contain bash functions, environment variables, etc., that are necessary to communicate with the implemented job dashboard. When None is returned, no file is sent.
- remote_hook_data(job_num: int, attempt: int) dict[str, Any][source]#
This method can return a dictionary that is sent with remote jobs in the format
key1=value1 key2=value2 .... The returned dictionary should (but does not have to) include the job number job_num and the retry attempt.
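The `key1=value1 key2=value2` format can be sketched as follows. `format_hook_data` is a hypothetical helper written for this example, not a law function, and it does not quote or escape values that contain spaces.

```python
def format_hook_data(data):
    # join key-value pairs with spaces, keeping the insertion order of the dict
    return " ".join(f"{key}={value}" for key, value in data.items())


# a dictionary that includes the job number and the retry attempt
hook_data = {"job_num": 3, "attempt": 0}

line = format_hook_data(hook_data)
print(line)
```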
Class Site#
- class Site(name: str | None = None)[source]#
Bases:
object
Helper class that provides site-related data, mostly via simple properties. When name is None, the name of the site on which the instance is created is used. Example:
site = Site()  # executed on T2_DE_RWTH
print(site.name)  # "T2_DE_RWTH"
print(site.country)  # "DE"
print(site.redirector)  # "xrootd-cms.infn.it"

site = Site("T1_US_FNAL")
print(site.name)  # "T1_US_FNAL"
print(site.country)  # "US"
print(site.redirector)  # "cmsxrootd.fnal.gov"
- classattribute redirectors#
type: dict
A mapping of country codes to redirectors.
- name#
type: string
The name of the site, e.g.
T2_DE_RWTH. This is either the name provided in the constructor or it is determined for the current site by reading environment variables.
- classmethod get_name_from_env() str | None[source]#
Tries to extract the site name from environment variables. Returns the name on success and None otherwise.
- property info: tuple[str, str, str] | tuple[None, None, None]#
Tier, country and locality information in a 3-tuple, e.g.
("T2", "DE", "RWTH").
- property locality#
The locality of the site, e.g.
RWTH.
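How a site name relates to the 3-tuple returned by info can be sketched by splitting the name at underscores. `site_info` is a hypothetical helper for illustration; how law parses site names internally is not confirmed here.

```python
def site_info(name):
    # without a usable name, return the all-None variant of the 3-tuple
    if not name:
        return (None, None, None)
    # split into tier, country and locality; the locality keeps further underscores
    parts = name.split("_", 2)
    if len(parts) != 3:
        return (None, None, None)
    return tuple(parts)


tier, country, locality = site_info("T2_DE_RWTH")
print(tier, country, locality)
```

Limiting the split to two separators keeps a locality that itself contains underscores in one piece.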
Functions#
- lfn_to_pfn(lfn: str, redirector: str = 'global') str[source]#
Converts a logical file name lfn to a physical file name pfn using a redirector. Valid values for redirector are defined by
Site.redirectors.
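A minimal sketch of such a conversion is shown below, assuming that a physical file name is an XRootD URL of the form `root://<redirector>/<lfn>`. The `REDIRECTORS` mapping, its keys, the host used for "global" and the example file name are assumptions made for this illustration; the actual values are defined by `Site.redirectors`.

```python
# hypothetical mapping standing in for Site.redirectors
REDIRECTORS = {
    "global": "cms-xrd-global.cern.ch",  # assumed host of the global redirector
    "eu": "xrootd-cms.infn.it",  # host taken from the Site example, key assumed
    "us": "cmsxrootd.fnal.gov",  # host taken from the Site example, key assumed
}


def lfn_to_pfn_sketch(lfn, redirector="global"):
    # reject redirector names that are not part of the mapping
    if redirector not in REDIRECTORS:
        raise ValueError(f"unknown redirector: {redirector}")
    # an lfn starting with "/" leads to the usual double slash after the host
    return f"root://{REDIRECTORS[redirector]}/{lfn}"


pfn = lfn_to_pfn_sketch("/store/data/example.root")  # hypothetical file name
print(pfn)
```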