Source code for law.contrib.docker.sandbox

# coding: utf-8

"""
Docker sandbox implementation.
"""

from __future__ import annotations

__all__ = ["DockerSandbox"]

import os
import sys
import uuid
import socket
import subprocess

import luigi  # type: ignore[import-untyped]

from law.config import Config
from law.task.proxy import ProxyCommand
from law.sandbox.base import Sandbox
from law.target.local import LocalFileTarget
from law.cli.software import get_software_deps
from law.util import make_list, interruptable_popen, quote_cmd, flatten, makedirs
from law._types import Any


[docs] class DockerSandbox(Sandbox): sandbox_type: str = "docker" # type: ignore[assignment] config_section_prefix = sandbox_type @property def image(self) -> str: return self.name @property def tag(self) -> str | None: return None if ":" not in self.image else self.image.split(":", 1)[1] @property def env_cache_key(self) -> str: return self.image def get_custom_config_section_postfix(self) -> str: return self.image def create_env(self) -> dict[str, Any]: # strategy: create a tempfile, forward it to a container, let python dump its full env, # close the container and load the env file # helper to load the env def load_env(target: LocalFileTarget) -> dict[str, Any]: try: return tmp.load(formatter="pickle") except Exception as e: raise Exception(f"{self} env deserialization failed: {e}") # load the env when the cache file is configured and existing if self.env_cache_path: env_cache_target = LocalFileTarget(self.env_cache_path) if env_cache_target.exists(): return load_env(env_cache_target) tmp = LocalFileTarget(is_tmp=".env") tmp.touch() env_file = os.path.join("/tmp", tmp.unique_basename) # get the docker run command docker_run_cmd = self._docker_run_cmd() # mount the env file docker_run_cmd.extend(["-v", f"{tmp.path}:{env_file}"]) # pre-setup commands pre_setup_cmds = self._build_pre_setup_cmds() # post-setup commands post_env = self._get_env() post_setup_cmds = self._build_post_setup_cmds(post_env) # build the python command that dumps the environment py_cmd = ( "import os,pickle;" f"pickle.dump(dict(os.environ),open('{env_file}','wb'),protocol=2)" ) # $(whereis -b python | cut -d " " -f 2) searches for the python binary in the container py_executable = f"$( whereis -b python | cut -d \" \" -f 2 ) -c \"{py_cmd}\"" # build the full command cmd = quote_cmd(docker_run_cmd + [ self.image, "bash", "-l", "-c", " && ".join(flatten( pre_setup_cmds, post_setup_cmds, py_executable, )), ]) # run it code, out, _ = interruptable_popen( cmd, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if code != 0: raise Exception(f"docker sandbox env loading failed with exit code {code}:\n{out}") # copy to the cache path when configured if self.env_cache_path: tmp.copy_to_local(env_cache_target) # load the env env = load_env(tmp) return env def _docker_run_cmd(self) -> list[str]: """ Part of the "docker run" command that is common to env requests and run. """ cmd = ["docker", "run"] # rm flag cmd.extend(["--rm"]) # use the pid namespace of the host so killing the outer process will stop the container cmd.extend(["--pid", "host"]) # task-specific arguments if self.task: # user flag sandbox_user = self.task.sandbox_user() if sandbox_user: if not isinstance(sandbox_user, (tuple, list)) or len(sandbox_user) != 2: raise Exception("sandbox_user() must return 2-tuple") cmd.extend(["-u", ":".join(map(str, sandbox_user))]) # add args configured on the task args_getter = getattr(self.task, "docker_args", None) if callable(args_getter): cmd.extend(make_list(args_getter())) return cmd def cmd(self, proxy_cmd: ProxyCommand) -> str: # docker run command arguments args = [] # container name args.extend(["--name", f"{getattr(self.task, 'task_id', None)}_{str(uuid.uuid4())[:8]}"]) # container hostname args.extend(["-h", socket.gethostname()]) # helper to build forwarded paths cfg = Config.instance() cfg_section = self.get_config_section() forward_dir = cfg.get_expanded(cfg_section, "forward_dir") python_dir = cfg.get_expanded(cfg_section, "python_dir") bin_dir = cfg.get_expanded(cfg_section, "bin_dir") stagein_dir_name = cfg.get_expanded(cfg_section, "stagein_dir_name") stageout_dir_name = cfg.get_expanded(cfg_section, "stageout_dir_name") def dst(*args) -> str: return os.path.join(forward_dir, *(str(arg) for arg in args)) # helper for mounting a volume volume_srcs = [] def mount(*vol) -> None: src = vol[0] # make sure, the same source directory is not mounted twice if src in volume_srcs: return volume_srcs.append(src) # ensure that source directories exist if not os.path.isfile(src): makedirs(src) # store the mount point args.extend(["-v", ":".join(vol)]) # environment variables to set env = self._get_env() # add staging directories if self.stagein_info: env["LAW_SANDBOX_STAGEIN_DIR"] = dst(stagein_dir_name) mount(self.stagein_info.stage_dir.path, dst(stagein_dir_name)) if self.stageout_info: env["LAW_SANDBOX_STAGEOUT_DIR"] = dst(stageout_dir_name) mount(self.stageout_info.stage_dir.path, dst(stageout_dir_name)) # prevent python from writing byte code files env["PYTHONDONTWRITEBYTECODE"] = "1" # adjust path variables env["PATH"] = os.pathsep.join([dst("bin"), "$PATH"]) env["PYTHONPATH"] = os.pathsep.join([dst(python_dir), "$PYTHONPATH"]) # forward python directories of law and dependencies for mod in get_software_deps(): mod_file: str = mod.__file__ # type: ignore[var-type, assignment] path = os.path.dirname(mod_file) name, ext = os.path.splitext(os.path.basename(mod_file)) if name == "__init__": vsrc = path vdst = dst(python_dir, os.path.basename(path)) else: vsrc = os.path.join(path, f"{name}.py") vdst = dst(python_dir, f"{name}.py") mount(vsrc, vdst, "ro") # forward the law cli dir to bin as it contains a law executable env["PATH"] = os.pathsep.join([dst(python_dir, "law", "cli"), env["PATH"]]) # forward the law config file if cfg.config_file: mount(cfg.config_file, dst("law.cfg")) env["LAW_CONFIG_FILE"] = dst("law.cfg") # forward the luigi config file for p in luigi.configuration.LuigiConfigParser._config_paths[::-1]: if os.path.exists(p): mount(p, dst("luigi.cfg")) env["LUIGI_CONFIG_PATH"] = dst("luigi.cfg") break # forward volumes defined in the config and by the task vols = self._get_volumes() for hdir, cdir in vols.items(): if not cdir: mount(hdir, hdir) else: cdir = self._expand_volume(cdir, bin_dir=dst(bin_dir), python_dir=dst(python_dir)) mount(hdir, cdir) # handle local scheduling within the container if self.force_local_scheduler(): proxy_cmd.add_arg("--local-scheduler", "True", overwrite=True) elif self.scheduler_on_host(): # when the scheduler runs on the host system, we need to set the network interface to # the host system and set the correct host address as seen by the container args.extend(["--network", "host"]) proxy_cmd.add_arg("--scheduler-host", self.get_host_ip(), overwrite=True) # get the docker run command, add arguments from above docker_run_cmd = self._docker_run_cmd() + args # pre-setup commands pre_setup_cmds = self._build_pre_setup_cmds() # post-setup commands with the full env post_setup_cmds = self._build_post_setup_cmds(env) # build the final command cmd = quote_cmd(docker_run_cmd + [ self.image, "bash", "-l", "-c", " && ".join(flatten( pre_setup_cmds, post_setup_cmds, proxy_cmd.build(), )), ]) return cmd def get_host_ip(self) -> str: # in host network mode, docker containers can normally be accessed via 127.0.0.1 on Linux # or via docker.for.mac.localhost on Mac (as of docker 17.06), however, in some cases it # might be required to use a different ip which can be set via an env variable default_ip = "docker.for.mac.localhost" if sys.platform == "darwin" else "127.0.0.1" return os.getenv("LAW_DOCKER_HOST_IP", default_ip)