Source code for law.sandbox.bash
# coding: utf-8
"""
Bash sandbox implementation.
"""
__all__ = ["BashSandbox"]
import os
import collections
import six
from law.config import Config
from law.sandbox.base import Sandbox
from law.util import tmp_file, interruptable_popen, quote_cmd, flatten, makedirs
[docs]class BashSandbox(Sandbox):
sandbox_type = "bash"
config_section_prefix = sandbox_type
@property
def script(self):
return os.path.expandvars(os.path.expanduser(self.name))
@property
def env_cache_key(self):
return self.script
def get_custom_config_section_postfix(self):
return self.name
def create_env(self):
# strategy: create a tempfile, let python dump its full env in a subprocess and load the
# env file again afterwards
# helper to write the env
def write_env(path):
# get the bash command
bash_cmd = self._bash_cmd()
# pre-setup commands
pre_setup_cmds = self._build_pre_setup_cmds()
# post-setup commands
post_env = self._get_env()
post_setup_cmds = self._build_post_setup_cmds(post_env)
# build the python command that dumps the environment
py_cmd = "import os,pickle;" \
+ "pickle.dump(dict(os.environ),open('{}','wb'),protocol=2)".format(path)
# build the full command
cmd = quote_cmd(bash_cmd + ["-c", " && ".join(flatten(
pre_setup_cmds,
"source \"{}\" \"\"".format(self.script), # noqa: Q003
post_setup_cmds,
quote_cmd(["python", "-c", py_cmd]),
))])
# run it
returncode = interruptable_popen(cmd, shell=True, executable="/bin/bash")[0]
if returncode != 0:
raise Exception("{} env loading failed with exit code {}".format(
self, returncode))
# helper to load the env
def load_env(path):
pickle_kwargs = {"encoding": "utf-8"} if six.PY3 else {}
with open(path, "rb") as f:
try:
return collections.OrderedDict(six.moves.cPickle.load(f, **pickle_kwargs))
except Exception as e:
raise Exception(
"env deserialization of sandbox {} failed: {}".format(self, e),
)
# use the cache path if set
if self.env_cache_path:
env_cache_path = str(self.env_cache_path)
# write it if it does not exist yet
if not os.path.exists(env_cache_path):
makedirs(os.path.dirname(env_cache_path))
write_env(env_cache_path)
# load it
env = load_env(env_cache_path)
else:
# use a temp file
with tmp_file() as tmp:
tmp_path = os.path.realpath(tmp[1])
# write and load it
write_env(tmp_path)
env = load_env(tmp_path)
return env
def _bash_cmd(self):
cmd = ["bash"]
# login flag
cfg = Config.instance()
cfg_section = self.get_config_section()
if cfg.get_expanded_bool(cfg_section, "login"):
cmd.extend(["-l"])
return cmd
def cmd(self, proxy_cmd):
# get the bash command
bash_cmd = self._bash_cmd()
# pre-setup commands
pre_setup_cmds = self._build_pre_setup_cmds()
# post-setup commands
post_env = self._get_env()
# add staging directories
if self.stagein_info:
post_env["LAW_SANDBOX_STAGEIN_DIR"] = self.stagein_info.stage_dir.path
if self.stageout_info:
post_env["LAW_SANDBOX_STAGEOUT_DIR"] = self.stageout_info.stage_dir.path
post_setup_cmds = self._build_post_setup_cmds(post_env)
# handle local scheduling within the container
if self.force_local_scheduler():
proxy_cmd.add_arg("--local-scheduler", "True", overwrite=True)
# build the final command
cmd = quote_cmd(bash_cmd + ["-c", " && ".join(flatten(
pre_setup_cmds,
"source \"{}\" \"\"".format(self.script), # noqa: Q003
post_setup_cmds,
proxy_cmd.build(),
))])
return cmd