# coding: utf-8
"""
Target classes that represent remote files and directories and have a local, optionally read-only
mirror (e.g. through a local mount of the remote file system).
"""
__all__ = ["MirroredTarget", "MirroredFileTarget", "MirroredDirectoryTarget"]
import os
import contextlib
import time
import threading
from law.config import Config
from law.target.file import FileSystemTarget, FileSystemFileTarget, FileSystemDirectoryTarget
from law.target.local import LocalFileTarget, LocalDirectoryTarget
from law.target.remote import RemoteFileTarget, RemoteDirectoryTarget
from law.util import patch_object
local_root_check_lock = threading.Lock()
[docs]class MirroredTarget(FileSystemTarget):
_existing_local_roots = {}
local_sync_default = True
@classmethod
def check_local_root(cls, path, depth=1):
path = str(path)
# path must start with a separator
if path == os.sep:
return True
if not path or not path.startswith(os.sep):
return False
# get the root path or mount point of the path (e.g. "/mnt")
root_path = os.sep.join(path.split(os.sep)[:depth + 1])
if root_path not in cls._existing_local_roots:
with local_root_check_lock:
cls._existing_local_roots[root_path] = os.path.exists(root_path)
return cls._existing_local_roots[root_path]
def __init__(
self,
path,
_is_file,
remote_target=None,
remote_target_cls=None,
remote_fs=None,
remote_kwargs=None,
local_target=None,
local_fs=None,
local_kwargs=None,
local_read_only=True,
local_sync=None,
**kwargs,
):
path = path.lstrip(os.sep)
# create a remote target
if not remote_target:
if not remote_fs:
raise ValueError("either remote_target or remote_fs must be given")
if not remote_target_cls:
raise ValueError("either remote_target or remote_target_cls must be given")
if _is_file and not issubclass(remote_target_cls, RemoteFileTarget):
raise TypeError(
"remote_target_cls must be a subclass of RemoteFileTarget: {}".format(
remote_target_cls,
),
)
if not _is_file and not issubclass(remote_target_cls, RemoteDirectoryTarget):
raise TypeError(
"remote_target_cls must be a subclass of RemoteDirectoryTarget: {}".format(
remote_target_cls,
),
)
remote_kwargs = remote_kwargs.copy() if remote_kwargs else {}
remote_kwargs["fs"] = remote_fs
remote_target = remote_target_cls(os.sep + path, **remote_kwargs)
else:
if _is_file and not isinstance(remote_target, RemoteFileTarget):
raise TypeError(
"remote_target must be an instance of RemoteFileTarget: {}".format(
remote_target,
),
)
if not _is_file and not isinstance(remote_target, RemoteDirectoryTarget):
raise TypeError(
"remote_target must be an instance of RemoteDirectoryTarget: {}".format(
remote_target,
),
)
# create a local target
if not local_target:
local_kwargs = local_kwargs.copy() if local_kwargs else {}
if not local_fs:
local_fs = Config.instance().get_expanded("target", "default_local_fs")
local_kwargs["fs"] = local_fs
local_target_cls = LocalFileTarget if _is_file else LocalDirectoryTarget
local_target = local_target_cls(path, **local_kwargs)
else:
if _is_file and not isinstance(local_target, LocalFileTarget):
raise TypeError(
"local_target must be an instance of LocalFileTarget: {}".format(
local_target,
),
)
if not _is_file and not isinstance(local_target, LocalDirectoryTarget):
raise TypeError(
"local_target must be an instance of LocalDirectoryTarget: {}".format(
local_target,
),
)
# store targets
self.remote_target = remote_target
self.local_target = local_target
# additional attributes
self.local_read_only = local_read_only
self.local_sync = local_sync if local_sync is not None else self.local_sync_default
# temporary, forced file system
self._force_fs = None
super(MirroredTarget, self).__init__(path, **kwargs)
def _copy_kwargs(self):
kwargs = super(MirroredTarget, self)._copy_kwargs()
kwargs["local_read_only"] = self.local_read_only
kwargs["local_sync"] = self.local_sync
return kwargs
@property
def _local_root_depth(self):
return self.local_target.fs.local_root_depth
def _local_root_exists(self):
return self.check_local_root(self.local_target.abspath, depth=self._local_root_depth)
def _local_target_exists(self, *args, **kwargs):
return self._local_root_exists() and self.local_target.exists(*args, **kwargs)
def _parent_args(self):
parent_kwargs = {
"remote_target": self.remote_target.parent,
"local_target": self.local_target.parent,
"local_read_only": self.local_read_only,
"local_sync": self.local_sync,
}
return (), parent_kwargs
def _wait_for_local(self, missing=False, timeout=0.5, attempts=90, local_sync=None):
# no need to wait if local sync is disabled
if not (self.local_sync if local_sync is None else local_sync):
return
# no need to wait if local root does not require checking
if not self.check_local_root(self.local_target.abspath, depth=self._local_root_depth):
return
sleep_counter = 0
while self.local_target.exists() == missing:
time.sleep(timeout)
sleep_counter += 1
if sleep_counter >= attempts:
state = "disappear" if missing else "exist"
raise Exception(
"timeout while waiting for local representation {!r} to {}".format(
self.local_target, state,
),
)
@property
def fs(self):
if self._force_fs is not None:
return self._force_fs
return (
self.local_target.fs
if self._local_target_exists()
else self.remote_target.fs
)
@contextlib.contextmanager
def force_fs(self, fs):
with patch_object(self, "_force_fs", fs):
yield
@property
def abspath(self):
return (
self.local_target.abspath
if self._local_target_exists()
else self.remote_target.abspath
)
@property
def dirname(self):
return (
self.local_target.dirname
if self._local_target_exists()
else self.remote_target.dirname
)
@property
def absdirname(self):
return (
self.local_target.absdirname
if self._local_target_exists()
else self.remote_target.absdirname
)
@property
def basename(self):
return self.local_target.basename
def stat(self, *args, **kwargs):
return (
self.local_target.stat(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.stat(*args, **kwargs)
)
[docs] def exists(self, *args, **kwargs):
return (
self.local_target.exists(*args, **kwargs) or
self.remote_target.exists(*args, **kwargs)
)
[docs] def remove(self, *args, local_sync=None, **kwargs):
if not self.local_read_only:
return self.local_target.remove(*args, **kwargs)
ret = self.remote_target.remove(*args, **kwargs)
self._wait_for_local(missing=True, local_sync=local_sync)
return ret
def chmod(self, *args, **kwargs):
return self.remote_target.chmod(*args, **kwargs)
def uri(self, *args, **kwargs):
return (
self.local_target.uri(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.uri(*args, **kwargs)
)
def copy_to(self, *args, **kwargs):
return (
self.local_target.copy_to(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.copy_to(*args, **kwargs)
)
def copy_from(self, *args, local_sync=None, **kwargs):
ret = self.remote_target.copy_from(*args, **kwargs)
self._wait_for_local(missing=False, local_sync=local_sync)
return ret
def move_to(self, *args, local_sync=None, **kwargs):
if not self.local_read_only:
return self.local_target.move_to(*args, **kwargs)
ret = self.remote_target.move_to(*args, **kwargs)
self._wait_for_local(missing=True, local_sync=local_sync)
return ret
def move_from(self, *args, local_sync=None, **kwargs):
if not self.local_read_only:
return self.local_target.move_from(*args, **kwargs)
ret = self.remote_target.move_from(*args, **kwargs)
self._wait_for_local(missing=False, local_sync=local_sync)
return ret
def copy_to_local(self, *args, **kwargs):
return (
self.local_target.copy_to_local(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.copy_to_local(*args, **kwargs)
)
def copy_from_local(self, *args, local_sync=None, **kwargs):
ret = self.remote_target.copy_from_local(*args, **kwargs)
self._wait_for_local(missing=False, local_sync=local_sync)
return ret
def move_to_local(self, *args, local_sync=None, **kwargs):
if not self.local_read_only:
return self.local_target.move_to_local(*args, **kwargs)
ret = self.remote_target.move_to_local(*args, **kwargs)
self._wait_for_local(missing=True, local_sync=local_sync)
return ret
def move_from_local(self, *args, local_sync=None, **kwargs):
if not self.local_read_only:
return self.local_target.move_from_local(*args, **kwargs)
ret = self.remote_target.move_from_local(*args, **kwargs)
self._wait_for_local(missing=False, local_sync=local_sync)
return ret
@contextlib.contextmanager
def localize(self, mode="r", local_sync=None, **kwargs):
with (
self.local_target.localize(mode, **kwargs)
if (mode == "r" or not self.local_read_only) and self._local_target_exists()
else self.remote_target.localize(mode, **kwargs)
) as ret:
yield ret
if mode == "w" and self.local_read_only:
self._wait_for_local(missing=False, local_sync=local_sync)
def touch(self, *args, local_sync=None, **kwargs):
if not self.local_read_only:
return self.local_target.touch(*args, **kwargs)
ret = self.remote_target.touch(*args, **kwargs)
self._wait_for_local(missing=False, local_sync=local_sync)
return ret
def load(self, *args, **kwargs):
return (
self.local_target.load(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.load(*args, **kwargs)
)
def dump(self, *args, local_sync=None, **kwargs):
if not self.local_read_only:
return self.local_target.dump(*args, **kwargs)
ret = self.remote_target.dump(*args, **kwargs)
self._wait_for_local(missing=False, local_sync=local_sync)
return ret
[docs]class MirroredFileTarget(FileSystemFileTarget, MirroredTarget):
def __init__(self, path, **kwargs):
super(MirroredFileTarget, self).__init__(path, _is_file=True, **kwargs)
[docs] @contextlib.contextmanager
def open(self, mode, local_sync=None, **kwargs):
with (
self.local_target.open(mode, **kwargs)
if (mode == "r" or not self.local_read_only) and self._local_target_exists()
else self.remote_target.open(mode, **kwargs)
) as ret:
yield ret
if mode == "w" and self.local_read_only:
self._wait_for_local(missing=False, local_sync=local_sync)
[docs]class MirroredDirectoryTarget(FileSystemDirectoryTarget, MirroredTarget):
def __init__(self, path, **kwargs):
super(MirroredDirectoryTarget, self).__init__(path, _is_file=False, **kwargs)
def _child_args(self, path, type):
child_kwargs = {
"remote_target": self.remote_target.child(path, type=type),
"local_target": self.local_target.child(path, type=type),
"local_read_only": self.local_read_only,
"local_sync": self.local_sync,
}
return (), child_kwargs
def listdir(self, *args, **kwargs):
return (
self.local_target.listdir(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.listdir(*args, **kwargs)
)
def glob(self, *args, **kwargs):
return (
self.local_target.glob(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.glob(*args, **kwargs)
)
def walk(self, *args, **kwargs):
return (
self.local_target.walk(*args, **kwargs)
if self._local_target_exists()
else self.remote_target.walk(*args, **kwargs)
)
MirroredTarget.file_class = MirroredFileTarget
MirroredTarget.directory_class = MirroredDirectoryTarget