Source code for btrsync.sync.root.local

#!/usr/bin/env python

# Copyright © 2023 Andrei Tatar <andrei.ttr@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later


"""
Btrfs roots implemented using locally executed ``btrfs`` commands.
"""

import os
import asyncio
import posixpath

from . import BtrfsError, BtrfsRoot

from ... import util
from ... import btrfs
from ... import cmdex


SUDO = util.Cmd('sudo')


[docs]class LocalBtrfsRoot(BtrfsRoot): """ Btrfs root implemented using local execution of ``btrfs`` commands, anchored at `rootpath`. :param rootpath: path to the target subvolume root :param scope: determines the scope of accessible subvolumes: ``'all'`` includes all subvolumes reachable from `rootpath`, ``'strict'`` includes only subvolumes directly contained in `rootpath`, and ``'isolated'`` behaves like ``'strict'``, but also excludes other subvolumes from computing parentage :param readonly: if :const:`True`, list only readonly subvolumes :param create_recvpath: if :const:`True`, ensure the `path` passed to :meth:`.receive` exists :raises ValueError: for an invalid value of `scope` """ _SCOPES = ('all', 'strict', 'isolated') def __init__(self, rootpath, *, scope='all', readonly=True, create_recvpath=False): self.localroot = rootpath if scope not in self._SCOPES: raise ValueError(f"`scope' must be one of {self._SCOPES}") self.scope = scope self.readonly = readonly self.create_recvpath = create_recvpath self._args = ('localroot',) self._kwargs = ('scope', 'readonly', 'create_recvpath') self._isolated = scope == 'isolated' self._strict = scope != 'all' self._fsroot = None def _reprargs(self): return ', '.join((', '.join(f'{repr(getattr(self, arg))}' for arg in self._args), ', '.join(f'{arg}={repr(getattr(self, arg))}' for arg in self._kwargs))) def __repr__(self): return f'{type(self).__name__}({self._reprargs()})'
[docs] @staticmethod def wrapcmds(cmds): """Return `cmds` unchanged; override to customize executed commands.""" yield from cmds
@classmethod async def _run(cls, *cmds, **kwargs): return (await cmdex.ex_out(*cls.wrapcmds(cmds), **kwargs))[-1] def _ex(self, *cmds, **kwargs): return cmdex.ex(*self.wrapcmds(cmds), **kwargs) async def _run_checked(self, *cmds, msg='', **kwargs): ret, (stdout, stderr) = await self._run(*cmds, **kwargs) if ret != 0: raise BtrfsError(': '.join((stderr.decode('utf-8').rstrip(), msg))) return ret, (stdout, stderr)
[docs] @classmethod async def is_root(cls, path): ret, (stdout, stderr) = await cls._run(btrfs.cmd.show(path), stdin=cmdex.DEVNULL) err = stderr.decode('utf-8') if ret != 0: if 'Not a Btrfs subvolume' in err or 'No such file or directory' in err: return False else: raise BtrfsError(': '.join((err.rstrip(), path))) else: return True
[docs] @classmethod async def get_root(cls, path, **kwargs): rpath = path while rpath != '/' and not await cls.is_root(rpath): rpath = os.path.dirname(rpath) if rpath == '/' and not await cls.is_root(rpath): raise BtrfsError('Cannot find root') return cls(rpath, **kwargs), os.path.relpath(path, rpath)
def _localpath(self, path): if not util.is_subpath(path): raise ValueError('Path must be relative and cannot escape its base directory') return os.path.join(self.localroot, path) async def _chk(self): if self._fsroot is None: ret, (stdout, stderr) = await self._run_checked( btrfs.cmd.show(self.localroot), msg=self.localroot, stdin=cmdex.DEVNULL ) rp, stats = btrfs.parse.Show.from_stdout(stdout) self._fsroot = posixpath.join(btrfs.FSTREE, '' if rp == '/' else rp)
[docs] async def list(self): await self._chk() alcmd = btrfs.cmd.list(self.localroot, list_all=not self._isolated, readonly=False, fields='uqR') rocmd = btrfs.cmd.list(self.localroot, list_all=not self._strict, readonly=self.readonly, fields='u') ret, (stdout, stderr) = await self._run_checked(rocmd, msg=self.localroot, stdin=cmdex.DEVNULL) rvs = util.index(btrfs.parse.List.from_stdout(stdout), lambda v: v['uuid'])[0] ret, (stdout, stderr) = await self._run_checked(alcmd, msg=self.localroot, stdin=cmdex.DEVNULL) allvols = btrfs.relpaths(btrfs.parse.List.from_stdout(stdout), self._fsroot) ct = btrfs.COWTree(allvols, lambda v: v['uuid'] in rvs and not v['path'].startswith(btrfs.FSTREE)) return ct.roots
[docs] async def show(self, path='.'): tpath = self._localpath(path) await self._chk() ret, (stdout, stderr) = await self._run_checked(btrfs.cmd.show(tpath), msg=tpath, stdin=cmdex.DEVNULL) return btrfs.parse.Show.from_stdout(stdout)
async def _dosend(self, fildes, cmd): p, r = await self._ex(cmd, stdin=cmdex.DEVNULL, stdout=fildes.fd, stderr=cmdex.PIPE) fildes.close() if r[0][0].returncode: raise BtrfsError(r[0][1][1].decode('utf-8').splitlines())
[docs] async def send(self, *paths, parent=None, clones=[]): tpaths = (self._localpath(p) for p in paths) if parent is not None: parent = self._localpath(parent) clones = (self._localpath(c) for c in clones) cmd = btrfs.cmd.send(*tpaths, parent=parent, clones=clones) await self._chk() r, w = map(util.FileDesc, os.pipe()) return r, self._dosend(w, cmd)
[docs] async def receive(self, fildes, path='.'): tpath = self._localpath(path) cmd = btrfs.cmd.receive(tpath) await self._chk() if self.create_recvpath: await self._run_checked(util.Cmd('mkdir', ['-p', tpath])) task = asyncio.create_task(self._ex(cmd, stdin=fildes.fd, stdout=cmdex.PIPE, stderr=cmdex.PIPE)) fildes.closed = True p, r = await task if r[-1][0].returncode: raise BtrfsError(r[-1][1][1].decode('utf-8').splitlines())
[docs]def LocalRoot(*, sudo=False): """ Return an appropriate btrfs root class for accessing local btrfs filesystems. :param sudo: if :const:`True`, use ``sudo`` to execute ``btrfs`` commands """ if sudo: class SudoLocalRoot(LocalBtrfsRoot): @staticmethod def wrapcmds(cmds): yield from (c.wrap(SUDO) for c in cmds) def __repr__(self): return f'LocalRoot(sudo=True)({self._reprargs()})' return SudoLocalRoot else: return LocalBtrfsRoot