Source code for law.target.mirrored

# coding: utf-8

"""
Target classes that represent remote files and directories and have a local, optionally read-only
mirror (e.g. through a local mount of the remote file system).
"""

from __future__ import annotations

__all__ = ["MirroredTarget", "MirroredFileTarget", "MirroredDirectoryTarget"]

import os
import contextlib
import time
import pathlib
import threading

from law.config import Config
from law.target.file import FileSystemTarget, FileSystemFileTarget, FileSystemDirectoryTarget
from law.target.local import LocalFileSystem, LocalTarget, LocalFileTarget, LocalDirectoryTarget
from law.target.remote import (
    RemoteFileSystem, RemoteTarget, RemoteFileTarget, RemoteDirectoryTarget,
)
from law.util import patch_object
from law._types import Any, Type, Generator, Iterator, IO


# module-wide lock that MirroredTarget.check_local_root holds while it records the existence of a
# local root path in the class-level cache
local_root_check_lock = threading.Lock()


class MirroredTarget(FileSystemTarget):
    """
    Target that combines a *remote_target* with a *local_target* that mirrors it.

    Read-like operations (e.g. :py:meth:`stat`, :py:meth:`load`, :py:meth:`copy_to`) are forwarded
    to the local target when it exists below an existing local root, and to the remote target
    otherwise. Most mutating operations are forwarded to the local target when *local_read_only* is
    *False*, and to the remote target otherwise, in which case the call optionally blocks until the
    local target reflects the change (see :py:meth:`_wait_for_local`).
    """

    # cache of "root path -> exists" results, stored on the class and filled by check_local_root
    _existing_local_roots: dict[str, bool] = {}

    # value used for the local_sync attribute when the constructor receives local_sync=None
    local_sync_default: bool = True

    @classmethod
    def check_local_root(cls, path: str | pathlib.Path, depth: int = 1) -> bool:
        """
        Returns whether the root directory of an absolute *path* exists, where the root consists of
        the first *depth* path fragments. Results are cached per root path. The bare separator is
        always considered existing, whereas empty and relative paths yield *False*.
        """
        path = str(path)

        # path must start with a separator
        if path == os.sep:
            return True
        if not path or not path.startswith(os.sep):
            return False

        # the first fragment of the split is empty, hence depth + 1 fragments are joined
        root_path = os.sep.join(path.split(os.sep)[:depth + 1])

        if root_path not in cls._existing_local_roots:
            with local_root_check_lock:
                # check again as another thread might have filled the cache while this one was
                # waiting for the lock, which avoids probing the file system more than once
                if root_path not in cls._existing_local_roots:
                    cls._existing_local_roots[root_path] = os.path.exists(root_path)

        return cls._existing_local_roots[root_path]

    def __init__(
        self,
        path: str | pathlib.Path,
        _is_file: bool,
        remote_target: RemoteTarget | None = None,
        remote_target_cls: Type[RemoteTarget] | None = None,
        remote_fs: RemoteFileSystem | str | None = None,
        remote_kwargs: dict[str, Any] | None = None,
        local_target: LocalTarget | None = None,
        local_fs: LocalFileSystem | str | None = None,
        local_kwargs: dict[str, Any] | None = None,
        local_read_only: bool = True,
        local_sync: bool | None = None,
        **kwargs,
    ) -> None:
        """
        Creates the mirrored target for *path*. *_is_file* decides whether file or directory
        targets are created and accepted.

        When no *remote_target* is given, it is created as an instance of *remote_target_cls* with
        *remote_kwargs* and *remote_fs* passed as ``fs``. When no *local_target* is given, it is
        created with *local_kwargs* and *local_fs* passed as ``fs``, with the file system
        defaulting to the ``target.default_local_fs`` config option.

        Raises a *ValueError* when the information to build the remote target is incomplete, and a
        *TypeError* when a target or target class does not match *_is_file*.
        """
        # leading separators are stripped here and added back only for the remote target
        path = str(path).lstrip(os.sep)

        # create a remote target
        if not remote_target:
            if not remote_fs:
                raise ValueError("either remote_target or remote_fs must be given")
            if not remote_target_cls:
                raise ValueError("either remote_target or remote_target_cls must be given")
            if _is_file and not issubclass(remote_target_cls, RemoteFileTarget):
                raise TypeError(
                    f"remote_target_cls must subclass RemoteFileTarget: {remote_target_cls}",
                )
            if not _is_file and not issubclass(remote_target_cls, RemoteDirectoryTarget):
                raise TypeError(
                    f"remote_target_cls must subclass RemoteDirectoryTarget: {remote_target_cls}",
                )
            # copy to avoid changing the dict of the caller
            remote_kwargs = remote_kwargs.copy() if remote_kwargs else {}
            remote_kwargs["fs"] = remote_fs
            remote_target = remote_target_cls(os.sep + path, **remote_kwargs)
        else:
            if _is_file and not isinstance(remote_target, RemoteFileTarget):
                raise TypeError(
                    f"remote_target must be an instance of RemoteFileTarget: {remote_target}",
                )
            if not _is_file and not isinstance(remote_target, RemoteDirectoryTarget):
                raise TypeError(
                    f"remote_target must be an instance of RemoteDirectoryTarget: {remote_target}",
                )

        # create a local target
        if not local_target:
            # copy to avoid changing the dict of the caller
            local_kwargs = local_kwargs.copy() if local_kwargs else {}
            if not local_fs:
                local_fs = Config.instance().get_expanded("target", "default_local_fs")
            local_kwargs["fs"] = local_fs
            local_target_cls = LocalFileTarget if _is_file else LocalDirectoryTarget
            local_target = local_target_cls(path, **local_kwargs)
        else:
            if _is_file and not isinstance(local_target, LocalFileTarget):
                raise TypeError(
                    f"local_target must be an instance of LocalFileTarget: {local_target}",
                )
            if not _is_file and not isinstance(local_target, LocalDirectoryTarget):
                raise TypeError(
                    f"local_target must be an instance of LocalDirectoryTarget: {local_target}",
                )

        # store targets
        self.remote_target = remote_target
        self.local_target = local_target

        # additional attributes
        self.local_read_only = local_read_only
        self.local_sync = self.local_sync_default if local_sync is None else local_sync

        # temporary, forced file system
        self._force_fs = None

        super().__init__(path, **kwargs)

    def _copy_kwargs(self):
        """
        Returns the kwargs of the parent implementation, extended by the *local_read_only* and
        *local_sync* attributes.
        """
        kwargs = super()._copy_kwargs()
        kwargs["local_read_only"] = self.local_read_only
        kwargs["local_sync"] = self.local_sync
        return kwargs

    @property
    def _local_root_depth(self) -> int:
        """
        Number of leading path fragments that form the local root, read from the file system of
        the local target.
        """
        return self.local_target.fs.local_root_depth

    def _local_root_exists(self) -> bool:
        """
        Returns whether the (cached) root of the absolute local path exists.
        """
        return self.check_local_root(self.local_target.abspath, depth=self._local_root_depth)

    def _local_target_exists(self, *args, **kwargs) -> bool:
        """
        Returns *True* when the local root exists and the local target reports to exist, with all
        *args* and *kwargs* forwarded to its ``exists`` method.
        """
        return bool(self._local_root_exists() and self.local_target.exists(*args, **kwargs))

    def _parent_args(self) -> tuple[tuple[Any, ...], dict[str, Any]]:
        """
        Returns positional and keyword arguments describing the parent of this target, built from
        the parents of the remote and local targets.
        """
        # presumably consumed by the base class to construct the parent target; confirm there
        parent_kwargs = {
            "remote_target": self.remote_target.parent,
            "local_target": self.local_target.parent,
            "local_read_only": self.local_read_only,
            "local_sync": self.local_sync,
        }
        return (), parent_kwargs

    def _wait_for_local(
        self,
        missing: bool = False,
        timeout: int | float = 0.5,
        attempts: int = 90,
        local_sync: bool | None = None,
    ) -> None:
        """
        Blocks until the local target exists or, when *missing* is *True*, until it no longer
        exists. The state is polled with *timeout* seconds of sleep between two checks and at most
        *attempts* sleeps, after which a *TimeoutError* is raised.

        Nothing is done when syncing is disabled, decided by *local_sync* or, when *None*, by the
        *local_sync* attribute, or when the local root does not exist.
        """
        # no need to wait if local sync is disabled
        if not (self.local_sync if local_sync is None else local_sync):
            return

        # no need to wait if there is no local root the change could show up in
        if not self._local_root_exists():
            return

        sleep_counter = 0
        # exists() is cast to bool so that a non-boolean result still compares correctly
        while bool(self.local_target.exists()) == missing:
            # only give up after a check that directly followed the last sleep
            if sleep_counter >= attempts:
                state = "disappear" if missing else "exist"
                raise TimeoutError(
                    f"timeout while waiting for local target representation {self.local_target!r} "
                    f"to {state}",
                )
            time.sleep(timeout)
            sleep_counter += 1

    @property
    def fs(self) -> LocalFileSystem | RemoteFileSystem:
        """
        The forced file system if one is set via :py:meth:`force_fs`, otherwise the file system of
        the local target if it exists, or that of the remote target.
        """
        if self._force_fs is not None:
            return self._force_fs

        return (
            self.local_target.fs
            if self._local_target_exists()
            else self.remote_target.fs  # type: ignore[return-value]
        )

    @contextlib.contextmanager
    def force_fs(self, fs) -> Generator[None, None, None]:
        """
        Context manager that patches the internal *_force_fs* attribute to *fs*, which is then
        returned by the :py:attr:`fs` property.
        """
        # presumably patch_object restores the previous value on exit; confirm in law.util
        with patch_object(self, "_force_fs", fs):
            yield

    @property
    def abspath(self) -> str:
        """
        Absolute path of the local target if it exists, otherwise that of the remote target.
        """
        return (
            self.local_target.abspath
            if self._local_target_exists()
            else self.remote_target.abspath
        )

    @property
    def dirname(self) -> str:
        """
        Directory name of the local target if it exists, otherwise that of the remote target.
        """
        return (
            self.local_target.dirname
            if self._local_target_exists()
            else self.remote_target.dirname
        )

    @property
    def absdirname(self) -> str:
        """
        Absolute directory name of the local target if it exists, otherwise that of the remote
        target.
        """
        return (
            self.local_target.absdirname
            if self._local_target_exists()
            else self.remote_target.absdirname
        )

    @property
    def basename(self) -> str:
        """
        Base name, always taken from the local target.
        """
        return self.local_target.basename

    def stat(self, *args, **kwargs) -> os.stat_result:
        """
        Returns the stat result of the local target if it exists, otherwise that of the remote
        target.
        """
        return (
            self.local_target.stat(*args, **kwargs)
            if self._local_target_exists()
            else self.remote_target.stat(*args, **kwargs)
        )

    def exists(self, *args, **kwargs) -> bool | os.stat_result | None:  # type: ignore[override]
        """
        Returns the result of the existence check of the local target if it is truthy, otherwise
        that of the remote target.
        """
        # NOTE(review): unlike most other methods, the local target is queried here without the
        # cached local root check; confirm that this is intended
        return (
            self.local_target.exists(*args, **kwargs) or
            self.remote_target.exists(*args, **kwargs)
        )

    def remove(self, *args, local_sync: bool | None = None, **kwargs) -> bool:
        """
        Removes the target through the local target when it is not read-only. Otherwise, removes
        it through the remote target and waits for the local target to disappear.
        """
        if not self.local_read_only:
            return self.local_target.remove(*args, **kwargs)

        ret = self.remote_target.remove(*args, **kwargs)
        self._wait_for_local(missing=True, local_sync=local_sync)
        return ret

    def chmod(self, *args, **kwargs) -> bool:
        """
        Changes permissions, always through the remote target.
        """
        # NOTE(review): there is no local branch for local_read_only=False here; confirm
        return self.remote_target.chmod(*args, **kwargs)

    def uri(self, *args, **kwargs) -> str | list[str]:
        """
        Returns the uri(s) of the local target if it exists, otherwise those of the remote target.
        """
        return (
            self.local_target.uri(*args, **kwargs)
            if self._local_target_exists()
            else self.remote_target.uri(*args, **kwargs)
        )

    def copy_to(self, *args, **kwargs) -> str:
        """
        Copies from the local target if it exists, otherwise from the remote target.
        """
        return (
            self.local_target.copy_to(*args, **kwargs)
            if self._local_target_exists()
            else self.remote_target.copy_to(*args, **kwargs)
        )

    def copy_from(self, *args, local_sync: bool | None = None, **kwargs) -> str:
        """
        Copies into the remote target and waits for the local target to exist.
        """
        # NOTE(review): unlike move_from, this does not write through the local target when
        # local_read_only is False; confirm that this is intended
        ret = self.remote_target.copy_from(*args, **kwargs)
        self._wait_for_local(missing=False, local_sync=local_sync)
        return ret

    def move_to(self, *args, local_sync: bool | None = None, **kwargs) -> str:
        """
        Moves the local target when it is not read-only. Otherwise, moves the remote target and
        waits for the local target to disappear.
        """
        if not self.local_read_only:
            return self.local_target.move_to(*args, **kwargs)

        ret = self.remote_target.move_to(*args, **kwargs)
        self._wait_for_local(missing=True, local_sync=local_sync)
        return ret

    def move_from(self, *args, local_sync: bool | None = None, **kwargs) -> str:
        """
        Moves into the local target when it is not read-only. Otherwise, moves into the remote
        target and waits for the local target to exist.
        """
        if not self.local_read_only:
            return self.local_target.move_from(*args, **kwargs)

        ret = self.remote_target.move_from(*args, **kwargs)
        self._wait_for_local(missing=False, local_sync=local_sync)
        return ret

    def copy_to_local(self, *args, **kwargs) -> str:
        """
        Copies to a local destination from the local target if it exists, otherwise from the
        remote target.
        """
        return (
            self.local_target.copy_to_local(*args, **kwargs)
            if self._local_target_exists()
            else self.remote_target.copy_to_local(*args, **kwargs)
        )

    def copy_from_local(self, *args, local_sync: bool | None = None, **kwargs) -> str:
        """
        Copies from a local source into the remote target and waits for the local target to exist.
        """
        # NOTE(review): unlike move_from_local, this does not write through the local target when
        # local_read_only is False; confirm that this is intended
        ret = self.remote_target.copy_from_local(*args, **kwargs)
        self._wait_for_local(missing=False, local_sync=local_sync)
        return ret

    def move_to_local(self, *args, local_sync: bool | None = None, **kwargs) -> str:
        """
        Moves the local target to a local destination when it is not read-only. Otherwise, moves
        the remote target and waits for the local target to disappear.
        """
        if not self.local_read_only:
            return self.local_target.move_to_local(*args, **kwargs)

        ret = self.remote_target.move_to_local(*args, **kwargs)
        self._wait_for_local(missing=True, local_sync=local_sync)
        return ret

    def move_from_local(self, *args, local_sync: bool | None = None, **kwargs) -> str:
        """
        Moves a local source into the local target when it is not read-only. Otherwise, moves it
        into the remote target and waits for the local target to exist.
        """
        if not self.local_read_only:
            return self.local_target.move_from_local(*args, **kwargs)

        ret = self.remote_target.move_from_local(*args, **kwargs)
        self._wait_for_local(missing=False, local_sync=local_sync)
        return ret

    @contextlib.contextmanager
    def localize(
        self,
        mode: str = "r",
        *,
        local_sync: bool | None = None,
        **kwargs,
    ) -> Iterator[FileSystemTarget]:
        """
        Context manager that yields the result of localizing either the local or the remote
        target. The local target is used when it exists and either *mode* is ``"r"`` or it is not
        read-only. After the context of a ``"w"`` localization through a read-only mirror has been
        left without an exception, the call waits for the local target to exist.
        """
        with (
            self.local_target.localize(mode=mode, **kwargs)
            if (mode == "r" or not self.local_read_only) and self._local_target_exists()
            else self.remote_target.localize(mode=mode, **kwargs)
        ) as ret:
            yield ret

        if mode == "w" and self.local_read_only:
            self._wait_for_local(missing=False, local_sync=local_sync)

    def touch(self, *, local_sync: bool | None = None, **kwargs) -> bool:
        """
        Touches the local target when it is not read-only. Otherwise, touches the remote target
        and waits for the local target to exist.
        """
        if not self.local_read_only:
            return self.local_target.touch(**kwargs)

        ret = self.remote_target.touch(**kwargs)
        self._wait_for_local(missing=False, local_sync=local_sync)
        return ret

    def load(self, *args, **kwargs) -> Any:
        """
        Loads content from the local target if it exists, otherwise from the remote target.
        """
        return (
            self.local_target.load(*args, **kwargs)
            if self._local_target_exists()
            else self.remote_target.load(*args, **kwargs)
        )

    def dump(self, *args, local_sync: bool | None = None, **kwargs) -> Any:
        """
        Dumps content into the local target when it is not read-only. Otherwise, dumps into the
        remote target and waits for the local target to exist.
        """
        if not self.local_read_only:
            return self.local_target.dump(*args, **kwargs)

        ret = self.remote_target.dump(*args, **kwargs)
        self._wait_for_local(missing=False, local_sync=local_sync)
        return ret
class MirroredFileTarget(FileSystemFileTarget, MirroredTarget):  # type: ignore[misc]
    """
    Mirrored target that represents a file.
    """

    def __init__(self, path: str | pathlib.Path, **kwargs) -> None:
        """
        Creates the mirrored file target for *path*, with all *kwargs* forwarded to the
        :py:class:`MirroredTarget` constructor.
        """
        super().__init__(path, _is_file=True, **kwargs)

    @contextlib.contextmanager
    def open(self, mode: str, *, local_sync: bool | None = None, **kwargs) -> Iterator[IO]:
        """
        Context manager that opens the file in *mode* and yields the file object, with *kwargs*
        forwarded to the ``open`` method of the selected target.

        The local target is used when it exists and either *mode* only reads or the local target
        is not read-only. In all other cases, the remote target is used. After a writing context
        through a read-only mirror has been left without an exception, the call waits for the
        local target to exist, controlled by *local_sync*.
        """
        # treat all pure read modes alike, e.g. "r", "rb" and "rt", while "r+" also writes
        read_only_mode = mode.startswith("r") and "+" not in mode

        with (
            self.local_target.open(mode, **kwargs)  # type: ignore[misc]
            if (read_only_mode or not self.local_read_only) and self._local_target_exists()
            else self.remote_target.open(mode, **kwargs)  # type: ignore[misc]
        ) as ret:
            yield ret

        # writes that went through the remote target need time to show up in the local mirror
        if not read_only_mode and self.local_read_only:
            self._wait_for_local(missing=False, local_sync=local_sync)
class MirroredDirectoryTarget(FileSystemDirectoryTarget, MirroredTarget):  # type: ignore[misc]
    """
    Mirrored target that represents a directory.
    """

    def __init__(self, path: str | pathlib.Path, **kwargs) -> None:
        """
        Creates the mirrored directory target for *path*, with all *kwargs* forwarded to the
        :py:class:`MirroredTarget` constructor.
        """
        super().__init__(path, _is_file=False, **kwargs)

    def _child_args(
        self,
        path: str | pathlib.Path,
        type: str,
    ) -> tuple[tuple[Any, ...], dict[str, Any]]:
        """
        Returns positional and keyword arguments describing the child at *path* of a certain
        *type*, built from the corresponding children of the remote and local targets.
        """
        # the remote child is created first, followed by the local one
        remote_child = self.remote_target.child(path, type=type)  # type: ignore[attr-defined]
        local_child = self.local_target.child(path, type=type)  # type: ignore[attr-defined]

        child_kwargs: dict[str, Any] = {}
        child_kwargs["remote_target"] = remote_child
        child_kwargs["local_target"] = local_child
        child_kwargs["local_read_only"] = self.local_read_only
        child_kwargs["local_sync"] = self.local_sync

        return (), child_kwargs

    def listdir(self, *args, **kwargs) -> list[str]:
        """
        Lists the directory through the local target if it exists, otherwise through the remote
        target.
        """
        if self._local_target_exists():
            return self.local_target.listdir(*args, **kwargs)  # type: ignore[attr-defined]

        return self.remote_target.listdir(*args, **kwargs)  # type: ignore[attr-defined]

    def glob(self, *args, **kwargs) -> list[str]:
        """
        Evaluates a glob through the local target if it exists, otherwise through the remote
        target.
        """
        if self._local_target_exists():
            return self.local_target.glob(*args, **kwargs)  # type: ignore[attr-defined]

        return self.remote_target.glob(*args, **kwargs)  # type: ignore[attr-defined]

    def walk(self, *args, **kwargs) -> Iterator[tuple[str, list[str], list[str], int]]:
        """
        Walks the directory through the local target if it exists, otherwise through the remote
        target.
        """
        if self._local_target_exists():
            return self.local_target.walk(*args, **kwargs)  # type: ignore[attr-defined]

        return self.remote_target.walk(*args, **kwargs)  # type: ignore[attr-defined]
MirroredTarget.file_class = MirroredFileTarget # type: ignore[type-abstract] MirroredTarget.directory_class = MirroredDirectoryTarget # type: ignore[type-abstract]