# Source code for law.contrib.glite.workflow

# coding: utf-8

"""
gLite remote workflow implementation. See
https://wiki.italiangrid.it/twiki/bin/view/CREAM/UserGuide.
"""

from __future__ import annotations

__all__ = ["GLiteWorkflow"]

import os
import sys
import abc
import contextlib
import pathlib

import law
from law.config import Config
from law.workflow.remote import BaseRemoteWorkflow, BaseRemoteWorkflowProxy, PollData
from law.job.base import JobArguments, JobInputFile
from law.task.proxy import ProxyCommand
from law.target.file import get_path
from law.target.local import LocalFileTarget
from law.parameter import CSVParameter
from law.util import no_value, law_src_path, merge_dicts, DotDict, InsertableDict
from law.logger import get_logger
from law._types import Type, Any, Generator

from law.contrib.wlcg import WLCGDirectoryTarget, delegate_vomsproxy_glite
from law.contrib.glite.job import GLiteJobManager, GLiteJobFileFactory


logger = get_logger(__name__)


class GLiteWorkflowProxy(BaseRemoteWorkflowProxy):
    """
    Workflow proxy that creates and manages remote gLite jobs on behalf of a
    :py:class:`GLiteWorkflow` task. Most decisions (job manager, job file factory, files to ship,
    command line arguments, destination info) are deferred to the ``glite_*`` hooks of the task.
    """

    workflow_type: str = "glite"

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # check if there is at least one ce
        if not self.task.glite_ce:  # type: ignore[attr-defined]
            raise Exception("please set at least one glite computing element (--glite-ce)")

        # delegation ids obtained per computing element, filled by setup_job_manager
        self.delegation_ids: list[str] | None = None

    def create_job_manager(self, **kwargs) -> GLiteJobManager:
        """
        Creates the job manager by forwarding all *kwargs* to the task's
        ``glite_create_job_manager`` hook.
        """
        return self.task.glite_create_job_manager(**kwargs)  # type: ignore[attr-defined]

    def setup_job_manager(self) -> dict[str, Any]:
        """
        Performs gLite specific preparations before jobs are submitted and returns keyword
        arguments for the job manager.

        When the task's ``glite_delegate_proxy`` is callable, the voms proxy is delegated to the
        endpoint of each computing element in ``glite_ce``. The resulting ids are stored in
        :py:attr:`delegation_ids` and returned as ``{"delegation_id": [...]}``. Otherwise an empty
        dict is returned.
        """
        kwargs: dict[str, Any] = {}

        # delegate the voms proxy to all endpoints
        task: GLiteWorkflow = self.task  # type: ignore[assignment]
        if callable(task.glite_delegate_proxy):
            delegation_ids = []
            for ce in task.glite_ce:  # type: ignore[attr-defined]
                endpoint = law.wlcg.get_ce_endpoint(ce)  # type: ignore[attr-defined]
                delegation_ids.append(task.glite_delegate_proxy(endpoint))
            self.delegation_ids = delegation_ids
            kwargs["delegation_id"] = delegation_ids

        return kwargs

    def setup_job_mananger(self) -> dict[str, Any]:
        """
        Misspelled legacy name of :py:meth:`setup_job_manager`, kept for backward compatibility.
        """
        return self.setup_job_manager()

    def create_job_file_factory(self, **kwargs) -> GLiteJobFileFactory:
        """
        Creates the job file factory by forwarding all *kwargs* to the task's
        ``glite_create_job_file_factory`` hook.
        """
        return self.task.glite_create_job_file_factory(**kwargs)  # type: ignore[attr-defined]

    def create_job_file(
        self,
        job_num: int,
        branches: list[int],
    ) -> dict[str, str | pathlib.Path | GLiteJobFileFactory.Config | None]:
        """
        Builds the job file for job number *job_num* that processes *branches*.

        Returns a dict with the keys
          - ``"job"``: the job file created by the job file factory,
          - ``"config"``: the sanitized config returned by the factory,
          - ``"log"``: the custom log file location joined onto the output uri, or *None* when
            no custom log file is configured.
        """
        task: GLiteWorkflow = self.task  # type: ignore[assignment]

        # the file postfix is a pythonic range built from the first and last branch only,
        # e.g. [0, 1, 2, 4] -> "_0To5"
        postfix = f"_{branches[0]}To{branches[-1] + 1}"

        # create the config
        c = self.job_file_factory.get_config()  # type: ignore[union-attr]
        c.input_files = {}
        c.output_files = []
        c.render_variables = {}
        c.custom_content = []

        # get the actual wrapper file that will be executed by the remote job; the law job file is
        # always shipped, and becomes the executable itself when no distinct wrapper is given
        wrapper_file = task.glite_wrapper_file()
        law_job_file = task.glite_job_file()
        if wrapper_file and get_path(wrapper_file) != get_path(law_job_file):
            c.input_files["executable_file"] = wrapper_file
            c.executable = wrapper_file
        else:
            c.executable = law_job_file
        c.input_files["job_file"] = law_job_file

        # collect task parameters
        exclude_args = (
            task.exclude_params_branch |
            task.exclude_params_workflow |
            task.exclude_params_remote_workflow |
            task.exclude_params_glite_workflow |
            {"workflow", "effective_workflow"}
        )
        proxy_cmd = ProxyCommand(
            task.as_branch(branches[0]),
            exclude_task_args=list(exclude_args),
            exclude_global_args=["workers", "local-scheduler", f"{task.task_family}-*"],
        )
        if task.glite_use_local_scheduler():
            proxy_cmd.add_arg("--local-scheduler", "True", overwrite=True)
        for key, value in dict(task.glite_cmdline_args()).items():
            proxy_cmd.add_arg(key, value, overwrite=True)

        # job script arguments
        dashboard_data = None
        if self.dashboard is not None:
            dashboard_data = self.dashboard.remote_hook_data(
                job_num,
                self.job_data.attempts.get(job_num, 0),
            )
        job_args = JobArguments(
            task_cls=task.__class__,
            task_params=proxy_cmd.build(skip_run=True),
            branches=branches,
            workers=task.job_workers,  # type: ignore[arg-type]
            auto_retry=False,
            dashboard_data=dashboard_data,
        )
        c.arguments = job_args.join()

        # add the bootstrap file
        bootstrap_file = task.glite_bootstrap_file()
        if bootstrap_file:
            c.input_files["bootstrap_file"] = bootstrap_file

        # add the stageout file
        stageout_file = task.glite_stageout_file()
        if stageout_file:
            c.input_files["stageout_file"] = stageout_file

        # does the dashboard have a hook file?
        if self.dashboard is not None:
            dashboard_file = self.dashboard.remote_hook_file()
            if dashboard_file:
                c.input_files["dashboard_file"] = dashboard_file

        # initialize logs with empty values and defer to defaults later
        c.stdout = no_value
        c.stderr = no_value
        if task.transfer_logs:
            log_file = "stdall.txt"
            c.stdout = log_file
            c.stderr = log_file
            c.custom_log_file = log_file

        # meta infos
        c.output_uri = task.glite_output_uri()

        # task hook
        c = task.glite_job_config(c, job_num, branches)

        # build the job file and get the sanitized config
        job_file, c = self.job_file_factory(postfix=postfix, **c.__dict__)  # type: ignore[misc]

        # logging defaults: normalize falsy values to None
        c.stdout = c.stdout or None
        c.stderr = c.stderr or None
        c.custom_log_file = c.custom_log_file or None

        # determine the custom log file uri if set
        abs_log_file = None
        if c.custom_log_file:
            abs_log_file = os.path.join(str(c.output_uri), c.custom_log_file)

        # return job and log files
        return {"job": job_file, "config": c, "log": abs_log_file}

    def destination_info(self) -> InsertableDict:
        """
        Extends the base destination info with the comma-separated list of computing elements
        and passes the result through the task's ``glite_destination_info`` hook.
        """
        info = super().destination_info()

        task: GLiteWorkflow = self.task  # type: ignore[assignment]
        info["ce"] = f"ce: {','.join(task.glite_ce)}"  # type: ignore[arg-type]

        info = task.glite_destination_info(info)

        return info


class GLiteWorkflow(BaseRemoteWorkflow):
    """
    Base class for remote workflows whose branches are submitted as jobs to gLite computing
    elements. The behavior of the submission is configured through the ``glite_*`` attributes
    and hooks below.
    """

    workflow_proxy_cls = GLiteWorkflowProxy

    glite_workflow_run_decorators: list | None = None
    glite_job_manager_defaults: dict | None = None
    glite_job_file_factory_defaults: dict | None = None

    glite_ce = CSVParameter(
        default=(),
        significant=False,
        description="target glite computing element(s); default: empty",
    )

    glite_job_kwargs: list[str] = []
    glite_job_kwargs_submit = ["glite_ce"]
    glite_job_kwargs_cancel: dict | None = None
    glite_job_kwargs_cleanup: dict | None = None
    glite_job_kwargs_query: dict | None = None

    exclude_params_branch = {"glite_ce"}

    exclude_params_glite_workflow: set[str] = set()

    exclude_index = True

    @abc.abstractmethod
    def glite_output_directory(self) -> WLCGDirectoryTarget:
        """
        Abstract hook that must return the directory target whose uri is used as the output uri
        of the jobs (see :py:meth:`glite_output_uri`).
        """
        ...

    @abc.abstractmethod
    def glite_bootstrap_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile:
        """
        Abstract hook that must return the bootstrap file that is shipped with each job.
        """
        ...

    def glite_wrapper_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile | None:
        """
        Optional wrapper file. When set and different from :py:meth:`glite_job_file`, it is
        shipped with the job and used as its executable. Defaults to *None*.
        """
        return None

    def glite_job_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile:
        """
        Returns the law job file shipped with each job, defaulting to law's ``job/law_job.sh``.
        """
        return JobInputFile(law_src_path("job", "law_job.sh"))

    def glite_stageout_file(self) -> str | pathlib.Path | LocalFileTarget | JobInputFile | None:
        """
        Optional stageout file that is shipped with each job when set. Defaults to *None*.
        """
        return None

    @contextlib.contextmanager
    def glite_workflow_run_context(self) -> Generator[None, None, None]:
        """
        Hook to provide a context manager in which the workflow run implementation is placed. This
        can be helpful in situations where resources should be acquired before and released after
        running a workflow.
        """
        yield

    def glite_workflow_requires(self) -> DotDict:
        """
        Hook returning workflow requirements specific to gLite; empty by default.
        """
        return DotDict()

    def glite_output_postfix(self) -> str:
        """
        Hook returning a postfix string; empty by default.
        """
        return ""

    def glite_output_uri(self) -> str:
        """
        Returns the single uri of :py:meth:`glite_output_directory`.
        """
        return self.glite_output_directory().uri(return_all=False)  # type: ignore[return-value]

    def glite_job_resources(self, job_num: int, branches: list[int]) -> dict[str, int]:
        """
        Hook to define resources for a specific job with number *job_num*, processing *branches*.
        This method should return a dictionary.
        """
        return {}

    def glite_delegate_proxy(self, endpoint: str) -> str:
        """
        Delegates the voms proxy to *endpoint* (with caching enabled) and returns the delegation
        id. Process stdout and stderr are passed through to the delegation call.
        """
        return delegate_vomsproxy_glite(  # type: ignore[attr-defined]
            endpoint,
            stdout=sys.stdout,
            stderr=sys.stderr,
            cache=True,
        )

    def glite_job_manager_cls(self) -> Type[GLiteJobManager]:
        """
        Returns the job manager class to instantiate.
        """
        return GLiteJobManager

    def glite_create_job_manager(self, **kwargs) -> GLiteJobManager:
        """
        Creates the job manager from :py:attr:`glite_job_manager_defaults` merged with *kwargs*,
        where *kwargs* take precedence.
        """
        kwargs = merge_dicts(self.glite_job_manager_defaults, kwargs)
        return self.glite_job_manager_cls()(**kwargs)

    def glite_job_file_factory_cls(self) -> Type[GLiteJobFileFactory]:
        """
        Returns the job file factory class to instantiate.
        """
        return GLiteJobFileFactory

    def glite_create_job_file_factory(self, **kwargs) -> GLiteJobFileFactory:
        """
        Creates the job file factory from :py:attr:`glite_job_file_factory_defaults` merged with
        *kwargs*, where *kwargs* take precedence.

        When no ``mkdtemp`` value is given, it is read from the ``job`` config section
        (``glite_job_file_dir_mkdtemp`` or ``job_file_dir_mkdtemp``). Values other than
        "true"/"false" are treated as path templates and expanded with ``task_id`` and
        ``task_family``.
        """
        # get the file factory cls
        factory_cls = self.glite_job_file_factory_cls()

        # job file factory config priority: kwargs > class defaults
        kwargs = merge_dicts({}, self.glite_job_file_factory_defaults, kwargs)

        # default mkdtemp value which might require task-level info
        if kwargs.get("mkdtemp") is None:
            cfg = Config.instance()
            mkdtemp = cfg.get_expanded(
                "job",
                cfg.find_option("job", "glite_job_file_dir_mkdtemp", "job_file_dir_mkdtemp"),
            )
            if isinstance(mkdtemp, str) and mkdtemp.lower() not in {"true", "false"}:
                kwargs["mkdtemp"] = factory_cls._expand_template_path(
                    mkdtemp,
                    variables={"task_id": self.live_task_id, "task_family": self.task_family},
                )

        return factory_cls(**kwargs)

    def glite_job_config(
        self,
        config: GLiteJobFileFactory.Config,
        job_num: int,
        branches: list[int],
    ) -> GLiteJobFileFactory.Config:
        """
        Hook to modify the job *config* of job *job_num* processing *branches* before the job file
        is built. The returned config is used. Returns *config* unchanged by default.
        """
        return config

    def glite_dump_intermediate_job_data(self) -> bool:
        """
        Whether to dump intermediate job data to the job submission file while jobs are being
        submitted.
        """
        return True

    def glite_post_submit_delay(self) -> int | float:
        """
        Configurable delay in seconds to wait after submitting jobs and before starting the status
        polling.
        """
        return self.poll_interval * 60  # type: ignore[operator]

    def glite_check_job_completeness(self) -> bool:
        """
        Hook returning a flag for job completeness checks; *False* by default.
        """
        return False

    def glite_check_job_completeness_delay(self) -> float | int:
        """
        Hook returning a delay for job completeness checks; 0 by default.
        """
        return 0.0

    def glite_poll_callback(self, poll_data: PollData) -> bool | None:
        """
        Configurable callback that is called after each job status query and before potential
        resubmission. It receives the variable polling attributes *poll_data* (:py:class:`PollData`)
        that can be changed within this method. If *False* is returned, the polling loop is
        gracefully terminated. Returning any other value does not have any effect.
        """
        return None

    def glite_post_poll_callback(self, success: bool, duration: float | int) -> None:
        """
        Configurable callback that is called after the polling loop has ended. It receives a
        boolean *success* that indicates whether the job polling was successful, and the duration
        of the job polling in seconds.
        """
        return

    def glite_use_local_scheduler(self) -> bool:
        """
        Whether the remote task command is run with ``--local-scheduler True``.
        """
        return True

    def glite_cmdline_args(self) -> dict[str, str]:
        """
        Additional command line arguments added to (and overwriting those of) the remote task
        command; empty by default.
        """
        return {}

    def glite_destination_info(self, info: InsertableDict) -> InsertableDict:
        """
        Hook to extend or modify the destination *info*; returned unchanged by default.
        """
        return info