# coding: utf-8
"""
Base classes for implementing remote job management and job file creation.
"""
__all__ = ["BaseJobManager", "BaseJobFileFactory", "JobArguments", "JobInputFile"]
import os
import time
import shutil
import tempfile
import fnmatch
import base64
import copy
import re
import json
from collections import defaultdict
from multiprocessing.pool import ThreadPool
from threading import Lock
from abc import ABCMeta, abstractmethod
import six
from law.config import Config
from law.target.file import get_scheme, get_path
from law.target.remote.base import RemoteTarget
from law.util import (
colored, make_list, make_tuple, iter_chunks, makedirs, create_hash, empty_context,
)
from law.logger import get_logger
logger = get_logger(__name__)
def get_async_result_silent(result, timeout=None):
"""
Calls the ``get([timeout])`` method of an `AsyncResult
<https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult>`__
object *result* and returns its value. The only difference is that potentially raised exceptions
are returned instead of re-raised.
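Example (a minimal sketch; the failing function is made up for illustration):
.. code-block:: python

    from multiprocessing.pool import ThreadPool

    def fail():
        raise ValueError("boom")

    pool = ThreadPool(1)
    result = pool.apply_async(fail)
    pool.close()
    pool.join()

    error = get_async_result_silent(result)
    # error is the raised ValueError instance rather than being re-raised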
""" # noqa
try:
return result.get(timeout)
except Exception as e:
return e
class BaseJobManager(six.with_metaclass(ABCMeta, object)):
"""
Base class that defines how remote jobs are submitted, queried, cancelled and cleaned up. It
also defines the most common job states:
- PENDING: The job is submitted and waiting to be processed.
- RUNNING: The job is running.
- FINISHED: The job is completed and successfully finished.
- RETRY: The job is completed but failed. It can be resubmitted.
- FAILED: The job is completed but failed. It cannot or should not be recovered.
The particular job manager implementation should match its own, native states to these common
states.
*status_names* and *status_diff_styles* are used in :py:meth:`status_line` and default to
:py:attr:`default_status_names` and :py:attr:`default_status_diff_styles`. *threads* is the
default number of concurrent threads that are used in :py:meth:`submit_batch`,
:py:meth:`cancel_batch`, :py:meth:`cleanup_batch` and :py:meth:`query_batch`.
.. py:classattribute:: PENDING
type: string
Flag that represents the ``PENDING`` status.
.. py:classattribute:: RUNNING
type: string
Flag that represents the ``RUNNING`` status.
.. py:classattribute:: FINISHED
type: string
Flag that represents the ``FINISHED`` status.
.. py:classattribute:: RETRY
type: string
Flag that represents the ``RETRY`` status.
.. py:classattribute:: FAILED
type: string
Flag that represents the ``FAILED`` status.
.. py:classattribute:: default_status_names
type: list
The list of all default status flags that is used in :py:meth:`status_line`.
.. py:classattribute:: default_status_diff_styles
type: dict
A dictionary that defines the coloring styles per job status that is used in
:py:meth:`status_line`.
.. py:classattribute:: job_grouping
type: bool
Whether this manager implementation groups jobs into single interactions for submission and
status queries. In general, this means that the submission of a single job file can result
in multiple jobs on the remote batch system.
.. py:classattribute:: chunk_size_submit
type: int
The default chunk size value when no value is given in :py:meth:`submit_batch`. When the
value evaluates to *False*, no chunking is allowed.
.. py:classattribute:: chunk_size_cancel
type: int
The default chunk size value when no value is given in :py:meth:`cancel_batch`. When the
value evaluates to *False*, no chunking is allowed.
.. py:classattribute:: chunk_size_cleanup
type: int
The default chunk size value when no value is given in :py:meth:`cleanup_batch`. When the
value evaluates to *False*, no chunking is allowed.
.. py:classattribute:: chunk_size_query
type: int
The default chunk size value when no value is given in :py:meth:`query_batch`. When the
value evaluates to *False*, no chunking is allowed.
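As an illustration only, a minimal sketch of a subclass talking to a hypothetical batch system
(job ids, file names and the class itself are made up):
.. code-block:: python

    class MyJobManager(BaseJobManager):

        def submit(self, job_file):
            # hypothetical batch system interaction, returning the ids of all created jobs
            return ["1234.0"]

        def cancel(self, job_id):
            return {job_id: None}

        def cleanup(self, job_id):
            return {job_id: None}

        def query(self, job_id):
            return {job_id: self.job_status_dict(job_id=job_id, status=self.RUNNING)}

    manager = MyJobManager(threads=4)
    job_ids = manager.submit_batch(["job_0.jdl", "job_1.jdl"])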
"""
PENDING = "pending"
RUNNING = "running"
FINISHED = "finished"
RETRY = "retry"
FAILED = "failed"
default_status_names = [PENDING, RUNNING, FINISHED, RETRY, FAILED]
# color styles per status when job count decreases / stagnates / increases
default_status_diff_styles = {
PENDING: ({}, {}, {"color": "green"}),
RUNNING: ({}, {}, {"color": "green"}),
FINISHED: ({}, {}, {"color": "green"}),
RETRY: ({"color": "green"}, {}, {"color": "red"}),
FAILED: ({}, {}, {"color": "red", "style": "bright"}),
}
# job grouping settings
job_grouping = False
# chunking settings for unbatched methods
# disabled by default
chunk_size_submit = 0
chunk_size_cancel = 0
chunk_size_cleanup = 0
chunk_size_query = 0
@classmethod
def job_status_dict(cls, job_id=None, status=None, code=None, error=None, extra=None):
"""
Returns a dictionary that describes the status of a job given its *job_id*, *status*, return
*code*, *error*, and additional *extra* data.
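Example (the job id is made up):
.. code-block:: python

    BaseJobManager.job_status_dict(job_id="1234.0", status=BaseJobManager.RUNNING)
    # -> {"job_id": "1234.0", "status": "running", "code": None, "error": None, "extra": None}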
"""
return dict(job_id=job_id, status=status, code=code, error=error, extra=extra)
@classmethod
def cast_job_id(cls, job_id):
"""
Hook for casting an input *job_id*, for instance, after loading serialized data from json.
"""
return job_id
def __init__(self, status_names=None, status_diff_styles=None, threads=1):
super(BaseJobManager, self).__init__()
self.status_names = status_names or list(self.default_status_names)
self.status_diff_styles = status_diff_styles or self.default_status_diff_styles.copy()
self.threads = threads
self.last_counts = [0] * len(self.status_names)
@abstractmethod
def submit(self):
"""
Abstract atomic or group job submission.
Can throw exceptions.
Should return a list of job ids.
"""
return
@abstractmethod
def cancel(self):
"""
Abstract atomic or group job cancellation.
Can throw exceptions.
Should return a dictionary mapping job ids to per-job return values.
"""
return
@abstractmethod
def cleanup(self):
"""
Abstract atomic or group job cleanup.
Can throw exceptions.
Should return a dictionary mapping job ids to per-job return values.
"""
return
@abstractmethod
def query(self):
"""
Abstract atomic or group job status query.
Can throw exceptions.
Should return a dictionary mapping job ids to per-job return values.
"""
return
def group_job_ids(self, job_ids):
"""
Hook that needs to be implemented if the job manager supports grouping of jobs, i.e., when
:py:attr:`job_grouping` is *True*, and potentially used during status queries, job
cancellation and removal. If so, it should take a sequence of *job_ids* and return a
dictionary mapping ids of group jobs (used for queries etc) to the corresponding lists of
original job ids, with an arbitrary grouping mechanism.
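For illustration, a hypothetical implementation could group job ids by a shared cluster id prefix:
.. code-block:: python

    from collections import defaultdict

    def group_job_ids(self, job_ids):
        # e.g. ["1234.0", "1234.1", "5678.0"] -> {"1234": ["1234.0", "1234.1"], "5678": ["5678.0"]}
        groups = defaultdict(list)
        for job_id in job_ids:
            groups[job_id.split(".", 1)[0]].append(job_id)
        return dict(groups)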
"""
raise NotImplementedError(
"internal error, {}.group_job_ids not implemented".format(self.__class__.__name__),
)
def _apply_batch(
self,
func,
result_type,
job_objs,
default_chunk_size,
threads=None,
chunk_size=None,
callback=None,
**kwargs # noqa
):
# default arguments
threads = max(threads or self.threads or 1, 1)
# is chunking allowed?
chunk_size = max(chunk_size or default_chunk_size, 0) if default_chunk_size else 0
chunking = chunk_size > 0
# build chunks if needed
job_objs = make_list(job_objs)
job_objs = list(iter_chunks(job_objs, chunk_size)) if chunking else job_objs
# factory to call the passed callback for each job file even when chunking
def cb_factory(i):
if not callable(callback):
return None
if chunking:
def wrapper(result_data):
offset = sum(map(len, job_objs[:i]))
for j in range(len(job_objs[i])):
data = result_data if isinstance(result_data, Exception) else result_data[j]
callback(offset + j, data)
else:
def wrapper(data):
callback(i, data)
return wrapper
# threaded processing
pool = ThreadPool(threads)
results = [
pool.apply_async(func, (arg,), kwargs, callback=cb_factory(i))
for i, arg in enumerate(job_objs)
]
pool.close()
pool.join()
# store result data or an exception
result_data = result_type()
if chunking:
for _job_objs, res in six.moves.zip(job_objs, results):
data = get_async_result_silent(res)
for i, job_obj in enumerate(_job_objs):
if isinstance(result_data, list):
result_data.append(data if isinstance(data, Exception) else data[i])
else:
result_data[job_obj] = data if isinstance(data, Exception) else data[job_obj]
else:
for job_obj, res in six.moves.zip(job_objs, results):
data = get_async_result_silent(res)
if isinstance(result_data, list):
result_data.append(data)
else:
result_data[job_obj] = data
return result_data
def submit_batch(self, job_files, threads=None, chunk_size=None, callback=None, **kwargs):
"""
Submits a batch of jobs given by *job_files* via a thread pool of size *threads* which
defaults to its instance attribute. When *chunk_size*, which defaults to
:py:attr:`chunk_size_submit`, is not negative, *job_files* are split into chunks of that
size which are passed to :py:meth:`submit`.
When *callback* is set, it is invoked after each successful job submission with the index of
the corresponding job file (starting at 0) and either the assigned job id or an exception if
any occurred. All other *kwargs* are passed to
:py:meth:`submit`.
The return value is a list containing the return values of the particular :py:meth:`submit`
calls, in an order that corresponds to *job_files*. When an exception was raised during a
submission, this exception is added to the returned list.
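A usage sketch, assuming *manager* is an instance of a concrete subclass and the file names are
made up:
.. code-block:: python

    def on_submit(i, job_id_or_error):
        print("job file {} -> {}".format(i, job_id_or_error))

    results = manager.submit_batch(
        ["job_0.jdl", "job_1.jdl"],
        threads=2,
        callback=on_submit,
    )
    # results contains the submit() return values (or exceptions), aligned with job_files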
"""
return self._apply_batch(
func=self.submit,
result_type=list,
job_objs=job_files,
default_chunk_size=self.chunk_size_submit,
threads=threads,
chunk_size=chunk_size,
callback=callback,
**kwargs # noqa
)
def cancel_batch(self, job_ids, threads=None, chunk_size=None, callback=None, **kwargs):
"""
Cancels a batch of jobs given by *job_ids* via a thread pool of size *threads* which
defaults to its instance attribute. When *chunk_size*, which defaults to
:py:attr:`chunk_size_cancel`, is not negative, *job_ids* are split into chunks of that size
which are passed to :py:meth:`cancel`.
When *callback* is set, it is invoked after each successful job (or job chunk) cancelling
with the index of the corresponding job id (starting at 0) and either *None* or an exception
if any occurred. All other *kwargs* are passed to :py:meth:`cancel`.
Exceptions that occurred during job cancelling are stored in a list and returned. An empty
list means that no exceptions occurred.
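A usage sketch, assuming *manager* is an instance of a concrete subclass:
.. code-block:: python

    errors = manager.cancel_batch(job_ids)
    for error in errors:
        print("job cancellation failed: {}".format(error))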
"""
results = self._apply_batch(
func=self.cancel,
result_type=dict,
job_objs=job_ids,
default_chunk_size=self.chunk_size_cancel,
threads=threads,
chunk_size=chunk_size,
callback=callback,
**kwargs # noqa
)
# return only errors
return [error for error in results.values() if isinstance(error, Exception)]
def cleanup_batch(self, job_ids, threads=None, chunk_size=None, callback=None, **kwargs):
"""
Cleans up a batch of jobs given by *job_ids* via a thread pool of size *threads* which
defaults to its instance attribute. When *chunk_size*, which defaults to
:py:attr:`chunk_size_cleanup`, is not negative, *job_ids* are split into chunks of that size
which are passed to :py:meth:`cleanup`.
When *callback* is set, it is invoked after each successful job (or job chunk) cleaning with
the index of the corresponding job id (starting at 0) and either *None* or an exception if
any occurred. All other *kwargs* are passed to
:py:meth:`cleanup`.
Exceptions that occurred during job cleanup are stored in a list and returned. An empty list
means that no exceptions occurred.
"""
results = self._apply_batch(
func=self.cleanup,
result_type=dict,
job_objs=job_ids,
default_chunk_size=self.chunk_size_cleanup,
threads=threads,
chunk_size=chunk_size,
callback=callback,
**kwargs # noqa
)
# return only errors
return [error for error in results.values() if isinstance(error, Exception)]
def query_batch(self, job_ids, threads=None, chunk_size=None, callback=None, **kwargs):
"""
Queries the status of a batch of jobs given by *job_ids* via a thread pool of size *threads*
which defaults to its instance attribute. When *chunk_size*, which defaults to
:py:attr:`chunk_size_query`, is not negative, *job_ids* are split into chunks of that size
which are passed to :py:meth:`query`.
When *callback* is set, it is invoked after each successful job (or job chunk) status query
with the index of the corresponding job id (starting at 0) and the obtained status query
data or an exception if any occurred. All other *kwargs* are passed to :py:meth:`query`.
This method returns a dictionary that maps job ids to either the status query data or to an
exception if any occurred.
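A usage sketch, assuming *manager* is an instance of a concrete subclass:
.. code-block:: python

    query_data = manager.query_batch(job_ids)
    for job_id, data in query_data.items():
        if isinstance(data, Exception):
            print("status query failed for job {}: {}".format(job_id, data))
        else:
            # data is typically a dict as created by job_status_dict()
            print("job {} is {}".format(job_id, data["status"]))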
"""
return self._apply_batch(
func=self.query,
result_type=dict,
job_objs=job_ids,
default_chunk_size=self.chunk_size_query,
threads=threads,
chunk_size=chunk_size,
callback=callback,
**kwargs # noqa
)
def _apply_group(
self,
func,
result_type,
group_func,
job_objs,
threads=None,
callback=None,
**kwargs # noqa
):
# default arguments
threads = max(threads or self.threads or 1, 1)
# group objects
job_objs = group_func(make_list(job_objs))
# factory to call the passed callback for each original job object even when grouping
def cb_factory(i):
if not callable(callback):
return None
def wrapper(result_data):
offset = sum(map(len, list(job_objs.values())[:i]))
for j in range(len(list(job_objs.values())[i])):
data = result_data if isinstance(result_data, Exception) else result_data[j]
callback(offset + j, data)
return wrapper
# threaded processing
pool = ThreadPool(threads)
results = [
pool.apply_async(func, make_tuple(arg), kwargs, callback=cb_factory(i))
for i, arg in enumerate(job_objs.items())
]
pool.close()
pool.join()
# store result data or an exception
result_data = result_type()
for _job_objs, res in six.moves.zip(job_objs.values(), results):
data = get_async_result_silent(res)
for i, job_obj in enumerate(_job_objs):
if isinstance(result_data, list):
result_data.append(data if isinstance(data, Exception) else data[i])
else:
result_data[job_obj] = data if isinstance(data, Exception) else data[job_obj]
return result_data
def submit_group(self, job_files, threads=None, callback=None, **kwargs):
"""
Submits several job groups given by *job_files* via a thread pool of size *threads* which
defaults to its instance attribute. As per the definition of a job group, a single job file
can result in multiple jobs being processed on the remote batch system.
When *callback* is set, it is invoked after each successful job submission with the index of
the corresponding job (starting at 0) and either the assigned job id or an exception if any
occurred. All other *kwargs* are passed to :py:meth:`submit`.
The return value is a list containing the return values of the particular :py:meth:`submit`
calls, in an order that in general corresponds to *job_files*, with ids of single jobs per job
file properly expanded. When an exception was raised during a submission, this exception is
added to the returned list.
"""
# in order to use the generic grouping mechanism in _apply_group, create a trivial group_func
def group_func(job_files):
groups = defaultdict(list)
for job_file in job_files:
groups[job_file].append(job_file)
return groups
return self._apply_group(
func=self.submit,
result_type=list,
group_func=group_func,
job_objs=job_files,
threads=threads,
callback=callback,
**kwargs # noqa
)
def cancel_group(self, job_ids, threads=None, callback=None, **kwargs):
"""
Takes several *job_ids*, groups them according to :py:meth:`group_job_ids`, and cancels all
groups simultaneously via a thread pool of size *threads* which defaults to its instance
attribute.
When *callback* is set, it is invoked after each successful job cancellation with the index
of the corresponding job id (starting at 0) and either *None* or an exception if any
occurred. All other *kwargs* are passed to :py:meth:`cancel`.
Exceptions that occurred during job cancelling are stored in a list and returned. An empty
list means that no exceptions occurred.
"""
results = self._apply_group(
func=self.cancel,
result_type=dict,
group_func=self.group_job_ids,
job_objs=job_ids,
threads=threads,
callback=callback,
**kwargs # noqa
)
# return only errors
return [error for error in results.values() if isinstance(error, Exception)]
def cleanup_group(self, job_ids, threads=None, callback=None, **kwargs):
"""
Takes several *job_ids*, groups them according to :py:meth:`group_job_ids`, and cleans up
all groups simultaneously via a thread pool of size *threads* which defaults to its instance
attribute.
When *callback* is set, it is invoked after each successful job cleanup with the index of
the corresponding job id (starting at 0) and either *None* or an exception if any occurred.
All other *kwargs* are passed to :py:meth:`cleanup`.
Exceptions that occurred during job cleanup are stored in a list and returned. An empty
list means that no exceptions occurred.
"""
results = self._apply_group(
func=self.cleanup,
result_type=dict,
group_func=self.group_job_ids,
job_objs=job_ids,
threads=threads,
callback=callback,
**kwargs # noqa
)
# return only errors
return [error for error in results.values() if isinstance(error, Exception)]
def query_group(self, job_ids, threads=None, callback=None, **kwargs):
"""
Takes several *job_ids*, groups them according to :py:meth:`group_job_ids`, and queries the
status of all groups simultaneously via a thread pool of size *threads* which defaults to
its instance attribute.
When *callback* is set, it is invoked after each successful job status query with the index
of the corresponding job id (starting at 0) and the obtained status query data or an
exception if any occurred. All other *kwargs* are passed to :py:meth:`query`.
This method returns a dictionary that maps job ids to either the status query data or to an
exception if any occurred.
"""
return self._apply_group(
func=self.query,
result_type=dict,
group_func=self.group_job_ids,
job_objs=job_ids,
threads=threads,
callback=callback,
**kwargs # noqa
)
def status_line(self, counts, last_counts=None, sum_counts=None, timestamp=True, align=False,
color=False):
"""
Returns a job status line containing job counts per status. When *last_counts* is *True*,
the status line also contains the differences in job counts with respect to the counts from
the previous call to this method. When you pass a list or tuple, those values are used
instead to compute the differences.
The status line starts with the sum of jobs which is inferred from *counts*. When you want
to use a custom value, set *sum_counts*. The length of *counts* should match the length of
*status_names* of this instance. When *timestamp* is *True*, the status line begins with the
current timestamp. When *timestamp* is a non-empty string, it is used as the ``strftime``
format.
*align* handles the alignment of the values in the status line by using a maximum width.
*True* will result in the default width of 4. When *align* evaluates to *False*, no
alignment is used. By default, some elements of the status line are colored. Set *color* to
*False* to disable this feature.
Example:
.. code-block:: python
status_line((2, 0, 0, 0, 0))
# 12:45:18: all: 2, pending: 2, running: 0, finished: 0, retry: 0, failed: 0
status_line((0, 2, 0, 0, 0), last_counts=(2, 0, 0, 0, 0), timestamp=False)
# all: 2, pending: 0 (-2), running: 2 (+2), finished: 0 (+0), retry: 0 (+0), failed: 0 (+0)
"""
# check and/or set last counts
use_last_counts = bool(last_counts)
if use_last_counts and not isinstance(last_counts, (list, tuple)):
last_counts = self.last_counts or ([0] * len(self.status_names))
if last_counts and len(last_counts) != len(self.status_names):
raise Exception("{} last status counts expected, got {}".format(len(self.status_names),
len(last_counts)))
# check current counts
if len(counts) != len(self.status_names):
raise Exception("{} status counts expected, got {}".format(len(self.status_names),
len(counts)))
# calculate differences, or none when no last counts are available
diffs = None
if last_counts:
diffs = tuple(n - m for n, m in zip(counts, last_counts))
# number formatting
if isinstance(align, bool) or not isinstance(align, six.integer_types):
align = 4 if align else 0
count_fmt = "%d" if not align else "%{}d".format(align)
diff_fmt = "%+d" if not align else "%+{}d".format(align)
# build the status line
line = ""
if timestamp:
time_format = timestamp if isinstance(timestamp, six.string_types) else "%H:%M:%S"
line += "{}: ".format(time.strftime(time_format))
if sum_counts is None:
sum_counts = sum(counts)
line += "all: " + count_fmt % (sum_counts,)
for i, (status, count) in enumerate(zip(self.status_names, counts)):
count = count_fmt % count
if color:
count = colored(count, style="bright")
line += ", {}: {}".format(status, count)
if diffs:
diff = diff_fmt % diffs[i]
if color:
# 0 if negative, 1 if zero, 2 if positive
style_idx = (diffs[i] > 0) + (diffs[i] >= 0)
diff = colored(diff, **self.status_diff_styles[status][style_idx])
line += " ({})".format(diff)
# store current counts for next call
self.last_counts = list(counts)
return line
class BaseJobFileFactory(six.with_metaclass(ABCMeta, object)):
"""
Base class that handles the creation of job files. It is likely that inheriting classes only
need to implement the :py:meth:`create` method as well as extend the constructor to handle
additional arguments.
The general idea behind this class is as follows. An instance holds the path to a directory
*dir*, defaulting to a new, temporary directory inside ``job.job_file_dir`` (which itself
defaults to the system's tmp path). Job input files, which are supported by almost all job /
batch systems, are automatically copied into this directory. The file name can be optionally
postfixed with a configurable string, so that multiple job files can be created and stored
within the same *dir* without the risk of interfering file names. A common use case would be
the use of a job number or id. Another *transformation* that is applied to copied files is the
rendering of variables. For example, when an input file looks like
.. code-block:: bash
#!/usr/bin/env bash
echo "Hello, {{my_variable}}!"
the rendering mechanism can replace variables such as ``my_variable`` following a double-brace
notation. Internally, the rendering is implemented in :py:meth:`render_file`, but there is
usually no need to call this method directly as implementations of this base class might use it
in their :py:meth:`create` method.
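For illustration, rendering the file above could look like this (``MyJobFileFactory`` is a made-up
concrete subclass and the paths are examples):
.. code-block:: python

    factory = MyJobFileFactory()
    factory.render_file(
        "templates/job.sh",
        os.path.join(factory.dir, "job.sh"),
        render_variables={"my_variable": "world"},
    )
    # the copied file now contains: echo "Hello, world!"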
.. py:classattribute:: config_attrs
type: list
List of attributes that is used to create a configuration dictionary. See
:py:meth:`get_config` for more info.
.. py:attribute:: dir
type: string
The path to the internal job file directory.
.. py:attribute:: cleanup
type: bool
Boolean that denotes whether this internal job file directory is temporary and should be
cleaned up upon instance deletion. It defaults to *True* when the *dir* constructor argument
is *None*.
"""
config_attrs = ["dir", "render_variables", "custom_log_file"]
render_key_cre = re.compile(r"\{\{(\w+)\}\}")
class Config(object):
def __repr__(self):
return repr(self.__dict__)
def __getattr__(self, attr):
return self.__dict__[attr]
def __setattr__(self, attr, value):
self.__dict__[attr] = value
def __getitem__(self, attr):
return self.__dict__[attr]
def __setitem__(self, attr, value):
self.__dict__[attr] = value
def __contains__(self, attr):
return attr in self.__dict__
def __init__(self, dir=None, render_variables=None, custom_log_file=None, mkdtemp=None,
cleanup=None):
super(BaseJobFileFactory, self).__init__()
cfg = Config.instance()
# get default values from config if None
if mkdtemp is None:
mkdtemp = cfg.get_expanded_bool("job", "job_file_dir_mkdtemp")
if cleanup is None:
cleanup = cfg.get_expanded_bool("job", "job_file_dir_cleanup")
# store the cleanup flag
self.cleanup = cleanup
# when dir is None, a temporary directory is forced
if not dir:
mkdtemp = True
# store the directory, default to the job.job_file_dir config
self.dir = str(dir or cfg.get_expanded("job", "job_file_dir"))
self.dir = os.path.expandvars(os.path.expanduser(self.dir))
# create the directory
makedirs(self.dir)
# check if it should be extended by a temporary dir
if mkdtemp:
self.dir = tempfile.mkdtemp(dir=self.dir)
# store attributes
self.render_variables = render_variables or {}
self.custom_log_file = custom_log_file
# locks for thread-safe file operations
self.file_locks = defaultdict(Lock)
def __del__(self):
self.cleanup_dir(force=False)
def __call__(self, *args, **kwargs):
return self.create(*args, **kwargs)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
return
@classmethod
def postfix_file(cls, path, postfix=None, add_hash=False):
"""
Adds a *postfix* to a file *path*, right before the first file extension in the base name.
When *add_hash* is *True*, a hash based on the full source path is added before the postfix.
Example:
.. code-block:: python
postfix_file("/path/to/file.tar.gz", "_1")
# -> "/path/to/file_1.tar.gz"
postfix_file("/path/to/file.txt", "_1", add_hash=True)
# -> "/path/to/file_dacc4374d3_1.txt"
*postfix* might also be a dictionary that maps patterns to actual postfix strings. When a
pattern matches the base name of the file, the associated postfix is applied and the path is
returned. You might want to use an ordered dictionary to control the first match.
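For example, with a pattern dictionary:
.. code-block:: python

    postfix_file("/path/to/file.txt", {"*.sh": "_script", "*.txt": "_text"})
    # -> "/path/to/file_text.txt"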
"""
path = str(path)
dirname, basename = os.path.split(path)
# get the actual postfix
_postfix = postfix
if isinstance(postfix, dict):
for pattern, _postfix in six.iteritems(postfix):
if fnmatch.fnmatch(basename, pattern):
break
else:
_postfix = ""
# optionally add a hash of the full path
if add_hash:
full_path = os.path.realpath(os.path.expandvars(os.path.expanduser(path)))
_postfix = "_" + create_hash(full_path) + (_postfix or "")
# add the postfix
if _postfix:
parts = basename.split(".", 1)
parts[0] += _postfix
path = os.path.join(dirname, ".".join(parts))
return path
@classmethod
def postfix_input_file(cls, path, postfix=None):
"""
Shorthand for :py:meth:`postfix_file` with *add_hash* set to *True*.
"""
return cls.postfix_file(path, postfix=postfix, add_hash=True)
@classmethod
def postfix_output_file(cls, path, postfix=None):
"""
Shorthand for :py:meth:`postfix_file` with *add_hash* set to *False*.
"""
return cls.postfix_file(path, postfix=postfix, add_hash=False)
@classmethod
def render_string(cls, s, key, value):
"""
Renders a string *s* by replacing ``{{key}}`` with *value* and returns it.
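Example:
.. code-block:: python

    render_string('echo "{{name}}"', "name", "Tom")
    # -> 'echo "Tom"'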
"""
return s.replace("{{" + key + "}}", str(value))
@classmethod
def linearize_render_variables(cls, render_variables):
"""
Linearizes variables contained in the dictionary *render_variables*. In some use cases,
variables may contain render expressions pointing to other variables, e.g.:
.. code-block:: python
render_variables = {
"variable_a": "Tom",
"variable_b": "Hello, {{variable_a}}!",
}
Situations like this can be simplified by linearizing the variables:
.. code-block:: python
linearize_render_variables(render_variables)
# ->
# {
# "variable_a": "Tom",
# "variable_b": "Hello, Tom!",
# }
"""
linearized = {}
for key, value in render_variables.items():
if not isinstance(value, str):
raise Exception("render variables must be strings, found '{}' for key '{}'".format(
value, key))
while True:
m = cls.render_key_cre.search(value)
if not m:
break
sub_key = m.group(1)
value = cls.render_string(value, sub_key, render_variables.get(sub_key, ""))
linearized[key] = value
# add base64 encoded render variables themselves
vars_str = base64.b64encode(six.b(json.dumps(linearized) or "-"))
if six.PY3:
vars_str = vars_str.decode("utf-8")
linearized["render_variables"] = vars_str
return linearized
@classmethod
def render_file(cls, src, dst, render_variables, postfix=None, silent=True):
"""
Renders a source file *src* with *render_variables* and copies it to a new location *dst*.
In some cases, a render variable value might contain a path that should be subject to file
postfixing (see :py:meth:`postfix_file`). When *postfix* is not *None*, this method will
replace substrings in the format ``__law_job_postfix__:<path>`` with the postfixed ``path``. In the
following
example, the variable ``my_command`` in *src* will be rendered with a string that contains a
postfixed path:
.. code-block:: python
render_file(src, dst, {"my_command": "echo postfix:some/path.txt"}, postfix="_1")
# replaces "{{my_command}}" in src with "echo some/path_1.txt" in dst
In case the file content is not readable, the method returns unless *silent* is *False* in
which case an exception is raised.
"""
src, dst = str(src), str(dst)
if not os.path.isfile(src):
raise IOError("source file for rendering does not exist: {}".format(src))
with open(src, "r") as f:
try:
content = f.read()
except UnicodeDecodeError:
if silent:
return
raise
def postfix_fn(m):
return cls.postfix_input_file(m.group(1), postfix=postfix)
for key, value in six.iteritems(render_variables):
# value might contain paths to be postfixed, denoted by "__law_job_postfix__:..."
if postfix:
value = re.sub(r"\_\_law\_job\_postfix\_\_:([^\s]+)", postfix_fn, value)
content = cls.render_string(content, key, value)
# finally, replace all non-rendered keys with empty strings
content = cls.render_key_cre.sub("", content)
with open(dst, "w") as f:
f.write(content)
def get_config(self, **kwargs):
"""
The :py:meth:`create` method potentially takes a lot of keyword arguments for configuring
the content of job files. It is useful if some of these configuration values default to
attributes that can be set via constructor arguments of this class.
This method merges keyword arguments *kwargs* (e.g. passed to :py:meth:`create`) with
default values obtained from instance attributes given in :py:attr:`config_attrs`. It
returns the merged values in a dictionary that can be accessed via dot-notation (attribute
notation). Example:
.. code-block:: python
class MyJobFileFactory(BaseJobFileFactory):
config_attrs = ["stdout", "stderr"]
def __init__(self, stdout="stdout.txt", stderr="stderr.txt", **kwargs):
super(MyJobFileFactory, self).__init__(**kwargs)
self.stdout = stdout
self.stderr = stderr
def create(self, **kwargs):
config = self.get_config(**kwargs)
# when called as create(stdout="log.txt"):
# config.stderr is "stderr.txt"
# config.stdout is "log.txt"
...
"""
cfg = self.Config()
for attr in self.config_attrs:
cfg[attr] = copy.deepcopy(kwargs.get(attr, getattr(self, attr)))
return cfg
def cleanup_dir(self, force=True):
"""
Removes the directory that is held by this instance. When *force* is *False*, the directory
is only removed when :py:attr:`cleanup` is *True*.
"""
if not self.cleanup and not force:
return
if isinstance(self.dir, six.string_types) and os.path.exists(self.dir):
shutil.rmtree(self.dir)
@abstractmethod
def create(self, **kwargs):
"""
Abstract job file creation method that must be implemented by inheriting classes.
"""
return
class JobArguments(object):
"""
Wrapper class for job arguments. Currently, it stores a task class *task_cls*, a list of
*task_params*, a list of covered *branches*, the number of *workers*, an *auto_retry* flag, and
custom *dashboard_data*. It also handles argument encoding as required by the job wrapper script at
`law/job/job.sh <https://github.com/riga/law/blob/master/law/job/job.sh>`__.
.. py:attribute:: task_cls
type: :py:class:`law.Register`
The task class.
.. py:attribute:: task_params
type: list
The list of task parameters.
.. py:attribute:: branches
type: list
The list of branch numbers covered by the task.
.. py:attribute:: workers
type: int
The number of workers to use in "law run" commands.
.. py:attribute:: auto_retry
type: bool
A flag denoting if the job-internal automatic retry mechanism should be used.
.. py:attribute:: dashboard_data
type: list
If a job dashboard is used, this is a list of configuration values as returned by
:py:meth:`law.job.dashboard.BaseJobDashboard.remote_hook_data`.
"""
def __init__(self, task_cls, task_params, branches, workers=1, auto_retry=False,
dashboard_data=None):
super(JobArguments, self).__init__()
self.task_cls = task_cls
self.task_params = task_params
self.branches = branches
self.workers = max(workers, 1)
self.auto_retry = auto_retry
self.dashboard_data = dashboard_data or []
@classmethod
def encode_bool(cls, b):
"""
Encodes a boolean *b* into a string (``"yes"`` or ``"no"``).
"""
return "yes" if b else "no"
@classmethod
def encode_string(cls, s):
"""
Encodes a string *s* via base64 encoding.
"""
encoded = base64.b64encode(six.b(s or "-"))
return encoded.decode("utf-8") if six.PY3 else encoded
@classmethod
def encode_list(cls, l):
"""
Encodes a list *l* into a string via base64 encoding.
"""
encoded = base64.b64encode(six.b(" ".join(str(v) for v in l) or "-"))
return encoded.decode("utf-8") if six.PY3 else encoded
def get_args(self):
"""
Returns the list of encoded job arguments. The order of this list corresponds to the
arguments expected by the job wrapper script.
"""
return [
self.task_cls.__module__,
self.task_cls.__name__,
self.encode_string(self.task_params),
self.encode_list(self.branches),
self.workers,
self.encode_bool(self.auto_retry),
self.encode_list(self.dashboard_data),
]
def join(self):
"""
Returns the list of job arguments from :py:meth:`get_args`, joined into a single string
using a single space character.
"""
return " ".join(str(arg) for arg in self.get_args())
class DeprecatedInputFiles(dict):
"""
Dictionary subclass that keeps track of input files for remote jobs and only exists to show a
deprecation warning to users still relying on lists. To do so, it emulates the most commonly used
list methods and internally stores input files via dict methods. To be removed in version 1.0.
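For illustration (the path is made up):
.. code-block:: python

    # deprecated list-style usage, still works but triggers a warning
    input_files = DeprecatedInputFiles(["/path/to/script.sh"])

    # preferred dict-style usage, with the key also serving as a render variable name
    input_files = DeprecatedInputFiles()
    input_files["script"] = "/path/to/script.sh"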
"""
@classmethod
def _log_warning(cls, method):
logger.warning_once(
"the use of input_files.{} is deprecated, please consider updating your code towards "
"using dictionaries instead, e.g., 'input_files[key] = path'; by doing so, law "
"automatically adds a render variable 'key' that will refer to the postfixed path of "
"the input file for immediate use in remote jobs".format(method),
)
def __init__(self, *args, **kwargs):
paths = None
if not kwargs and len(args) == 1 and isinstance(args[0], list):
paths = args[0]
args = ()
super(DeprecatedInputFiles, self).__init__(*args, **kwargs)
if paths:
self.extend(paths)
def _append(self, path):
# generate a key by taking the basename of the path and stripping the file extension(s)
path = str(path)
key = os.path.basename(path).split(".", 1)[0]
while key in self:
key += "_"
self[key] = path
def append(self, path):
# deprecation warning until v0.1
self._log_warning("append(path)")
self._append(path)
def extend(self, paths):
# deprecation warning until v0.1
self._log_warning("extend([path, ...])")
for path in paths:
self._append(path)
def __add__(self, paths):
# type-preserving shallow copy
self_ = self.__class__(self)
self_.extend(paths)
return self_
def __iadd__(self, paths):
self.extend(paths)
return self