Source code for law.util

# coding: utf-8

"""
Helpful utility functions.
"""

# names exported by ``from law.util import *``
__all__ = [
    "default_lock", "io_lock", "console_lock", "no_value", "rel_path", "law_src_path",
    "law_home_path", "law_run", "print_err", "abort", "import_file", "get_terminal_width",
    "is_classmethod", "is_number", "is_float", "try_int", "round_discrete", "str_to_int",
    "flag_to_bool", "empty_context", "common_task_params", "colored", "uncolored", "query_choice",
    "is_pattern", "brace_expand", "range_expand", "range_join", "multi_match", "is_iterable",
    "is_lazy_iterable", "make_list", "make_tuple", "make_set", "make_unique", "is_nested",
    "flatten", "merge_dicts", "unzip", "which", "map_verbose", "map_struct", "mask_struct",
    "tmp_file", "perf_counter", "interruptable_popen", "readable_popen", "create_hash",
    "create_random_string", "copy_no_perm", "makedirs", "user_owns_file", "iter_chunks",
    "human_bytes", "parse_bytes", "human_duration", "parse_duration", "is_file_exists_error",
    "send_mail", "DotDict", "ShorthandDict", "open_compat", "patch_object", "join_generators",
    "quote_cmd", "escape_markdown", "classproperty", "BaseStream", "TeeStream", "FilteredStream",
]


import os
import sys
import types
import re
import math
import fnmatch
import itertools
import functools
import tempfile
import subprocess
import signal
import hashlib
import uuid
import shutil
import copy
import collections
import contextlib
import smtplib
import time
import datetime
import random
import threading
import io
import shlex
import inspect
import logging

import six

try:
    import ipykernel
    import ipykernel.iostream
except ImportError:
    ipykernel = None

try:
    import google.colab  # noqa
    ON_COLAB = True
except ImportError:
    ON_COLAB = False


# module-level logger, named after this module
logger = logging.getLogger(__name__)

# some globally usable thread locks
# NOTE(review): the intended scope of each lock is only implied by its name (general purpose,
# I/O, console output) - confirm against the call sites
default_lock = threading.Lock()
io_lock = threading.Lock()
console_lock = threading.Lock()


class NoValue(object):
    """
    Singleton class whose only instance evaluates to *False*. Every instantiation returns the
    same object, so identity checks (``value is no_value``) are reliable.
    """

    # the one shared instance, created on first instantiation
    _instance = None

    def __new__(cls, *args, **kwargs):
        # arguments are accepted but deliberately not forwarded: object.__new__ raises a
        # TypeError for extra arguments when __new__ is overridden, which made the very first
        # instantiation with arguments fail while all later ones succeeded
        if cls._instance is None:
            cls._instance = super(NoValue, cls).__new__(cls)
        return cls._instance

    def __bool__(self):
        return False

    # python 2 name of the truth value hook
    __nonzero__ = __bool__

    def __repr__(self):
        return "{}.no_value".format(self.__module__)

    def __str__(self):
        return "no_value"


#: Unique dummy value that is used to denote missing values and always evaluates to *False*.
no_value = NoValue()


def rel_path(anchor, *paths):
    """
    Returns a path made of fragment *paths* relative to an *anchor* path. User and environment
    variables in *anchor* are expanded and the result is made absolute. When *anchor* is an
    existing file, its directory is used instead. The returned path is normalized.
    """
    base = os.path.expanduser(str(anchor))
    base = os.path.abspath(os.path.expandvars(base))

    # files are anchored at the directory that contains them
    if os.path.isfile(base):
        base = os.path.dirname(base)

    fragments = [str(path) for path in paths]
    return os.path.normpath(os.path.join(base, *fragments))
def law_src_path(*paths):
    """
    Returns the directory containing this module (the law installation directory), optionally
    joined with *paths*.
    """
    fragments = [str(path) for path in paths]
    return rel_path(__file__, *fragments)
def law_home_path(*paths):
    """
    Returns the law home directory, optionally joined with *paths*. The call is forwarded to
    ``law.config.law_home_path``, which is imported when this function is called.
    """
    from law.config import law_home_path as _config_law_home_path

    return _config_law_home_path(*paths)
def law_run(argv, **kwargs):
    """
    Runs a task with certain parameters as defined in *argv*, which can be a string or a list of
    strings. It must start with the family of the task to run, followed by the desired
    parameters. All *kwargs* are forwarded to :py:func:`luigi.interface.run`, whose return value
    is returned. Example:

    .. code-block:: python

        law_run(["MyTask", "--param", "value"])
        law_run("MyTask --param value")
    """
    from luigi.interface import run as luigi_run
    from luigi.cmdline_parser import CmdlineParser
    from law.parser import _reset as reset_parser

    # normalize argv into a new list of strings, the passed object is never modified
    if isinstance(argv, six.string_types):
        args = shlex.split(argv)
    else:
        args = [str(arg) for arg in argv]

    # luigi's pid locking must be disabled
    args.append("--no-lock")

    # wrap the parser factory to overwrite the prog default of the created ArgumentParser
    orig_build_parser = CmdlineParser._build_parser

    @functools.wraps(orig_build_parser)
    def patched_build_parser(*parser_args, **parser_kwargs):
        parser = orig_build_parser(*parser_args, **parser_kwargs)
        parser.prog = "law run"
        return parser

    result = False
    try:
        with patch_object(
            CmdlineParser,
            "_build_parser",
            staticmethod(patched_build_parser),
            orig=staticmethod(orig_build_parser),
        ):
            result = luigi_run(args, **kwargs)
    finally:
        # reset parser objects, also when the run failed
        reset_parser()

    return result
def abort(msg=None, exitcode=1, color=True):
    """
    Aborts the process (*sys.exit*) with an *exitcode*. If *msg* is not *None*, it is printed
    first: to stdout if *exitcode* is 0 or *None*, and via :py:func:`print_err` otherwise. When
    *color* is *True* and *exitcode* is not 0 or *None*, the message is colored red.
    """
    if msg is not None:
        succeeded = exitcode in (None, 0)
        if succeeded:
            print(msg)
        else:
            print_err(colored(msg, color="red") if color else msg)

    sys.exit(exitcode)
def import_file(path, attr=None):
    """
    Loads the content of a python file located at *path* and returns its package content as a
    dictionary. When *attr* is set, only the attribute with that name is returned and an
    *AttributeError* is raised when it does not exist.

    The file is not required to be importable as its content is executed directly in the
    interpreter. While this approach is not necessarily clean, it can be useful in places where
    custom code must be loaded.
    """
    path = os.path.expandvars(os.path.expanduser(str(path)))

    # NOTE(review): the file content is passed to exec() as is, so only trusted files must be
    # loaded this way
    pkg = DotDict()
    with open(path, "r") as f:
        source = f.read()
        exec(source, pkg)

    # return everything unless a particular attribute is requested
    if not attr:
        return pkg

    if attr not in pkg:
        raise AttributeError("no local member '{}' found in file {}".format(attr, path))
    return pkg[attr]
def get_terminal_width(fallback=False):
    """
    Returns the terminal width when possible, and *None* otherwise. By default, the width is
    obtained through ``os.get_terminal_size``, which might fail with an *OSError* in case no
    valid output device is connected. However, when *fallback* is *True*,
    ``shutil.get_terminal_size`` is used instead, which prioritizes the *COLUMNS* variable if set.
    *None* is also returned when the selected function does not exist.
    """
    module = shutil if fallback else os
    get_size = getattr(module, "get_terminal_size", None)
    if not callable(get_size):
        return None

    try:
        return get_size().columns
    except OSError:
        return None
def is_classmethod(func, cls=None):
    """
    Returns *True* if *func* is a classmethod of *cls*, and *False* otherwise. When *cls* is
    *None*, it is extracted from the function's qualified name and module name. Classmethods
    inherited from base classes of *cls* are detected as well. An *AttributeError* is raised when
    a class is known but *func* has no ``__name__``.
    """
    def _hasattr(attr):
        return getattr(func, attr, None) is not None

    # when no cls is given, try to look it up in its associated module
    if cls is None:
        if _hasattr("__qualname__") and _hasattr("__module__") and "." in func.__qualname__:
            cls_name = func.__qualname__.rsplit(".", 1)[0]
            cls = getattr(sys.modules.get(func.__module__), cls_name, None)

    # when no class exists at this point, func cannot be a classmethod
    if cls is None:
        return False

    # func requires a __name__
    if not _hasattr("__name__"):
        raise AttributeError("func '{}' has not attribute __name__".format(func))

    # func must be the class attribute with that name
    if getattr(cls, func.__name__, None) != func:
        return False

    # finally, find the attribute in the __dict__ of cls or its super classes and check the type
    try:
        for _cls in inspect.getmro(cls):
            if func.__name__ not in _cls.__dict__:
                continue
            # inspect the class that actually defines the attribute; looking it up in
            # cls.__dict__ raised a KeyError for classmethods inherited from a base class
            return _cls.__dict__[func.__name__].__class__.__name__ == "classmethod"
    except AttributeError:
        return False

    return False
def is_number(n):
    """
    Returns *True* if *n* is a number, i.e., integer or float, and in particular no boolean.
    """
    # booleans are integers in python and must be excluded explicitly
    if isinstance(n, bool):
        return False

    number_types = six.integer_types + (float,)
    return isinstance(n, number_types)
def is_float(v):
    """
    Takes any value *v* and tries to convert it to a float. Returns *True* on success, and *False*
    when the conversion raises an exception.
    """
    try:
        float(v)
    except Exception:
        # only regular exceptions denote a failed conversion, KeyboardInterrupt and SystemExit
        # must not be swallowed
        return False
    return True
def try_int(n):
    """
    Takes a number *n* and tries to convert it to an integer. When *n* has no decimals, an integer
    is returned with the same value as *n*. Otherwise, *n* is returned unchanged. This includes
    non-finite floats (infinity and NaN) which have no integer representation.
    """
    # int() raises an OverflowError for infinity and a ValueError for NaN
    if isinstance(n, float) and (math.isinf(n) or math.isnan(n)):
        return n

    n_int = int(n)
    return n_int if n == n_int else n
def round_discrete(n, base=1.0, round_fn=round):
    """ round_discrete(n, base=1.0, round_fn="round")
    Rounds a number *n* to a discrete *base*. *round_fn* can be a function used for rounding and
    defaults to the built-in ``round`` function. It also accepts string values ``"round"``,
    ``"floor"`` and ``"ceil"`` which are resolved to the corresponding math functions. A
    *ValueError* is raised for any other string. Example:

    .. code-block:: python

        round_discrete(17, 5)
        # -> 15

        round_discrete(17, 2.5)
        # -> 17.5

        round_discrete(17, 2.5, math.floor)
        round_discrete(17, 2.5, "floor")
        # -> 15.0
    """
    if isinstance(round_fn, six.string_types):
        # resolve named rounding functions
        named_round_fns = {"round": round, "floor": math.floor, "ceil": math.ceil}
        if round_fn not in named_round_fns:
            raise ValueError("unknown round function '{}'".format(round_fn))
        round_fn = named_round_fns[round_fn]

    n_steps = round_fn(float(n) / base)
    return base * n_steps
def str_to_int(s):
    """
    Converts a string *s* into an integer under consideration of binary (``"0b"``), octal
    (``"0o"``), decimal (``"0d"``) and hexadecimal (``"0x"``) prefixes, such as ``"0o0660"``.
    Strings without such a prefix are interpreted as decimal numbers. The matching is
    case-insensitive. A *ValueError* is raised when the digits are invalid for the base.
    """
    s = str(s).lower()

    # accept hexadecimal digits as well, their validity for the actual base is checked by int()
    m = re.match(r"^0(b|o|d|x)([0-9a-f]+)$", s)
    if not m:
        return int(s, base=10)

    # convert the digits without the prefix since int() does not know the "0d" prefix
    base = {"b": 2, "o": 8, "d": 10, "x": 16}[m.group(1)]
    return int(m.group(2), base=base)
def flag_to_bool(s, silent=False):
    """
    Takes a string flag *s* and returns whether it evaluates to *True* (values ``"1"``,
    ``"true"``, ``"yes"``, ``"y"``, ``"on"``, case-insensitive) or *False* (values ``"0"``,
    ``"false"``, ``"no"``, ``"n"``, ``"off"``, case-insensitive). When *s* is already a boolean,
    it is returned unchanged. A *ValueError* is raised when *s* is neither of the allowed values
    and *silent* is *False*. Otherwise, *None* is returned.
    """
    if isinstance(s, bool):
        return s

    if isinstance(s, six.string_types):
        flag = s.lower()
        if flag in ("true", "1", "yes", "y", "on"):
            return True
        if flag in ("false", "0", "no", "n", "off"):
            return False

    # at this point, the value could not be interpreted
    if not silent:
        raise ValueError("cannot convert to bool: {}".format(s))
    return None
@contextlib.contextmanager
def empty_context(obj=None):
    """
    Context manager that does nothing on enter and exit and binds *obj* to the ``as`` target. It
    can be used in case of dynamically choosing context managers while maintaining code structure.
    """
    yield obj
def common_task_params(task_instance, task_cls):
    """
    Returns the parameters that are common between a *task_instance* and a *task_cls* in a
    dictionary with values taken directly from the task instance. Both objects must provide a
    ``get_params()`` method returning 2-tuples whose first element is the parameter name. The
    difference with respect to ``luigi.util.common_params`` is that the values are not parsed
    using the parameter objects of the task class, which might be faster for some purposes.
    """
    # a set keeps the membership test below constant in time per parameter
    task_cls_param_names = {name for name, _ in task_cls.get_params()}

    return {
        name: getattr(task_instance, name)
        for name, _ in task_instance.get_params()
        if name in task_cls_param_names
    }
# codes of foreground colors as inserted into the escape sequence built by colored()
colors = {
    "default": 39,
    "black": 30,
    "red": 31,
    "green": 32,
    "yellow": 33,
    "blue": 34,
    "magenta": 35,
    "cyan": 36,
    "light_gray": 37,
    "dark_gray": 90,
    "light_red": 91,
    "light_green": 92,
    "light_yellow": 93,
    "light_blue": 94,
    "light_magenta": 95,
    "light_cyan": 96,
    "white": 97,
}

# codes of background colors, using the same names as the foreground colors
backgrounds = {
    "default": 49,
    "black": 40,
    "red": 41,
    "green": 42,
    "yellow": 43,
    "blue": 44,
    "magenta": 45,
    "cyan": 46,
    "light_gray": 47,
    "dark_gray": 100,
    "light_red": 101,
    "light_green": 102,
    "light_yellow": 103,
    "light_blue": 104,
    "light_magenta": 105,
    "light_cyan": 106,
    "white": 107,
}

# codes of text styles
styles = {
    "default": 0,
    "bright": 1,
    "dim": 2,
    "underlined": 4,
    "blink": 5,
    "inverted": 7,
    "hidden": 8,
}

# expression matching escape sequences that start with "ESC [", used by uncolored() to strip them
uncolor_cre = re.compile(r"(\x1B\[[0-?]*[ -/]*[@-~])")
def colored(msg, color=None, background=None, style=None, force=False):
    """
    Return the colored version of a string *msg*. For *color*, *background* and *style* options,
    see https://misc.flogisoft.com/bash/tip_colors_and_formatting. They can also be explicitly
    set to ``"random"`` to get a random value. Unknown values fall back to the respective
    ``"default"`` entry. *style* can also be a tuple, list or set of multiple styles. Unless
    *force* is *True*, the *msg* string is returned unchanged in case the output is neither a tty
    nor an IPython output stream.
    """
    if not force:
        tty = False
        ipy = False

        # the stream might be replaced by an object without a usable file descriptor, in which
        # case it is not a tty; interrupts and exit requests are intentionally not swallowed
        try:
            tty = os.isatty(sys.stdout.fileno())
        except Exception:
            pass

        if not tty and ipykernel is not None:
            ipy = isinstance(sys.stdout, ipykernel.iostream.OutStream)

        if not tty and not ipy:
            return msg

    # resolve the foreground color code
    if color == "random":
        color = random.choice(list(colors.values()))
    else:
        color = colors.get(color, colors["default"])

    # resolve the background color code
    if background == "random":
        background = random.choice(list(backgrounds.values()))
    else:
        background = backgrounds.get(background, backgrounds["default"])

    # resolve one or multiple style codes, joined by semicolons
    if not isinstance(style, (tuple, list, set)):
        style = (style,)
    style_values = list(styles.values())
    style = ";".join(
        str(random.choice(style_values) if s == "random" else styles.get(s, styles["default"]))
        for s in style
    )

    return "\033[{};{};{}m{}\033[0m".format(style, background, color, msg)
def uncolored(s):
    """
    Returns a copy of the string *s* with all escape sequences matched by ``uncolor_cre``, such as
    color codes, removed.
    """
    return re.sub(uncolor_cre, "", s)
def query_choice(msg, choices, default=None, descriptions=None, lower=True):
    """
    Interactively query a choice from the prompt until the input matches one of the *choices*,
    which are converted to strings. The prompt can be configured using *msg* and *descriptions*,
    which, if set, must have the same length as *choices*. When *default* is not *None* it must be
    one of the choices and is used when the input is empty. When *lower* is *True*, the input is
    compared to the choices in lower case and the returned choice is lower case as well.
    """
    choices = [str(c) for c in choices]
    valid_inputs = [c.lower() for c in choices] if lower else choices

    if default is not None and default not in choices:
        raise Exception("default must be one of the choices")

    # build the hints shown in the prompt, the default is marked with an asterisk
    hints = [(c + "*" if c == default else c) for c in choices]
    if descriptions is not None:
        if len(descriptions) != len(choices):
            raise ValueError("length of descriptions must match length of choices")
        hints = ["{}({})".format(hint, desc) for hint, desc in zip(hints, descriptions)]
    msg += " [{}] ".format(", ".join(hints))

    # query until the input is valid
    while True:
        choice = six.moves.input(msg)
        if default is not None and choice == "":
            choice = default
        if lower:
            choice = choice.lower()
        if choice in valid_inputs:
            return choice
        print("invalid choice: '{}'".format(choice))
def is_pattern(s):
    """
    Returns *True* if the string *s* represents a pattern, i.e., if it contains at least one of
    the characters ``"*"`` or ``"?"``.
    """
    return any(char in s for char in ("*", "?"))
def brace_expand(s, split_csv=False, escape_csv_sep=True):
    """
    Expands brace statements in a string *s* and returns a list containing all possible string
    combinations. Escaped braces (``"\\{"``, ``"\\}"``) are not expanded. When *split_csv* is
    *True*, the input string is split by all comma characters located outside braces, except for
    escaped ones when *escape_csv_sep* is *True*, and the expansion is performed sequentially on
    all elements. Example:

    .. code-block:: python

        brace_expand("A{1,2}B")
        # -> ["A1B", "A2B"]

        brace_expand("A{1,2}B{3,4}C")
        # -> ["A1B3C", "A1B4C", "A2B3C", "A2B4C"]

        brace_expand("A{1,2}B,C{3,4}D")
        # note the full 2x2 expansion
        # -> ["A1B,C3D", "A1B,C4D", "A2B,C3D", "A2B,C4D"]

        brace_expand("A{1,2}B,C{3,4}D", split_csv=True)
        # note the 2+2 sequential expansion
        # -> ["A1B", "A2B", "C3D", "C4D"]

        brace_expand("A{1,2}B,C{3}D", split_csv=True)
        # note the 2+1 sequential expansion
        # -> ["A1B", "A2B", "C3D"]
    """
    # placeholders that protect escaped braces during the expansion
    open_token = "__law_brace_open__"
    close_token = "__law_brace_close__"
    s = s.replace(r"\{", open_token).replace(r"\}", close_token)

    # expression that finds brace statements
    brace_cre = re.compile(r"\{[^\{]*\}")

    if split_csv:
        # hide commas inside brace statements so that they do not cause splitting
        sep_token = "__law_brace_csv_sep__"
        protected = brace_cre.sub(lambda m: m.group(0).replace(",", sep_token), s)

        # hide escaped commas as well when requested
        if escape_csv_sep:
            escaped_token = "__law_escaped_csv_sep__"
            protected = protected.replace(r"\,", escaped_token)

        # split by the remaining, real separators and add back the escaped commas
        parts = protected.split(",")
        if escape_csv_sep:
            parts = [part.replace(escaped_token, ",") for part in parts]

        # expand the parts one after another when a separator was found, otherwise continue
        if len(parts) > 1:
            expanded = []
            for part in parts:
                expanded.extend(brace_expand(part.replace(sep_token, ","), split_csv=False))
            return expanded

    # split the string into n sequences with values to expand and n+1 fixed entities
    sequences = brace_cre.findall(s)
    entities = brace_cre.split(s)
    if len(sequences) + 1 != len(entities):
        raise ValueError("the number of sequences ({}) and the number of fixed entities ({}) are "
            "not compatible".format(",".join(sequences), ",".join(entities)))

    # the values of each sequence, without the enclosing braces
    value_lists = [seq[1:-1].split(",") for seq in sequences]

    # template with one placeholder per sequence
    tmpl = "{}".join(entities)

    # build all combinations and insert escaped braces again
    return [
        tmpl.format(*values).replace(open_token, r"\{").replace(close_token, r"\}")
        for values in itertools.product(*value_lists)
    ]
def range_expand(s, include_end=False, min_value=None, max_value=None, sep=":"):
    """
    Takes a string, or a sequence of strings in the format ``"1:3"`` (with *sep* being the
    separator between start and stop), or a tuple or a sequence of tuples containing start and
    stop values of a range and returns a list of all intermediate values. When *include_end* is
    *True*, the end value is included. Duplicate values are removed.

    One sided range expressions such as ``":4"`` or ``"4:"`` for strings and ``(None, 4)`` or
    ``(4, None)`` for tuples are also expanded but they require *min_value* and *max_value* to be
    set (an exception is raised otherwise), with *max_value* being either included or not,
    depending on *include_end*.

    Also, when a *min_value* (*max_value*) is set, the minimum (maximum) of expanded range is
    limited at this value. Example:

    .. code-block:: python

        range_expand("5:8")
        # -> [5, 6, 7]

        range_expand((6, 9))
        # -> [6, 7, 8]

        range_expand("5:8", include_end=True)
        # -> [5, 6, 7, 8]

        range_expand(["5:8", "10"])
        # -> [5, 6, 7, 10]

        range_expand(["5:8", "10:"])
        # -> Exception, no max_value set

        range_expand(["5:8", "10:"], max_value=12)
        # -> [5, 6, 7, 10, 11]

        range_expand(["5:8", "10:"], max_value=12, include_end=True)
        # -> [5, 6, 7, 8, 10, 11, 12]
    """
    def to_int(v, s=None):
        try:
            return int(v)
        except ValueError:
            raise ValueError("invalid number or range '{}'".format(v if s is None else s))

    # a bare tuple denotes a single range rather than a sequence of ranges
    items = [s] if isinstance(s, tuple) else s

    end_offset = int(bool(include_end))

    numbers = []
    for item in make_list(items):
        if isinstance(item, (tuple, list)):
            # 1-tuples are single values, 2-tuples are ranges
            if len(item) == 1:
                numbers.append(to_int(item[0]))
                continue
            if len(item) != 2:
                raise ValueError("invalid range tuple length: {}".format(item))
            start, stop = item
        else:
            # strings without a separator are single values
            item = str(item)
            if sep not in item:
                numbers.append(to_int(item))
                continue
            first, second = item.split(sep, 1)
            start = first or None
            stop = second or None

        # open ranges are closed with the configured limits
        if start is None:
            if min_value is None:
                raise Exception("range '{}' with missing start value requires min_value to be "
                    "set".format(item))
            start = min_value
        if stop is None:
            if max_value is None:
                raise Exception("range '{}' with missing stop value requires max_value to be "
                    "set".format(item))
            stop = max_value

        # convert to integers and bring them in ascending order
        start = to_int(start)
        stop = to_int(stop)
        if start > stop:
            start, stop = stop, start

        numbers.extend(range(start, stop + end_offset))

    # remove duplicates preserving the order
    numbers = make_unique(numbers)

    # apply limits
    if min_value is not None:
        numbers = [num for num in numbers if num >= min_value]
    if max_value is not None:
        upper = (max_value + 1) if include_end else max_value
        numbers = [num for num in numbers if num < upper]

    return numbers
def range_join(numbers, to_str=False, include_end=False, sep=",", range_sep=":"):
    """
    Takes a sequence of positive integer numbers given either as integer or string types, and
    returns a sequence of 1- and 2-tuples, denoting either single numbers or start and end values
    of possible ranges. The numbers are made unique and sorted first. Unless *include_end* is
    *True*, end values are not included. When *to_str* is *True*, a string is returned in a format
    consistent to :py:func:`range_expand` with ranges constructed by *range_sep* and merged with
    *sep*. Example:

    .. code-block:: python

        range_join([1, 2, 3, 5])
        # -> [(1, 4), (5,)]

        range_join([1, 2, 3, 5], include_end=True)
        # -> [(1, 3), (5,)]

        range_join([1, 2, 3, 5, 7, 8, 9])
        # -> [(1, 4), (5,), (7, 10)]

        range_join([1, 2, 3, 5, 7, 8, 9], to_str=True)
        # -> "1:4,5,7:10"
    """
    if not numbers:
        return "" if to_str else []

    # check types, convert strings and collect unique values
    unique_numbers = set()
    for n in numbers:
        if isinstance(n, six.string_types):
            try:
                n = int(n)
            except ValueError:
                raise ValueError("invalid number format '{}'".format(n))
        if not isinstance(n, six.integer_types):
            raise TypeError("cannot handle non-integer value '{}' in numbers to join".format(n))
        unique_numbers.add(n)
    ordered = sorted(unique_numbers)

    # exclusive end values are one larger than the last number of a range
    end_offset = 0 if include_end else 1

    def build(start, stop):
        return (start,) if start == stop else (start, stop + end_offset)

    # walk through the numbers and close the current range whenever a gap is found
    ranges = []
    start = stop = ordered[0]
    for n in ordered[1:]:
        if n == stop + 1:
            stop = n
            continue
        ranges.append(build(start, stop))
        start = stop = n
    ranges.append(build(start, stop))

    if not to_str:
        return ranges

    # convert to string representation
    return sep.join(
        (str(r[0]) if len(r) == 1 else "{}{}{}".format(r[0], range_sep, r[1]))
        for r in ranges
    )
def multi_match(name, patterns, mode=any, regex=False):
    """
    Compares *name* to multiple *patterns* and returns *True* in case of at least one match
    (*mode* = *any*, the default), or in case all patterns match (*mode* = *all*). Otherwise,
    *False* is returned. When *regex* is *True*, *re.match* is used instead of *fnmatch.fnmatch*.
    """
    patterns = make_list(patterns)

    # matches are evaluated lazily so that mode can stop at the first decisive pattern
    if regex:
        matches = (re.match(pattern, name) for pattern in patterns)
    else:
        matches = (fnmatch.fnmatch(name, pattern) for pattern in patterns)

    return mode(matches)
def is_iterable(obj):
    """
    Returns *True* when ``iter(obj)`` succeeds for an object *obj*, and *False* otherwise.
    """
    try:
        iter(obj)
    except Exception:
        return False
    else:
        return True
# types that is_lazy_iterable() accepts as lazily evaluated iterables
lazy_iter_types = (
    types.GeneratorType,
    six.moves.collections_abc.MappingView,
    six.moves.range,
    six.moves.map,
    enumerate,
)
def is_lazy_iterable(obj):
    """
    Returns *True* when *obj* is an instance of one of the types in ``lazy_iter_types``, i.e., an
    object that is iterable lazily such as generators, range objects, maps, etc.
    """
    is_lazy = isinstance(obj, lazy_iter_types)
    return is_lazy
def make_list(obj, cast=True):
    """
    Converts an object *obj* to a list and returns it. Lists are copied and lazy iterables are
    consumed into a new list. Objects of types *tuple* and *set* are converted if *cast* is
    *True*. Otherwise, and for all other types, *obj* is put in a new list.
    """
    convertible = (
        isinstance(obj, list) or
        is_lazy_iterable(obj) or
        (isinstance(obj, (tuple, set)) and cast)
    )
    return list(obj) if convertible else [obj]
def make_tuple(obj, cast=True):
    """
    Converts an object *obj* to a tuple and returns it. Tuples are returned as is and lazy
    iterables are consumed into a new tuple. Objects of types *list* and *set* are converted if
    *cast* is *True*. Otherwise, and for all other types, *obj* is put in a new tuple.
    """
    if isinstance(obj, tuple):
        return obj

    convertible = is_lazy_iterable(obj) or (isinstance(obj, (list, set)) and cast)
    return tuple(obj) if convertible else (obj,)
def make_set(obj, cast=True):
    """
    Converts an object *obj* to a set and returns it. Sets are returned as is and lazy iterables
    are consumed into a new set. Objects of types *list* and *tuple* are converted if *cast* is
    *True*. Otherwise, and for all other types, *obj* is put in a new set.
    """
    if isinstance(obj, set):
        return obj

    convertible = is_lazy_iterable(obj) or (isinstance(obj, (list, tuple)) and cast)
    return set(obj) if convertible else {obj}
def make_unique(obj):
    """
    Takes a list or tuple *obj*, removes duplicate elements in order of their appearance and
    returns the sequence of remaining, unique elements. The sequence type is preserved for
    tuples, and a list is returned in all other cases, i.e., also when *obj* is neither a list nor
    a tuple, but iterable. Otherwise, a *TypeError* is raised. All elements must be hashable.
    """
    if not isinstance(obj, (list, tuple)):
        if not is_iterable(obj) and not is_lazy_iterable(obj):
            raise TypeError("object is neither list, tuple, nor generic iterable")
        obj = list(obj)

    # single pass that keeps the first occurrence of each element, which avoids the quadratic
    # cost of looking up the index of every unique element
    seen = set()
    ret = []
    for elem in obj:
        if elem not in seen:
            seen.add(elem)
            ret.append(elem)

    return obj.__class__(ret) if isinstance(obj, tuple) else ret
def is_nested(obj):
    """
    Returns *True* when *obj* is a list or tuple that only contains items of types list and tuple,
    and *False* otherwise. Empty lists and tuples are considered nested.
    """
    seq_types = (list, tuple)
    if not isinstance(obj, seq_types):
        return False

    for item in obj:
        if not isinstance(item, seq_types):
            return False
    return True
def flatten(*structs, **kwargs):
    """ flatten(*structs, flatten_dict=True, flatten_list=True, flatten_tuple=True, flatten_set=True)
    Takes one or multiple complex structured objects *structs*, flattens them, and returns a
    single list. *flatten_dict*, *flatten_list*, *flatten_tuple* and *flatten_set* configure if
    objects of the respective types are flattened (the default). If not, they are returned
    unchanged. For dictionaries, only the values are considered. Lazy iterables are always
    flattened.
    """
    if len(structs) == 0:
        return []

    if len(structs) > 1:
        # flatten every struct on its own; passing the tuple of positional arguments on as a
        # single struct would leave it unflattened when flatten_tuple is False
        return sum((flatten(struct, **kwargs) for struct in structs), [])

    struct = structs[0]

    def flatten_seq(seq):
        return sum((flatten(obj, **kwargs) for obj in seq), [])

    if isinstance(struct, dict):
        if kwargs.get("flatten_dict", True):
            return flatten_seq(struct.values())
    elif isinstance(struct, list):
        if kwargs.get("flatten_list", True):
            return flatten_seq(struct)
    elif isinstance(struct, tuple):
        if kwargs.get("flatten_tuple", True):
            return flatten_seq(struct)
    elif isinstance(struct, set):
        if kwargs.get("flatten_set", True):
            return flatten_seq(struct)
    elif is_lazy_iterable(struct):
        return flatten_seq(struct)

    # everything else, including containers that should not be flattened, is a single item
    return [struct]
def _copy_dict_tree(d):
    """
    Returns a shallow copy of the dict *d* in which nested dicts are recursively copied as well.
    All other values are shared with *d*.
    """
    new = copy.copy(d)
    for key, value in list(new.items()):
        if isinstance(value, dict):
            new[key] = _copy_dict_tree(value)
    return new


def _deep_update(target, source):
    """
    Recursively merges the dict *source* into the dict *target* inplace. Nested dicts of *source*
    are copied before they are stored so that *source* is never modified by later updates.
    """
    for key, value in source.items():
        if not isinstance(value, dict):
            target[key] = value
        elif isinstance(target.get(key), dict):
            # both sides are dicts, merge by recursion
            _deep_update(target[key], value)
        else:
            # the field is either not existing yet or not a dict in the target
            target[key] = _copy_dict_tree(value)


def merge_dicts(*dicts, **kwargs):
    """ merge_dicts(*dicts, inplace=False, cls=None, deep=False)
    Takes multiple *dicts* and returns a single merged dict. The merging takes place in order of
    the passed dicts and therefore, values of rear objects have precedence in case of field
    collisions. Objects that are not dictionaries are skipped.

    By default, a new dictionary is returned. However, when *inplace* is *True*, all update
    operations are performed inplace on the first object in *dicts*. When not inplace, the class
    of the returned merged dict is configurable via *cls*. If it is *None*, the class is inferred
    from the first dict object in *dicts*, and a *TypeError* is raised if there is none. A
    *ValueError* is raised when no *dicts* are passed.

    When *deep* is *True*, dictionary types within the dictionaries to merge are updated
    recursively such that their fields are merged. This is only possible when input dictionaries
    have a similar structure. Nested dictionaries are copied in this mode so that none of the
    passed objects is modified, except for the first one when *inplace* is *True*. Example:

    .. code-block:: python

        merge_dicts({"foo": 1, "bar": {"a": 1, "b": 2}}, {"bar": {"c": 3}})
        # -> {"foo": 1, "bar": {"c": 3}}  # fully replaced "bar"

        merge_dicts({"foo": 1, "bar": {"a": 1, "b": 2}}, {"bar": {"c": 3}}, deep=True)
        # -> {"foo": 1, "bar": {"a": 1, "b": 2, "c": 3}}  # inserted entry bar.c

        merge_dicts({"foo": 1, "bar": {"a": 1, "b": 2}}, {"bar": 2}, deep=True)
        # -> {"foo": 1, "bar": 2}  # "bar" has a different type, so this just uses the rear value
    """
    if not dicts:
        raise ValueError("cannot merge empty sequence of dictionaries")

    inplace = kwargs.get("inplace", False)
    if inplace:
        merged_dict = dicts[0]
    else:
        # get or infer the class
        cls = kwargs.get("cls", None)
        if cls is None:
            for d in dicts:
                if isinstance(d, dict):
                    cls = d.__class__
                    break
            else:
                raise TypeError("cannot infer cls as none of the passed objects is of type dict")

        # create a new instance
        merged_dict = cls()

    # start merging, the first object is the merge target itself when inplace
    deep = kwargs.get("deep", False)
    for d in dicts[(1 if inplace else 0):]:
        if not isinstance(d, dict):
            continue

        if deep:
            _deep_update(merged_dict, d)
        else:
            merged_dict.update(d)

    return merged_dict
def unzip(struct, fill_none=False):
    """
    Unzips a *struct* consisting of sequences with equal lengths and returns a tuple of lists with
    1st, 2nd, etc elements. This function can be thought of as the opposite of the ``zip``
    builtin. The number of returned lists is determined by the length of the first sequence in
    *struct*. In case a sequence does contain fewer items, a *ValueError* is raised. However, if
    *fill_none* is *True*, *None* is inserted instead. Surplus items of longer sequences are
    ignored. *None* is returned when *struct* is empty.

    .. code-block:: python

        unzip([(1, 2), (3, 4)])
        # -> ([1, 3], [2, 4])

        unzip([(1, 2), (3,)])
        # -> ValueError

        unzip([(1, 2), (3,)], fill_none=True)
        # -> ([1, 3], [2, None])
    """
    lists = None

    for i, obj in enumerate(struct):
        # determine the number of lists to return
        if lists is None:
            lists = tuple([] for _ in range(len(obj)))

        # fill them
        for j, l in enumerate(lists):
            if len(obj) > j:
                l.append(obj[j])
            elif fill_none:
                l.append(None)
            else:
                # report the actual length and position of the sequence that is too short
                raise ValueError(
                    "insufficient length {} of sequence at index {} to unzip".format(len(obj), i),
                )

    return lists
def which(prog):
    """
    Pythonic ``which`` implementation. Returns the path to an executable *prog* by searching in
    *PATH*, or *None* when it could not be found. When *prog* contains a directory component, it
    is interpreted as a path and returned unchanged if it refers to an executable file.
    """
    def is_executable(path):
        return os.path.isfile(path) and os.access(path, os.X_OK)

    prog_str = str(prog)

    # prog can also be a path, in which case PATH is not searched
    if os.path.split(prog_str)[0]:
        return prog if is_executable(prog_str) else None

    if "PATH" not in os.environ:
        return None

    for search_path in os.environ["PATH"].split(os.pathsep):
        # search paths might be quoted
        candidate = os.path.join(search_path.strip('"'), prog)
        if is_executable(candidate):
            return candidate

    return None
def map_verbose(func, seq, msg="{}", every=25, start=True, end=True, offset=0, callback=None):
    """
    Same as the built-in map function, except that a list is returned and a *msg* is printed after
    chunks of size *every* iterations. When *start* (*end*) is *True*, the *msg* is also printed
    after the first (last) iteration. Note that *msg* is supposed to be a template string that
    will be formatted with the current iteration number (starting at 0) plus *offset* using
    ``str.format``. When *callback* is callable, it is invoked instead of the default print method
    with the current iteration number (without *offset*) as the only argument. Example:

    .. code-block:: python

        func = lambda x: x ** 2
        msg = "computing square of {}"
        squares = map_verbose(func, range(7), msg, every=3)
        # ->
        # computing square of 0
        # computing square of 2
        # computing square of 5
        # computing square of 6
    """
    # default callable
    if not callable(callback):
        def callback(i):
            print(msg.format(i + offset))

    results = []
    index = None
    reported = False
    for index, obj in enumerate(seq):
        results.append(func(obj))

        reported = (start and index == 0) or (index + 1) % every == 0
        if reported:
            callback(index)

    # report the last iteration unless that already happened
    if end and results and not reported:
        callback(index)

    return results
def map_struct(func, struct, map_dict=True, map_list=True, map_tuple=False, map_set=False, cls=None, custom_mappings=None):
    """
    Applies a function *func* to each value of a complex structured object *struct* and returns
    the output in the same structure. Lazy iterables are interpreted as lists. Example:

    .. code-block:: python

        struct = {"foo": [123, 456], "bar": [{"1": 1}, {"2": 2}]}
        def times_two(i):
            return i * 2

        map_struct(times_two, struct)
        # -> {"foo": [246, 912], "bar": [{"1": 2}, {"2": 4}]}

    *map_dict*, *map_list*, *map_tuple* and *map_set* configure if objects of the respective types
    are traversed or mapped as a whole. They can be booleans or integer values defining the depth
    of that setting in the struct. When *cls* is not *None*, it exclusively defines the class of
    objects that *func* is applied on. All other objects are unchanged. *custom_mappings* can be a
    dictionary that maps custom types to custom object traversal methods. The following example
    would traverse lists backwards:

    .. code-block:: python

        def traverse_lists(func, l, **kwargs):
            return [map_struct(func, v, **kwargs) for v in l[::-1]]

        map_struct(times_two, struct, custom_mappings={list: traverse_lists})
        # -> {"foo": [912, 246], "bar": [{"2": 4}, {"1": 2}]}
    """
    # interpret generators and views as lists
    if is_lazy_iterable(struct):
        struct = list(struct)

    # determine valid types for struct traversal, numeric settings denote the remaining depth and
    # are decremented before they are passed on to nested objects
    valid_types = tuple()
    if map_dict:
        valid_types += (dict,)
        if is_number(map_dict):
            map_dict -= 1
    if map_list:
        valid_types += (list,)
        if is_number(map_list):
            map_list -= 1
    if map_tuple:
        valid_types += (tuple,)
        if is_number(map_tuple):
            map_tuple -= 1
    if map_set:
        valid_types += (set,)
        if is_number(map_set):
            map_set -= 1

    # is an explicit cls set?
    # NOTE(review): this returns before any traversal, so with cls set, containers that are not
    # instances of cls are returned as is and func is never applied to their items - confirm
    # that this is intended given that cls is forwarded to the recursive calls below
    if cls is not None:
        return func(struct) if isinstance(struct, cls) else struct

    # custom mapping?
    if custom_mappings and isinstance(struct, tuple(flatten(custom_mappings.keys()))):
        # get the mapping function, the first matching entry wins
        for mapping_types, mapping_func in six.iteritems(custom_mappings):
            if isinstance(struct, mapping_types):
                return mapping_func(func, struct, map_dict=map_dict, map_list=map_list, map_tuple=map_tuple, map_set=map_set, cls=cls, custom_mappings=custom_mappings)

        # this point should never be reached
        return struct

    # traverse?
    if isinstance(struct, valid_types):
        # create a new struct, treat tuples as lists for iterative item appending
        new_struct = struct.__class__() if not isinstance(struct, tuple) else []

        # create type-dependent generator and addition callback
        if isinstance(struct, (list, tuple)):
            gen = enumerate(struct)
            add = lambda _, value: new_struct.append(value)
        elif isinstance(struct, set):
            gen = enumerate(struct)
            add = lambda _, value: new_struct.add(value)
        else:  # dict
            gen = six.iteritems(struct)
            add = lambda key, value: new_struct.__setitem__(key, value)

        # recursively fill the new struct
        for key, value in gen:
            value = map_struct(func, value, map_dict=map_dict, map_list=map_list, map_tuple=map_tuple, map_set=map_set, cls=cls, custom_mappings=custom_mappings)
            add(key, value)

        # convert tuples
        if isinstance(struct, tuple):
            new_struct = struct.__class__(new_struct)

        return new_struct

    # apply the mapping function on everything else
    return func(struct)
def mask_struct(mask, struct, replace=no_value, keep_missing=True, convert_types=None):
    """
    Masks a complex structured object *struct* with a *mask* and returns the remaining values. When
    *replace* is set, masked values are replaced with that value instead of being removed. The
    *mask* can have a complex structure as well.

    In case an item in *struct* is not matched by a value in *mask*, the item is kept when
    *keep_missing* is *True* (the default). When *keep_missing* is *False*, unmatched items are
    removed.

    *convert_types* can be a dictionary containing conversion functions mapped to types (or tuples
    thereof) that are applied to objects during the struct traversal if their types match.

    A *TypeError* is raised when *mask* and *struct* have incompatible structures. Examples:

    .. code-block:: python

        struct = {"a": [1, 2], "b": [3, ["foo", "bar"]]}

        # simple example
        mask_struct({"a": [False, True], "b": False}, struct)
        # => {"a": [2]}

        # omitting mask information results in kept values
        mask_struct({"a": [False, True]}, struct)
        # => {"a": [2], "b": [3, ["foo", "bar"]]}
    """
    # interpret lazy iterables as lists
    if is_lazy_iterable(struct):
        struct = list(struct)

    # cast convert types
    if convert_types and isinstance(struct, tuple(flatten(convert_types.keys()))):
        # apply the first matching conversion function
        for _types, convert in six.iteritems(convert_types):
            if isinstance(struct, _types):
                struct = convert(struct)
                break

    # when mask is a bool, or struct is not a dict or sequence, apply the mask immediately
    if isinstance(mask, bool) or not isinstance(struct, (list, tuple, dict)):
        return struct if mask else replace

    # check list and tuple types
    if isinstance(struct, (list, tuple)) and isinstance(mask, (list, tuple)):
        new_struct = []
        for i, val in enumerate(struct):
            if i >= len(mask):
                if keep_missing:
                    new_struct.append(val)
            else:
                # use the position-specific replacement if available
                repl = replace
                if isinstance(replace, (list, tuple)) and len(replace) > i:
                    repl = replace[i]
                val = mask_struct(mask[i], val, replace=repl, keep_missing=keep_missing,
                    convert_types=convert_types)
                # identity check, as arbitrary values might implement custom comparisons
                if val is not no_value:
                    new_struct.append(val)

        return struct.__class__(new_struct) if new_struct else replace

    # check dict types
    if isinstance(struct, dict) and isinstance(mask, dict):
        new_struct = struct.__class__()
        for key, val in six.iteritems(struct):
            if key not in mask:
                if keep_missing:
                    new_struct[key] = val
            else:
                # use the key-specific replacement if available
                repl = replace
                if isinstance(replace, dict) and key in replace:
                    repl = replace[key]
                val = mask_struct(mask[key], val, replace=repl, keep_missing=keep_missing,
                    convert_types=convert_types)
                # identity check, as arbitrary values might implement custom comparisons
                if val is not no_value:
                    new_struct[key] = val

        return new_struct or replace

    # when this point is reached, mask and struct have incompatible types
    raise TypeError(
        "mask and struct must have the same type, got '{}' and '{}'".format(
            type(mask), type(struct)),
    )
@contextlib.contextmanager
def tmp_file(*args, **kwargs):
    """
    Context manager that creates an empty, temporary file, yields the file descriptor number and
    temporary path, and eventually closes the descriptor and removes the file. All *args* and
    *kwargs* are passed to :py:meth:`tempfile.mkstemp`. The behavior of this function is similar to
    ``tempfile.NamedTemporaryFile`` which, however, yields an already opened file object.

    The context may close the descriptor itself. In that case it is not closed again.
    """
    # mkstemp already creates the empty file and returns an open descriptor to it
    fileno, path = tempfile.mkstemp(*args, **kwargs)

    # remember the identity of the file to later ensure that the descriptor number was not reused
    created = os.fstat(fileno)

    try:
        yield fileno, path
    finally:
        # close the descriptor to avoid leaking it, but only when it still refers to our file
        try:
            current = os.fstat(fileno)
            if (current.st_dev, current.st_ino) == (created.st_dev, created.st_ino):
                os.close(fileno)
        except OSError:
            # already closed within the context
            pass

        if os.path.exists(path):
            os.remove(path)
def perf_counter():
    """
    Returns the value of a clock for measuring durations, which is ``time.time()`` in python 2 and
    ``time.perf_counter()`` in python 3.
    """
    if six.PY2:
        return time.time()

    return time.perf_counter()
def interruptable_popen(*args, **kwargs):
    """ interruptable_popen(*args, stdin_callback=None, stdin_delay=0, interrupt_callback=None, kill_timeout=None, **kwargs)  # noqa
    Shorthand to :py:class:`Popen` followed by :py:meth:`Popen.communicate` which can be interrupted
    by *KeyboardInterrupt*. The return code, standard output and standard error are returned in a
    3-tuple. In python 3, output received as bytes is decoded with utf-8.

    *stdin_callback* can be a function accepting no arguments and whose return value is passed to
    ``communicate`` after a delay of *stdin_delay* to feed data input to the subprocess. String
    values are terminated with a newline and encoded with utf-8.

    *interrupt_callback* can be a function, accepting the process instance as an argument, that is
    called immediately after a *KeyboardInterrupt* occurs. After that, if the process is still
    alive, a SIGTERM signal is sent to its process group to allow it to gracefully shut down. When
    *kill_timeout* is set, and the process is still alive after that period (in seconds), a SIGKILL
    signal is sent to force the process termination. The *KeyboardInterrupt* is re-raised in any
    case.

    All other *args* and *kwargs* are forwarded to the :py:class:`Popen` constructor. Note that
    *preexec_fn* is always set to ``os.setsid``, overwriting any value passed by the caller.
    """
    # get kwargs not being passed to Popen
    stdin_callback = kwargs.pop("stdin_callback", None)
    stdin_delay = kwargs.pop("stdin_delay", 0)
    interrupt_callback = kwargs.pop("interrupt_callback", None)
    kill_timeout = kwargs.pop("kill_timeout", None)

    # start the subprocess in a new process group
    kwargs["preexec_fn"] = os.setsid
    p = subprocess.Popen(*args, **kwargs)

    # get stdin
    stdin_data = None
    if callable(stdin_callback):
        if stdin_delay > 0:
            time.sleep(stdin_delay)
        stdin_data = stdin_callback()
        if isinstance(stdin_data, six.string_types):
            stdin_data = (stdin_data + "\n").encode("utf-8")

    # handle interrupts
    try:
        out, err = p.communicate(stdin_data)
    except KeyboardInterrupt:
        # allow the interrupt_callback to perform a custom process termination
        if callable(interrupt_callback):
            interrupt_callback(p)

        # when the process is still alive, send SIGTERM to gracefully terminate it
        # (the process group is only looked up for a live process, since the lookup fails with
        # an OSError that would mask the interrupt once the process was reaped)
        if p.poll() is None:
            pgid = None
            try:
                pgid = os.getpgid(p.pid)
                os.killpg(pgid, signal.SIGTERM)
            except OSError:
                # the process vanished in the meantime, nothing left to signal
                pgid = None

            # when a kill_timeout is set, and the process is still running after that period,
            # send SIGKILL to force its termination
            if pgid is not None and kill_timeout is not None:
                target_time = perf_counter() + kill_timeout
                while target_time > perf_counter():
                    time.sleep(0.05)
                    if p.poll() is not None:
                        # the process terminated, exit the loop
                        break
                else:
                    # check the status again to avoid race conditions
                    if p.poll() is None:
                        try:
                            os.killpg(pgid, signal.SIGKILL)
                        except OSError:
                            pass

        # transparently reraise
        raise

    # decode byte output in python 3, text mode pipes already deliver strings
    if six.PY3:
        if isinstance(out, bytes):
            out = out.decode("utf-8")
        if isinstance(err, bytes):
            err = err.decode("utf-8")

    return p.returncode, out, err
def readable_popen(*args, **kwargs):
    """
    Starts a subprocess and returns a 2-tuple containing the :py:class:`Popen` object and a
    generator that yields the output line-by-line as it comes in, with trailing whitespace removed.
    All *args* and *kwargs* are forwarded to the :py:class:`Popen` constructor, except for *stdout*
    which is always a pipe and *stderr* which is always redirected to *stdout*. Example:

    .. code-block:: python

        # create the popen object and line generator
        p, lines = readable_popen(["some_executable", "--args"])

        # loop through output lines as they come in
        for line in lines:
            print(line)

        if p.returncode != 0:
            raise Exception("complain ...")

    ``communicate()`` is called automatically after the output iteration terminates which sets the
    subprocess' *returncode* member.
    """
    # force pipes
    kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    p = subprocess.Popen(*args, **kwargs)

    def line_gen():
        # select the line source and the conversion depending on the python version
        if six.PY2:
            stream = iter(lambda: p.stdout.readline(), "")
            convert = lambda raw: raw.rstrip()
        else:
            stream = p.stdout
            convert = lambda raw: raw.decode("utf-8").rstrip()

        for raw in stream:
            yield convert(raw)

        # communicate in the end
        p.communicate()

    return p, line_gen()
def create_hash(inp, l=10, algo="sha256", to_int=False):
    """
    Takes an arbitrary input *inp* and creates a hexadecimal string hash of its string
    representation based on an algorithm *algo*. For valid algorithms, see python's hashlib. *l*
    corresponds to the maximum length of the returned hash and is limited by the length of the
    hexadecimal representation produced by the hashing algorithm. When *to_int* is *True*, the
    decimal integer representation is returned.

    Text is encoded as latin-1 where possible, and as utf-8 when it contains characters outside of
    that range.
    """
    data = str(inp)

    # hashlib requires bytes, str is already bytes in python 2
    if not isinstance(data, bytes):
        try:
            # latin-1 first, to produce the same hashes as previous versions of this function
            data = data.encode("latin-1")
        except UnicodeEncodeError:
            # characters outside of latin-1 previously caused an error
            data = data.encode("utf-8")

    h = getattr(hashlib, algo)(data).hexdigest()[:l]

    return int(h, 16) if to_int else h
def create_random_string(prefix="", l=10):
    """
    Returns a random string of *l* hexadecimal characters taken from one or more uuid4 hashes. When
    *prefix* is given, the returned string has the format ``<prefix>_<random_string>``.
    """
    # collect uuid4 hex strings until enough characters are available
    chunks = []
    n_chars = 0
    while n_chars < l:
        chunk = uuid.uuid4().hex
        chunks.append(chunk)
        n_chars += len(chunk)

    random_part = "".join(chunks)[:l]

    return "{}_{}".format(prefix, random_part) if prefix else random_part
def copy_no_perm(src, dst):
    """
    Copies the content and meta data of the file *src* to *dst*, but keeps the permission bits that
    *dst* has after its content was written.
    """
    src_path = str(src)
    dst_path = str(dst)

    # copy the content only
    shutil.copyfile(src_path, dst_path)

    # copystat also transfers the permission bits, so remember and restore those of dst
    dst_mode = os.stat(dst_path).st_mode
    shutil.copystat(src_path, dst_path)
    os.chmod(dst_path, dst_mode)
def makedirs(path, perm=None):
    """
    Recursively creates directories up to *path*. Nothing happens and no exception is raised if
    *path* refers to an existing directory, or if it is created by someone else in the meantime.
    If *perm* is set, it is passed as the mode to ``os.makedirs`` while the umask of the process is
    temporarily cleared.
    """
    path = str(path)

    # nothing to do when the directory already exists
    if os.path.isdir(path):
        return

    # helper to create the directory, silencing the error raised if it exists by now
    # (when dropping py2, just use the exist_ok flag of os.makedirs)
    def create(*mode):
        try:
            os.makedirs(path, *mode)
        except Exception as e:
            if not is_file_exists_error(e):
                raise

    if perm is None:
        create()
        return

    # clear the umask so that perm is not altered, and restore it afterwards
    orig_umask = os.umask(0)
    try:
        create(perm)
    finally:
        os.umask(orig_umask)
def user_owns_file(path, uid=None):
    """
    Returns *True* when the file located at *path* is owned by the user with the id *uid*, and
    *False* otherwise. When *uid* is *None*, the user id of the current process is used. User and
    environment variables in *path* are expanded.
    """
    owner_uid = os.getuid() if uid is None else uid

    expanded_path = os.path.expanduser(str(path))
    expanded_path = os.path.expandvars(expanded_path)

    return os.stat(expanded_path).st_uid == owner_uid
def iter_chunks(l, size):
    """
    Generator that yields chunks of *size* elements of a list, integer or generator *l*. An integer
    is interpreted as a range of that length. Chunks of lazy iterables are yielded as lists, chunks
    of all other objects are obtained through slicing. A *size* smaller than 1 results in no
    chunking at all, i.e., a single chunk with all elements is yielded.
    """
    seq = six.moves.range(l) if isinstance(l, six.integer_types) else l
    lazy = is_lazy_iterable(seq)

    # no chunking at all
    if size < 1:
        yield list(seq) if lazy else seq
        return

    # objects with a known length can be sliced
    if not lazy:
        for start in six.moves.range(0, len(seq), size):
            yield seq[start:start + size]
        return

    # lazy iterables need to be consumed element by element
    pending = []
    for elem in seq:
        if len(pending) >= size:
            yield pending
            pending = []
        pending.append(elem)

    # the last, potentially incomplete chunk
    if pending:
        yield pending
# names of byte units, ordered by ascending powers of 1024, and their lower-case versions
byte_units = "bytes kB MB GB TB PB EB".split()
byte_units_lower = list(map(str.lower, byte_units))
def human_bytes(n, unit=None, fmt=False):
    """
    Takes a number of bytes *n*, assigns the best matching unit and returns the respective number
    and unit string in a tuple. When *unit* is set, that unit is used and a *ValueError* is raised
    in case it is unknown. Without *unit*, values below one byte are expressed in bytes and values
    exceeding the largest known unit are expressed in that unit. Values in bytes are rounded to
    integers.

    When *fmt* is set, it is expected to be a string template with two elements that are filled via
    *str.format* and the formatted string is returned instead of the tuple. It can also be a
    boolean value in which case the template defaults to ``"{:.1f} {}"`` when *True* (``"{} {}"``
    for values in bytes). Example:

    .. code-block:: python

        human_bytes(3407872)
        # -> (3.25, "MB")

        human_bytes(3407872, "kB")
        # -> (3328.0, "kB")

        human_bytes(3407872, fmt="{:.2f} -- {}")
        # -> "3.25 -- MB"

        human_bytes(3407872, fmt=True)
        # -> "3.2 MB"
    """
    # check if the unit exists
    if unit and unit not in byte_units:
        raise ValueError("unknown unit '{}', valid values are {}".format(unit, byte_units))

    if n == 0:
        idx = 0
    elif unit:
        idx = byte_units.index(unit)
    else:
        idx = int(math.floor(math.log(abs(n), 1024)))
        # clamp to valid indices, as fractions of a byte yield negative values and huge numbers
        # exceed the largest known unit
        idx = max(0, min(idx, len(byte_units) - 1))

    # get the value and the unit name
    value = n / 1024.0 ** idx
    unit = byte_units[idx]

    # cast value to int when the unit is bytes
    if idx == 0:
        value = int(round(value))

    if fmt:
        if not isinstance(fmt, six.string_types):
            fmt = "{} {}" if idx == 0 else "{:.1f} {}"
        return fmt.format(value, unit)

    return value, unit
def parse_bytes(s, input_unit="bytes", unit="bytes"):
    """
    Takes a string *s*, interprets it as a size with an optional unit, and returns a number that
    represents that size in a given *unit*. When no unit is found in *s*, *input_unit* is used as a
    default. *s* can also be a plain number which is interpreted in *input_unit*. Units are matched
    case-insensitively. The conversion is done by :py:func:`human_bytes`, so sizes in bytes are
    rounded to integers, whereas floats are returned for all other units. A *ValueError* is raised
    when *s* cannot be successfully converted or when a unit is unknown. Example:

    .. code-block:: python

        parse_bytes("100")
        # -> 100

        parse_bytes("2048", unit="kB")
        # -> 2.0

        parse_bytes("2048 kB", unit="kB")
        # -> 2048.0

        parse_bytes("2048 kB", unit="MB")
        # -> 2.0

        parse_bytes("2048", "kB", unit="MB")
        # -> 2.0

        parse_bytes(2048, "kB", unit="MB")  # the first argument can be a number as well
        # -> 2.0
    """
    # check if the units exist
    if input_unit.lower() not in byte_units_lower:
        raise ValueError("unknown input_unit '{}', valid values are {}".format(
            input_unit, byte_units))
    if unit.lower() not in byte_units_lower:
        raise ValueError("unknown unit '{}', valid values are {}".format(
            unit, byte_units))

    # use the canonical spelling of the output unit since human_bytes is case-sensitive
    unit = byte_units[byte_units_lower.index(unit.lower())]

    # when s is a number, interpret it in units of input_unit right away
    # otherwise parse it
    if isinstance(s, (float, six.integer_types)):
        input_value = float(s)
    else:
        m = re.match(r"^\s*(-?\d+\.?\d*)\s*(|{})\s*$".format("|".join(byte_units_lower)), s.lower())
        if not m:
            raise ValueError("cannot parse bytes from string '{}'".format(s))

        input_value, _input_unit = m.groups()
        input_value = float(input_value)
        # a unit found in the string has precedence
        if _input_unit:
            input_unit = _input_unit

    # convert the input value to bytes
    idx = byte_units_lower.index(input_unit.lower())
    size_bytes = input_value * 1024.0 ** idx

    # use human_bytes to convert the size
    return human_bytes(size_bytes, unit)[0]
# time units mapped to their duration in seconds, ordered from the largest to the smallest unit
time_units = collections.OrderedDict((
    ("week", 604800),
    ("day", 86400),
    ("hour", 3600),
    ("minute", 60),
    ("second", 1),
))

# alternative spellings mapped to the names of the time units above
time_unit_aliases = dict(
    w="week", weeks="week",
    d="day", days="day",
    h="hour", hours="hour",
    m="minute", min="minute", mins="minute", minutes="minute",
    s="second", sec="second", secs="second", seconds="second",
)
def human_duration(colon_format=False, plural=True, **kwargs):
    """ human_duration(colon_format=False, plural=True, **kwargs)
    Returns a human readable duration string. The largest unit is days. When *colon_format* is
    *True*, the return value has the format ``"[d-][hh:]mm:ss[.ms]"``. *colon_format* can also be a
    string value referring to a limiting unit. In that case, the returned time string has no field
    above that unit, e.g. passing ``"m"`` results in a string ``"mm:ss[.ms]"`` where the minute
    field is potentially larger than 60. Passing ``"s"`` is a special case. Since the colon format
    always has a minute field (to mark it as colon format in the first place), the returned string
    will have the format ``"00:ss[.ms]"``. A *ValueError* is raised when the limiting unit is
    unknown.

    Unless *plural* is *False*, units corresponding to values other than **exactly** one are used in
    plural e.g. ``"1 second"`` but ``"1.5 seconds"``. Negative durations are prefixed with ``"-"``
    in colon format and with ``"minus "`` otherwise. All other *kwargs* are passed to
    ``datetime.timedelta`` to get the total duration in seconds. Example:

    .. code-block:: python

        human_duration(seconds=1233)
        # -> "20 minutes, 33 seconds"

        human_duration(seconds=90001)
        # -> "1 day, 1 hour, 1 second"

        human_duration(seconds=1233, colon_format=True)
        # -> "20:33"

        human_duration(seconds=-1233, colon_format=True)
        # -> "-20:33"

        human_duration(seconds=90001, colon_format=True)
        # -> "1-01:00:01"

        human_duration(seconds=90001, colon_format="h")
        # -> "25:00:01"

        human_duration(seconds=65, colon_format="s")
        # -> "00:65"

        human_duration(minutes=15, colon_format=True)
        # -> "15:00"

        human_duration(minutes=15)
        # -> "15 minutes"

        human_duration(minutes=15, plural=False)
        # -> "15 minute"

        human_duration(minutes=-15)
        # -> "minus 15 minutes"
    """
    # units used for the representation, ordered from the largest to the smallest
    _time_units = ["day", "hour", "minute", "second"]

    # work with the absolute duration, the sign is prepended in the end
    seconds = float(datetime.timedelta(**kwargs).total_seconds())
    sign = 1 if seconds >= 0 else -1
    seconds = abs(seconds)

    # when using colon_format, check if a limiting unit is set
    colon_unit_limit = None
    if isinstance(colon_format, six.string_types):
        colon_unit_limit = time_unit_aliases.get(colon_format, colon_format)
        if colon_unit_limit not in _time_units:
            raise ValueError("unknown colon_format unit '{}', valid values are {}".format(
                colon_unit_limit, ",".join(_time_units)))
        # only defined when a limiting unit is set, see the guard in the loop below
        colon_unit_index = _time_units.index(colon_unit_limit)

    # start building the human readable string
    # loop through units, remove the fully dividable part and let the next unit handle the rest
    human_str = ""
    for i, unit in enumerate(_time_units):
        # skip units above the colon unit limit when one is set, so that their
        # contribution remains in the seconds handled by smaller units
        if colon_unit_limit and i < colon_unit_index:
            continue

        # build the value for this unit
        if unit == "second":
            # try to round to 2 digits or convert to int
            value = try_int(round(seconds, 2))
        else:
            # get the integer divider and adjust the remaining number of seconds
            mul = time_units[unit]
            value = int(seconds // mul)
            seconds -= value * mul

        # keep zeros under certain conditions
        if value == 0:
            if colon_format:
                # zeros are kept after the first field, for seconds, and with a limiting unit
                keep_zero = human_str or unit == "second" or colon_unit_limit
            else:
                # only a plain zero duration is shown as zero seconds
                keep_zero = not human_str and unit == "second"
            if not keep_zero:
                continue

        # build the human readable representation
        if colon_format:
            if unit == "second":
                # special case 1: force float formatting with optional leading 0
                fmt = "0{}" if value < 10 else "{}"
                # special case 2: when "minutes" are not there yet, prepend "00:"
                if not human_str:
                    fmt = "00:" + fmt
            elif unit in ["hour", "minute"]:
                fmt = "{:02d}:"
            else:  # day
                fmt = "{}-"
            human_str += fmt.format(value)
        else:
            if human_str:
                human_str += ", "
            human_str += "{} {}{}".format(value, unit, "" if (value == 1 or not plural) else "s")

    # sign
    if sign == -1:
        human_str = ("-" if colon_format else "minus ") + human_str

    return human_str
def parse_duration(s, input_unit="s", unit="s"):
    """
    Takes a string *s*, interprets it as a duration with an optional unit, and returns a float that
    represents that duration in a given *unit*. When no unit is found in *s*, *input_unit* is used
    as a default. A *ValueError* is raised when *s* cannot be successfully converted or when a unit
    is unknown. Multiple input formats are parsed. Example:

    .. code-block:: python

        # plain number
        parse_duration(100)
        # -> 100.0

        parse_duration(100, unit="min")
        # -> 1.667

        parse_duration(100, input_unit="min")
        # -> 6000.0

        parse_duration(-100, input_unit="min")
        # -> -6000.0

        # strings in the format [d-][h:][m:]s[.ms] are interpreted with input_unit disregarded
        parse_duration("2:1")
        # -> 121.0

        parse_duration("04:02:01.1")
        # -> 14521.1

        parse_duration("04:02:01.1", unit="min")
        # -> 242.0183

        parse_duration("0-4:2:1.1")
        # -> 14521.1

        # human-readable string, optionally multiple of them separated by comma
        # missing units are interpreted as input_unit, unit works as above
        parse_duration("10 mins")
        # -> 600.0

        parse_duration("10 mins", unit="min")
        # -> 10.0

        parse_duration("10", unit="min")
        # -> 0.167

        parse_duration("10", input_unit="min", unit="min")
        # -> 10.0

        parse_duration("10 mins, 15 secs")
        # -> 615.0

        parse_duration("10 mins and 15 secs")
        # -> 615.0

        parse_duration("minus 10 mins and 15 secs")
        # -> -615.0
    """
    # consider unit aliases
    input_unit = time_unit_aliases.get(input_unit, input_unit)
    unit = time_unit_aliases.get(unit, unit)

    # check units
    if input_unit not in time_units:
        raise ValueError("unknown input_unit '{}', valid values are {}".format(
            input_unit, ",".join(time_units)))
    if unit not in time_units:
        raise ValueError("unknown unit '{}', valid values are {}".format(
            unit, ",".join(time_units)))

    sign = 1
    duration_seconds = 0.0

    # number or string?
    if isinstance(s, six.integer_types + (float,)) or is_float(s):
        duration_seconds += float(s) * time_units[input_unit]
    else:
        s = s.strip()

        # identify the format "[d-][h:][m:]s[.ms]" first
        colon_match = re.match(r"^([+-])?((((((\d+)-)?(\d+)):)?(\d+)):)?(\d+)(\.(\d*))?$", s)
        if colon_match:
            # groups: sign, days, hours, minutes, seconds, digits after the decimal point
            sgn, days, hours, minutes, secs, frac = [
                colon_match.group(i) for i in [1, 7, 8, 9, 10, 12]
            ]

            # interpret leading "-" or "+" as the sign of the duration
            if sgn == "-":
                sign = -1

            # add to seconds
            if days:
                duration_seconds += float(days) * time_units["day"]
            if hours:
                duration_seconds += float(hours) * time_units["hour"]
            if minutes:
                duration_seconds += float(minutes) * time_units["minute"]
            duration_seconds += float(secs)
            # a trailing point without digits (e.g. "2:1.") has no fractional part
            if frac:
                duration_seconds += float("." + frac)
        else:
            # human readable format
            # interpret leading "+", "-", "plus" and "minus" as the sign of the duration
            sign_match = re.match(r"^(\+|\-|plus\s|minus\s)\s*(.*)$", s)
            if sign_match:
                # strip the sign, as the word can be followed by any whitespace character
                sign = 1 if sign_match.group(1).strip() in ("plus", "+") else -1
                s = sign_match.group(2)

            # replace "and" with comma, replace multiple commas with one, then split
            s = re.sub(r"\,+", ",", s.replace("and", ","))
            parts = s.split(",")

            units = list(time_units.keys()) + list(time_unit_aliases.keys())
            cre = re.compile(r"^\s*(\d+|\d+\.|\.\d+|\d+\.\d+)\s*(|{})\s*$".format("|".join(units)))

            # convert each part
            for part in parts:
                part = part.strip()
                if not part:
                    continue
                part_match = cre.match(part)
                if not part_match:
                    raise ValueError("cannot parse duration string '{}'".format(s))
                value, part_unit = part_match.groups()
                value = float(value)
                if not part_unit:
                    part_unit = input_unit
                part_unit = time_unit_aliases.get(part_unit, part_unit)
                duration_seconds += value * time_units[part_unit]

    # convert to output unit
    duration = sign * duration_seconds / time_units[unit]

    return duration
def is_file_exists_error(e):
    """
    Returns *True* when the exception *e* signals an already existing file or directory, and *False*
    otherwise. This is a *FileExistsError* in python 3, and an *OSError* with an errno of 17 in
    python 2.
    """
    if not six.PY3:
        return isinstance(e, OSError) and e.errno == 17

    return isinstance(e, FileExistsError)  # noqa: F821
def send_mail(recipient, sender, subject="", content="", smtp_host="127.0.0.1", smtp_port=25):
    """
    Lightweight mail functionality. Sends a mail from *sender* to *recipient* with *subject* and
    *content*. *smtp_host* and *smtp_port* are forwarded to the ``smtplib.SMTP`` constructor. *True*
    is returned on success. When the connection cannot be established or the mail cannot be sent, a
    warning is logged and *False* is returned.
    """
    try:
        server = smtplib.SMTP(smtp_host, smtp_port)
    except Exception as e:
        logger.warning("cannot create SMTP server: {}".format(e))
        return False

    header = "From: {}\r\nTo: {}\r\nSubject: {}\r\n\r\n".format(sender, recipient, subject)

    try:
        server.sendmail(sender, recipient, header + content)
    except Exception as e:
        logger.warning("cannot send mail: {}".format(e))
        return False
    finally:
        # always end the session to not leak the connection, errors at this point are not
        # relevant for the outcome of the delivery
        try:
            server.quit()
        except Exception:
            pass

    return True
class DotDict(collections.OrderedDict):
    """
    Subclass of *OrderedDict* that allows items to be read and set via attributes by implementing
    ``__getattr__`` and ``__setattr__``. In case an item is accessed via attribute and it does not
    exist, an *AttributeError* is raised rather than a *KeyError*. Example:

    .. code-block:: python

        d = DotDict()
        d["foo"] = 1

        print(d["foo"])
        # => 1

        print(d.foo)
        # => 1

        print(d["bar"])
        # => KeyError

        print(d.bar)
        # => AttributeError
    """

    # forward certain attributes to the super class in python 2
    FORWARD_SUPER = ("_OrderedDict__root", "_OrderedDict__map")

    def __getattr__(self, attr):
        forward = six.PY2 and attr in self.FORWARD_SUPER
        if forward:
            return super(DotDict, self).__getattr__(attr)

        # translate missing items into missing attributes
        try:
            value = self[attr]
        except KeyError:
            raise AttributeError("'{}' object has no attribute '{}'".format(
                self.__class__.__name__, attr))
        return value

    def __setattr__(self, attr, value):
        forward = six.PY2 and attr in self.FORWARD_SUPER
        if forward:
            return super(DotDict, self).__setattr__(attr, value)

        # all other attributes are stored as items
        self[attr] = value

    def copy(self):
        """"""
        # shallow copy that preserves the class
        return self.__class__(self)

    @classmethod
    def wrap(cls, *args, **kwargs):
        """
        Creates an *OrderedDict* from all *args* and *kwargs* and returns it converted into an
        instance of this class, with all nested dictionary types recursively converted as well for
        deep attribute-style access.
        """
        def convert(obj):
            # only dictionaries are converted, everything else is kept as is
            if not isinstance(obj, dict):
                return obj
            return cls((key, convert(value)) for key, value in obj.items())

        return convert(collections.OrderedDict(*args, **kwargs))
class ShorthandDict(collections.OrderedDict):
    """
    Subclass of *OrderedDict* that implements ``__getattr__`` and ``__setattr__`` for a configurable
    list of attributes. Example:

    .. code-block:: python

        class MyDict(ShorthandDict):

            attributes = {"foo": 1, "bar": 2}

        d = MyDict(foo=9)

        print(d.foo)
        # => 9

        print(d.bar)
        # => 2

        d.foo = 3
        print(d.foo)
        # => 3

    .. py:classattribute: attributes
       type: dict

       Mapping of attribute names to default values. ``__getattr__`` and ``__setattr__`` support is
       provided for these attributes.
    """

    attributes = {}

    def __init__(self, **kwargs):
        super(ShorthandDict, self).__init__()

        # configured attributes first, using copies of the defaults to not share mutable objects
        for attr, default in self.attributes.items():
            self[attr] = kwargs.pop(attr, copy.deepcopy(default))

        # all remaining values are added as plain items
        self.update(kwargs)

    def copy(self):
        """"""
        # deep copy of all items that preserves the class
        kwargs = {key: copy.deepcopy(value) for key, value in self.items()}
        return self.__class__(**kwargs)

    def __getattr__(self, attr):
        # this method is only invoked when the normal attribute lookup failed, and the super
        # class does not define __getattr__, so the AttributeError is raised right here
        if attr in self.attributes:
            try:
                return self[attr]
            except KeyError:
                # the item was removed, which is to be reported as a missing attribute
                pass

        raise AttributeError("'{}' object has no attribute '{}'".format(
            self.__class__.__name__, attr))

    def __setattr__(self, attr, value):
        if attr in self.attributes:
            self[attr] = value
        else:
            super(ShorthandDict, self).__setattr__(attr, value)
class InsertableDict(collections.OrderedDict):
    """
    Subclass of *OrderedDict* that supports inserting elements before or after certain keys.
    Example:

    .. code-block:: python

        d = InsertableDict(foo=123, bar=456)

        d.insert_before("bar", "test", 999)
        print(d)
        # -> InsertableDict([('foo', 123), ('test', 999), ('bar', 456)])

        d.insert_after("test", "foo", "new_value")
        print(d)
        # -> InsertableDict([('test', 999), ('foo', 'new_value'), ('bar', 456)])
    """

    def _insert(self, search_key, key, value, offset):
        """
        Inserts the pair *key* - *value*, or multiple pairs when *key* is a list or dictionary and
        *value* is *None*, at the position of *search_key* plus *offset*. Existing items with the
        same keys are removed from their old positions.
        """
        # when key is a list or dict and value is None, assume key refers to key-value pairs
        # (items are converted to a list as views cannot be concatenated with lists below)
        if isinstance(key, (list, dict)) and value is None:
            new_items = list(key.items()) if isinstance(key, dict) else list(key)
        else:
            new_items = [(key, value)]
        new_keys = {k for k, _ in new_items}

        # if the search key is not present, insert the new pairs and finish
        if search_key not in self:
            self.update(new_items)
            return

        # create a copy of the index
        items = list(self.items())

        # find the position where to insert
        pos = [k for k, _ in items].index(search_key) + offset

        # construct the new items without duplicates
        items = [
            (k, v) for k, v in items[:pos] if k not in new_keys
        ] + new_items + [
            (k, v) for k, v in items[pos:] if k not in new_keys
        ]

        # rebuild the index
        self.clear()
        self.update(items)

    def insert_before(self, before_key, key, value=None):
        """
        Inserts a *key* - *value* pair before the key *before_key*. When this key does not exist,
        the new pair is added via ``update``, i.e., new keys are appended to the end. When *key* is
        a list of pairs or a dictionary and *value* is *None*, multiple new values are inserted.
        """
        self._insert(before_key, key, value, 0)

    def insert_after(self, after_key, key, value=None):
        """
        Inserts a *key* - *value* pair after the key *after_key*. When this key does not exist, the
        new pair is added via ``update``, i.e., new keys are appended to the end. When *key* is a
        list of pairs or a dictionary and *value* is *None*, multiple new values are inserted.
        """
        self._insert(after_key, key, value, 1)
def open_compat(path, *args, **kwargs):
    """
    Polyfill for python's ``open`` factory. In python 3, the result of the plain ``open`` is
    returned. In python 2, ``io.open`` is used and, when the encoding of the opened file is utf-8,
    its ``write`` method is patched to convert its first argument to unicode if necessary. All
    *args* and *kwargs* are forwarded.
    """
    path = str(path)

    if six.PY3:
        return open(path, *args, **kwargs)

    f = io.open(path, *args, **kwargs)

    # nothing to patch for other encodings
    encoding = f.encoding
    if not encoding or encoding.lower().replace("-", "") != "utf8":
        return f

    write_orig = f.write

    def write(data, *args, **kwargs):
        text_type = unicode  # noqa: F821
        if not isinstance(data, text_type):
            data = text_type(data)
        return write_orig(data, *args, **kwargs)

    f.write = write

    return f
@contextlib.contextmanager
def patch_object(obj, attr, value, reset=True, orig=no_value, lock=False):
    """
    Context manager that temporarily patches an object *obj* by replacing its attribute *attr* with
    *value* and yields *obj*. The original value is set again when the context is closed unless
    *reset* is *False*. The original value is obtained through ``getattr`` or taken from *orig* if
    set. In case the attribute did not exist before, it is removed again. Errors that occur while
    restoring the original state are ignored.

    When *lock* is *True*, the py:attr:`default_lock` object is used to ensure the patch is
    thread-safe. When *lock* is a lock instance, this object is used instead. The lock is held for
    the duration of the context, so nested patches using the same, non-reentrant lock block.
    """
    # handle thread locks
    if lock:
        if isinstance(lock, bool):
            lock = default_lock
    else:
        lock = empty_context()

    with lock:
        # get the original value while holding the lock, as it could otherwise refer to the
        # temporary value of a patch that is concurrently applied by another thread
        if orig is no_value:
            orig = getattr(obj, attr, no_value)

        try:
            setattr(obj, attr, value)

            yield obj
        finally:
            # restoring is best effort, but interrupts and exit requests must not be swallowed
            try:
                if reset:
                    if orig is no_value:
                        delattr(obj, attr)
                    else:
                        setattr(obj, attr, orig)
            except Exception:
                pass
def join_generators(*generators, **kwargs):
    """ join_generators(*generators, on_error=None)
    Joins multiple *generators* and returns a single generator for simplified iteration. Yielded
    objects are transparently sent back to ``yield`` assignments of the same generator.

    When *on_error* is callable, it is invoked with the exception as its only argument in case one
    is raised while iterating, including *KeyboardInterrupt*'s. If its return value evaluates to
    *True*, the state is reset and iterations continue. Otherwise, the exception is raised.
    """
    on_error = kwargs.get("on_error")

    for gen in generators:
        # value to be sent to the generator, no_value means that it is simply advanced
        last_result = no_value
        while True:
            try:
                # identity check, as sent objects might implement custom comparisons
                if last_result is no_value:
                    last_result = yield six.next(gen)
                else:
                    last_result = yield gen.send(last_result)
            except StopIteration:
                # this generator is exhausted, continue with the next one
                break
            except (Exception, KeyboardInterrupt) as error:
                if callable(on_error) and on_error(error):
                    last_result = no_value
                else:
                    raise
def quote_cmd(cmd):
    """
    Takes a shell command *cmd* given as a list and returns a single string representation of that
    command in which all parts are quoted for the shell and joined by spaces. Nested lists or
    tuples in *cmd* denote nested commands which are first converted into a string themselves and
    then quoted as a single part. Example:

    .. code-block:: python

        print(quote_cmd(["bash", "-c", "echo", "foobar"]))
        # -> "bash -c echo foobar"

        print(quote_cmd(["bash", "-c", ["echo", "foobar"]]))
        # -> "bash -c 'echo foobar'"
    """
    # convert all parts to strings first, expanding nested commands recursively
    parts = []
    for part in cmd:
        if isinstance(part, (list, tuple)):
            parts.append(quote_cmd(part))
        else:
            parts.append(str(part))

    # quote all parts and join
    quoted = [six.moves.shlex_quote(part) for part in parts]

    return " ".join(quoted)
def escape_markdown(s):
    """
    Escapes all characters in a string *s* that could be confused for markdown formatting
    characters, i.e. ``(``, ``)``, ``=``, ``.``, ``_`` and ``-``, by prepending a backslash and
    returns the escaped string. Characters that are already preceded by a backslash are left
    untouched.
    """
    # the negative lookbehind does not consume the preceding character, so that
    # consecutive special characters are all escaped
    return re.sub(r"(?<!\\)([()=._\-])", r"\\\1", s)
class ClassPropertyDescriptor(object):
    """
    Generic descriptor class that is used by :py:func:`classproperty`. *fget* and the optional
    *fset* are expected to be descriptors themselves, e.g. classmethods, that are bound to the
    owning class before they are invoked. Without *fset*, assignments through instances raise an
    *AttributeError*.
    """

    def __init__(self, fget, fset=None):
        self.fget, self.fset = fget, fset

    def __get__(self, obj, cls=None):
        # fall back to the type of the instance when no owner class is passed
        owner = type(obj) if cls is None else cls
        getter = self.fget.__get__(obj, owner)
        return getter()

    def __set__(self, obj, value):
        if self.fset:
            setter = self.fset.__get__(obj, type(obj))
            return setter(value)

        raise AttributeError("can't set attribute")
def classproperty(func):
    """
    Property decorator for class-level methods. *func* is converted into a classmethod unless it
    already is a classmethod or staticmethod, and wrapped by a :py:class:`ClassPropertyDescriptor`.
    """
    is_wrapped = isinstance(func, (classmethod, staticmethod))
    getter = func if is_wrapped else classmethod(func)

    return ClassPropertyDescriptor(getter)
class BaseStream(object):
    """
    Base class for stream-like objects that keeps track of the *closed* state and dispatches
    :py:meth:`write`, :py:meth:`flush` and :py:meth:`close` to the hooks :py:meth:`_write`,
    :py:meth:`_flush` and :py:meth:`_close`, which do nothing here and are meant to be overwritten
    by subclasses. Calls to :py:meth:`write` and :py:meth:`flush` on a closed stream are silently
    ignored. When *flush_after_write* is *None*, the class-level *FLUSH_AFTER_WRITE* decides
    whether each write is followed by a flush. Instances can be used as context managers that
    close the stream on exit.
    """

    # class-level default, used when the instance-level flush_after_write is None
    FLUSH_AFTER_WRITE = True

    def __init__(self, flush_after_write=None):
        super(BaseStream, self).__init__()

        self.closed = False
        self.flush_after_write = flush_after_write

    @property
    def _flush_after_write(self):
        # the instance-level setting takes precedence over the class-level default
        return self.FLUSH_AFTER_WRITE if self.flush_after_write is None else self.flush_after_write

    def __del__(self):
        # __init__ might never have run, e.g. when the constructor was called with invalid
        # arguments, in which case there is no state to clean up
        if hasattr(self, "closed"):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """
        Flushes and closes the stream. Does nothing when the stream is already closed. The
        :py:meth:`_close` hook is invoked even if flushing raises an exception, which is then
        propagated afterwards.
        """
        if self.closed:
            return

        try:
            self.flush()
        finally:
            # release resources even when flushing failed
            self._close()
            self.closed = True

    def flush(self):
        """
        Invokes the :py:meth:`_flush` hook unless the stream is closed.
        """
        if not self.closed:
            self._flush()

    def write(self, *args, **kwargs):
        """
        Passes *args* and *kwargs* to the :py:meth:`_write` hook unless the stream is closed, and
        flushes afterwards when configured to do so.
        """
        if not self.closed:
            self._write(*args, **kwargs)
            if self._flush_after_write:
                self.flush()

    def _close(self):
        return

    def _flush(self):
        return

    def _write(self, *args, **kwargs):
        return
class TeeStream(BaseStream):
    """ __init__(*consumers, mode="w", **kwargs)
    Stream that fans out calls to :py:meth:`write` and :py:meth:`flush` to every registered
    *consumer* stream, similar to *tee* in bash. A *consumer* given as a string is treated as a
    file path and opened with *mode*. All remaining *kwargs* are passed on to the
    :py:class:`BaseStream` constructor. Example:

    .. code-block:: python

        tee = TeeStream("/path/to/log.txt", sys.__stdout__)
        sys.stdout = tee
    """

    def __init__(self, *consumers, **kwargs):
        mode = kwargs.pop("mode", "w")

        super(TeeStream, self).__init__(**kwargs)

        self.consumers = []
        self.open_files = []

        for target in consumers:
            if not isinstance(target, six.string_types):
                # an existing stream object, register it as is
                self.consumers.append(target)
                continue

            # a string denotes a file path, remember the opened file so it can be closed later
            handle = open_compat(target, mode)
            self.open_files.append(handle)
            self.consumers.append(handle)

    def _close(self):
        """
        Closes the files that were opened by this instance.
        """
        for handle in self.open_files:
            handle.close()

    def _flush(self):
        """
        Flushes every registered consumer stream that is not closed.
        """
        for target in self.consumers:
            if getattr(target, "closed", False):
                continue
            target.flush()

    def _write(self, *args, **kwargs):
        """
        Forwards *args* and *kwargs* to the write method of every registered consumer stream.
        """
        for target in self.consumers:
            target.write(*args, **kwargs)
class FilteredStream(BaseStream):
    """
    Stream that wraps an input *stream* and a function *filter_fn*, which is invoked with the
    arguments of every call to :py:meth:`write`. The payload is only written to *stream* when the
    value returned by *filter_fn* evaluates to *True*. All *kwargs* are passed on to the
    :py:class:`BaseStream` constructor.
    """

    def __init__(self, stream, filter_fn, **kwargs):
        super(FilteredStream, self).__init__(**kwargs)

        self.stream, self.filter_fn = stream, filter_fn

    def _close(self):
        """
        Closes the wrapped stream.
        """
        self.stream.close()

    def _flush(self):
        """
        Flushes the wrapped stream unless it is closed.
        """
        if getattr(self.stream, "closed", False):
            return

        self.stream.flush()

    def _write(self, *args, **kwargs):
        """
        Passes *args* and *kwargs* to the wrapped stream's write method, but only when *filter_fn*
        accepts them.
        """
        if not self.filter_fn(*args, **kwargs):
            return

        self.stream.write(*args, **kwargs)