Source code for btrsync.sync.root.local

#!/usr/bin/env python

# Copyright © 2023 Andrei Tatar <andrei.ttr@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later


"""
Btrfs roots implemented using locally executed ``btrfs`` commands.
"""

import io
import os
import asyncio
import posixpath

from . import BtrfsError
from . import _exec

from ... import util
from ... import btrfs
from ... import cmdex


SUDO = util.Cmd('sudo')


[docs]class LocalBtrfsRoot(_exec.ExecBtrfsRoot): """ Btrfs root implemented using local execution of ``btrfs`` commands, anchored at `rootpath`. :param rootpath: path to the target subvolume root :param scope: determines the scope of accessible subvolumes: ``'all'`` includes all subvolumes reachable from `rootpath`, ``'strict'`` includes only subvolumes directly contained in `rootpath`, and ``'isolated'`` behaves like ``'strict'``, but also excludes other subvolumes from computing parentage :param readonly: if :const:`True`, list only readonly subvolumes :param create_recvpath: if :const:`True`, ensure the `path` passed to :meth:`receive` exists :raises ValueError: for an invalid value of `scope` """ _SCOPES = ('all', 'strict', 'isolated') def __init__(self, rootpath, *, scope='all', readonly=True, create_recvpath=False): self.rootpath = rootpath if scope not in self._SCOPES: raise ValueError(f"`scope' must be one of {self._SCOPES}") self.scope = scope self.readonly = readonly self.create_recvpath = create_recvpath self._args = ('rootpath',) self._kwargs = ('scope', 'readonly', 'create_recvpath') self._isolated = scope == 'isolated' self._strict = scope != 'all' self._fsroot = None @property def name(self): return self.rootpath
[docs] @classmethod async def is_root(cls, path): cmd = btrfs.cmd.show(path) ret, (stdout, stderr) = await cls._run(cmd, stdin=cmdex.DEVNULL) err = stderr.decode('utf-8') if ret != 0: if 'Not a Btrfs subvolume' in err or 'No such file or directory' in err: return False else: raise BtrfsError(cmd.shellify(), err.rstrip()) else: return True
[docs] @classmethod async def get_root(cls, path, **kwargs): rpath = path while rpath != '/' and not await cls.is_root(rpath): rpath = os.path.dirname(rpath) if rpath == '/' and not await cls.is_root(rpath): raise BtrfsError('Cannot find root') return cls(rpath, **kwargs), os.path.relpath(path, rpath)
def _localpath(self, path): if not util.is_subpath(path): raise ValueError('Path must be relative and cannot escape its base directory') return os.path.join(self.rootpath, path) async def _chk(self): if self._fsroot is None: ret, (stdout, stderr) = await self._run_checked(btrfs.cmd.show(self.rootpath), stdin=cmdex.DEVNULL) rp, stats = btrfs.parse.Show.from_stdout(stdout) self._fsroot = posixpath.join(btrfs.FSTREE, '' if rp == '/' else rp)
[docs] async def list(self): await self._chk() alcmd = btrfs.cmd.list(self.rootpath, list_all=not self._isolated, readonly=False, fields='uqR') rocmd = btrfs.cmd.list(self.rootpath, list_all=not self._strict, readonly=self.readonly, fields='u') ret, (stdout, stderr) = await self._run_checked(rocmd, stdin=cmdex.DEVNULL) rvs = util.index(btrfs.parse.List.from_stdout(stdout), lambda v: v['uuid'])[0] ret, (stdout, stderr) = await self._run_checked(alcmd, stdin=cmdex.DEVNULL) allvols = btrfs.relpaths(btrfs.parse.List.from_stdout(stdout), self._fsroot) ct = btrfs.COWTree(allvols, lambda v: v['uuid'] in rvs and not v['path'].startswith(btrfs.FSTREE)) return ct.roots
[docs] async def show(self, path='.'): tpath = self._localpath(path) await self._chk() ret, (stdout, stderr) = await self._run_checked(btrfs.cmd.show(tpath), stdin=cmdex.DEVNULL) return btrfs.parse.Show.from_stdout(stdout)
[docs] async def send(self, *paths, parent=None, clones=[]): tpaths = (self._localpath(p) for p in paths) if parent is not None: parent = self._localpath(parent) clones = (self._localpath(c) for c in clones) await self._chk() r, w = map(io.FileIO, os.pipe(), ('r', 'w')) return util.PipeFlow(r), self._dosend(w, btrfs.cmd.send(*tpaths, parent=parent, clones=clones))
async def _dosend(self, f, cmd): try: await self._run_checked(cmd, stdin=cmdex.DEVNULL, stdout=f) finally: f.close()
[docs] async def receive(self, flow, path='.', *, meta={}): tpath = self._localpath(path) await self._chk() return self._dorecv(tpath, flow.connect_fd())
async def _dorecv(self, tpath, f): if self.create_recvpath: await self._run_checked(util.Cmd('mkdir', ['-p', tpath])) await self._run_checked(btrfs.cmd.receive(tpath), stdin=f)
[docs]def LocalRoot(*, sudo=False): """ Return an appropriate btrfs root class for accessing local btrfs filesystems. :param sudo: if :const:`True`, use ``sudo`` to execute ``btrfs`` commands """ if sudo: class SudoLocalRoot(LocalBtrfsRoot): @staticmethod def wrapcmds(cmds): yield from (c.wrap(SUDO) for c in cmds) def __repr__(self): return f'LocalRoot(sudo=True)({self._reprargs()})' return SudoLocalRoot else: return LocalBtrfsRoot