# coding: utf-8
"""
Definition of the job dashboard interface.
"""
__all__ = ["BaseJobDashboard", "NoJobDashboard", "cache_by_status"]
import time
import functools
from contextlib import contextmanager
from abc import ABCMeta, abstractmethod
import six
from law.util import perf_counter
def cache_by_status(func):
    """
    Decorator for :py:meth:`BaseJobDashboard.publish` (and inheriting classes) that caches the last
    published status to decide if a new publication is necessary or not. When the mapped status is
    invalid (empty) or did not change since the last successful call, the actual publish method is
    not invoked and *None* is returned. Otherwise, the return value of *func* is passed through.

    The object owning the decorated method must provide a ``map_status`` method and a
    ``_last_states`` dictionary, and *job_data* must contain a ``"job_id"`` key (a *KeyError* is
    raised otherwise). The cached status is only updated after *func* returned without raising, so
    that a failed publication is attempted again on the next call instead of being skipped.
    """
    @functools.wraps(func)
    def wrapper(self, job_data, event, job_num, *args, **kwargs):
        job_id = job_data["job_id"]
        dashboard_status = self.map_status(job_data.get("status"), event)

        # nothing to do when the status is invalid or did not change
        if not dashboard_status or self._last_states.get(job_id) == dashboard_status:
            return None

        # publish first and remember the status only on success, otherwise an exception raised by
        # func would leave a status in the cache that never reached the dashboard
        result = func(self, job_data, event, job_num, *args, **kwargs)
        self._last_states[job_id] = dashboard_status

        return result

    return wrapper
class BaseJobDashboard(six.with_metaclass(ABCMeta, object)):
    """
    Base class of a minimal job dashboard interface that is used from within
    :py:class:`law.workflow.remote.BaseRemoteWorkflow`'s.

    .. py:classattribute:: persistent_attributes

       type: list

       List of instance attributes that should be marked as being persistent. This is (e.g.) used
       in the :py:class:`law.workflow.remote.BaseRemoteWorkflow` when saving job and submission
       information to submission files. Common use cases are user information.

    .. py:attribute:: max_rate

       type: int

       Maximum number of events that can be published per second. :py:meth:`rate_guard` uses this
       value to delay function calls. Values of zero or below disable the delay.
    """

    # placeholder, overwritten with the staticmethod-wrapped module-level decorator right after the
    # class definition
    cache_by_status = None

    persistent_attributes = []

    def __init__(self, max_rate=0):
        super(BaseJobDashboard, self).__init__()

        # maximum number of events per second
        self.max_rate = max_rate

        # timestamp of last event, used to ensure that max_rate is not exceeded
        self._last_event_time = 0.0

        # last dashboard status per job_id, used to prevent subsequent requests for jobs
        # without any status change
        self._last_states = {}

    def get_persistent_config(self):
        """
        Returns the values of all :py:attr:`persistent_attributes` of this instance in a dictionary.
        """
        return {attr: getattr(self, attr) for attr in self.persistent_attributes}

    def apply_config(self, config):
        """
        Sets all attributes in a dictionary *config* to this instance. Keys that do not correspond
        to an existing attribute are ignored. This can be understood as the counterpart of
        :py:meth:`get_persistent_config`.
        """
        for attr, value in six.iteritems(config):
            if hasattr(self, attr):
                setattr(self, attr, value)

    @contextmanager
    def rate_guard(self):
        """
        Context guard that ensures that decorated contexts are delayed in order to limit the number
        of status publications per second, defined by :py:attr:`max_rate`. When :py:attr:`max_rate`
        is zero or negative, the context is entered without any delay. Example:

        .. code-block:: python

            # print some numbers, which will take 10 / max_rate seconds
            for i in range(10):
                with self.rate_guard():
                    print(i)
        """
        start = 0.0

        if self.max_rate > 0:
            start = perf_counter()
            delay = self._last_event_time + 1.0 / self.max_rate - start
            if delay > 0:
                time.sleep(delay)
                # the guarded context only starts after the delay, so the clock must be sampled
                # again, otherwise the stored timestamp lies in the past and the next call would
                # wait too short, exceeding max_rate
                start = perf_counter()

        try:
            yield
        finally:
            # remember when the guarded context was entered, also when its body raised
            self._last_event_time = start

    def remote_hook_file(self):
        """
        This method can return the path to a file that is considered as an input file to remote
        jobs. This file can contain bash functions, environment variables, etc., that are necessary
        to communicate with the implemented job dashboard. When *None* is returned, no file is sent.
        """
        return None

    def remote_hook_data(self, job_num, attempt):
        """
        This method can return a dictionary that is sent with remote jobs in the format
        ``key1=value1 key2=value2 ...``. The returned dictionary should (but does not have to)
        include the job number *job_num* and the retry *attempt*.
        """
        return None

    def create_tracking_url(self):
        """
        This method can return a tracking url that refers to a web page that visualizes jobs. When
        set, the url is shown in the central luigi scheduler.
        """
        return None

    @abstractmethod
    def map_status(self, job_status, event):
        """
        Maps the *job_status* (see :py:class:`law.job.base.BaseJobManager`) for a particular *event*
        to the status name that is accepted by the implemented job dashboard. Possible events are:

            - action.submit
            - action.cancel
            - status.pending
            - status.running
            - status.finished
            - status.retry
            - status.failed
        """
        return

    @abstractmethod
    def publish(self, job_data, event, job_num, *args, **kwargs):
        """
        Publishes the status of a job to the implemented job dashboard. *job_data* is a dictionary
        that contains a *job_id* and a *status* string (see
        :py:meth:`law.workflow.remote.StatusData.job_data`).
        """
        return
# attach the module-level decorator to the base class, replacing the None placeholder of the class
# body; wrapping it in staticmethod keeps it from being bound to instances, so it is reachable as
# ``BaseJobDashboard.cache_by_status`` and still receives only the decorated function
BaseJobDashboard.cache_by_status = staticmethod(cache_by_status)
class NoJobDashboard(BaseJobDashboard):
    """
    Null job dashboard implementation. Instances of this class do not publish any job status at
    all. They can rather be used as a placeholder in situations where a job dashboard is required,
    such as in :py:class:`law.workflow.remote.BaseRemoteWorkflow`.
    """

    def map_status(self, *args, **kwargs):
        """
        Accepts arbitrary arguments and always returns *None*, as there is no dashboard status to
        map to.
        """
        return None

    def publish(self, *args, **kwargs):
        """
        Accepts arbitrary arguments, publishes nothing and always returns *None*.
        """
        return None