Source code for law.contrib.mattermost.parameter

# coding: utf-8

"""
Mattermost related parameters.
"""

from __future__ import annotations

__all__ = ["NotifyMattermostParameter"]

from collections import OrderedDict

from law.config import Config
from law.parameter import NotifyParameter
from law.contrib.mattermost.notification import notify_mattermost
from law._types import Any


[docs] class NotifyMattermostParameter(NotifyParameter): def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.description: str if not self.description: self.description = ( "when true, and the task's run method is decorated with law.decorator.notify, " "a Mattermost notification is sent once the task finishes" )
[docs] def get_transport(self) -> dict[str, Any]: return { "func": self.notify, "raw": True, "colored": False, }
@classmethod def notify(cls, success: bool, title: str, content: dict[str, Any], **kwargs) -> bool: content = OrderedDict(content) # overwrite title cfg = Config.instance() header = cfg.get_expanded("notifications", "mattermost_header", default=None) task_block = f"```\n{content['Task']}\n```" title = f"{header}\n{task_block}" if header else task_block del content["Task"] # markup for traceback if "Traceback" in content: content["Traceback"] = f"\n```\n{content['Traceback']}\n```" # prepend the status text to the message content cfg = Config.instance() status_text = "success" if success else "failure" status_emoji = cfg.get_expanded( "notifications", f"mattermost_{status_text}_emoji", default=None, ) if status_emoji: status_text += " " + status_emoji content["Status"] = status_text content.move_to_end("Status", last=False) # highlight last message if "Last message" in content: content["Last message"] = f"`{content['Last Message']}`" # highlight keys content = content.__class__((f"**{k}**", v) for k, v in content.items()) # send the notification return notify_mattermost(title, content, **kwargs)