Source code for btrsync.sync

#!/usr/bin/env python

# Copyright © 2023 Andrei Tatar <andrei.ttr@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later


"""
Btrfs subvolume synchronization.
"""

import asyncio
import posixpath
import itertools

from .. import btrfs
from .. import util


[docs]class BtrSync: """ Base class containing logic to sync btrfs subvolumes from a source root to a destination root. :param src: source btrfs root (:class:`btrsync.sync.root.BtrfsRoot`-like instance) :param dst: destination btrfs root (:class:`btrsync.sync.root.BtrfsRoot`-like instance) :param srckeys: key functions to be applied to the subvolumes of `src`, if :const:`None` use default key functions :param dstkeys: key functions to be applied to the subvolumes of `dst`, if :const:`None` use default key functions Key functions take a subvolume as argument and return a value that is then used for equality testing. Two subvolumes are considered equal if any of their key values compare equal and are not :const:`None`. The default key functions return the UUID and received UUID of subvolumes. """ DEFAULT_KEYS = (lambda v: v['uuid'], lambda v: v['received_uuid']) def __init__(self, src, dst, *, srckeys=None, dstkeys=None): self.src = src self.dst = dst self.srckeys = srckeys if srckeys is not None else self.DEFAULT_KEYS self.dstkeys = dstkeys if dstkeys is not None else self.DEFAULT_KEYS
[docs] @staticmethod def volgroups(roots): """Iterate over `roots` and their descendants in groups of COW-independent volumes.""" g = [] for vol in btrfs.COWTree.bfs(*roots, depth_markers=True): if vol is None: yield g g = [] else: g.append(vol)
[docs] @staticmethod def target(vol): """Return :const:`True` if `vol` is to be considered for sync.""" return True
[docs] def parent(self, vol): """Return the COW parent of `vol` to be used as basis for incremental send, or :const:`None` if unavailable.""" for par in btrfs.COWTree.ancestors(vol): if par['uuid'] in self.diff[0]: return par return None
[docs] @staticmethod def check(vol, parent): """Return :const:`True` if the sync of `vol` with COW parent `parent` should proceed.""" return True
[docs] @staticmethod def stop(vols): """Called after `vols` have been synced; return :const:`True` if sync should immediately stop.""" return False
async def _refresh(self, trans): """Update the internal diff between source and destination roots.""" self.srcroots, self.dstroots = await trans.try_gather(self.src.list(), self.dst.list()) self.diff = btrfs.COWTree.diff(self.srcroots, self.dstroots, self.srckeys, self.dstkeys)
[docs] async def sync(self, trans, *, batch=False, parallel=False, transfer_existing=False, volgroups=None, target=None, parent=None, check=None, stop=None): """ Perform synchronization of subvolumes. Returns whether the sync was successful and all error handling is delegated to `trans`. :param trans: :class:`.Transfer`-like object to perform the actual send/receive operations :param batch: if :const:`True`, batch together multiple volumes into a single transfer :param parallel: if :const:`True`, run independent transfers in parallel :param transfer_existing: if :const:`True`, consider for transfer volumes that already exist on the destination :param volgroups: override for :meth:`volgroups` :param target: override for :meth:`target` :param parent: override for :meth:`parent` :param check: override for :meth:`check` :param stop: override for :meth:`stop` :returns: :const:`True` if sync successful, :const:`False` if errors occured """ volgroups = self.volgroups if volgroups is None else volgroups target = self.target if target is None else target parent = self.parent if parent is None else parent check = self.check if check is None else check stop = self.stop if stop is None else stop def tf(vols, par): async def f(a): r = await a return (vols, r) return f(trans.transf(vols, par, self.src, self.dst)) def mark(vols): for v in vols: self.diff[0][v['uuid']].append(None) await self._refresh(trans) finish = False erred = False for volgr in volgroups(self.srcroots): targets = [vol for vol in volgr if target(vol) and (transfer_existing or vol['uuid'] not in self.diff[0])] parents = (parent(vol) for vol in targets) cand = (pair for pair in zip(targets, parents) if check(*pair)) if batch: packs = ( ([x[0] for x in vps], vps[0][1]) for _, vps in util.group( cand, lambda x: (x[1]['uuid'] if x[1] is not None else None, posixpath.dirname(x[0])) )[0].items() ) else: packs = (([x[0]], x[1]) for x in cand) transfers = (tf(vols, par) for vols, par in packs) transeq = asyncio.as_completed(transfers) if parallel else transfers for transop in transeq: try: vols, res = await transop except asyncio.CancelledError: erred = True if parallel: continue else: break mark(vols) if stop(vols): finish = True if not parallel: break if finish or erred: break return not erred
[docs]class Transfer: """ Base class implementing a transfer as required by :meth:`.BtrSync.sync`. :param recvpath: the path that transfers are received into :param replicate_dirs: if :const:`True` adapt `recvpath` to recreate the sent volumes' directory structure """ def __init__(self, *, recvpath='.', replicate_dirs=False): self.recvbase = recvpath self.replicate_dirs = replicate_dirs
[docs] def err(self, e, *args): """Called on encountering an exception `e`; expected to log the error and return successfully.""" pass
[docs] async def report(self, vols, par, src, dst): """Called at the start of a transfer of volumes `vols` with parent `par` from `src` to `dst`.""" pass
[docs] async def report_done(self, vols, par, src, dst): """Called when the transfer of volumes `vols` with parent `par` from `src` to `dst` has finished.""" pass
[docs] async def try_await(self, aw): """ Try awaiting awaitable `aw`, converting any exceptions to :exc:`asyncio.CancelledError`. Exceptions raised by `aw` will be logged by :meth:`err`. """ try: return await aw except BaseException as e: self.err(e) raise asyncio.CancelledError() from e
async def _collect(self, *tasks): erred = False for t in tasks: try: await asyncio.wait_for(t, timeout=0) except TimeoutError: pass except BaseException as e: self.err(e) erred = True if erred: raise asyncio.CancelledError()
[docs] async def try_gather(self, *coroutines): """ Try running and waiting for all `coroutines`, canceling all upon error. Exceptions raised by the tasks will be logged by :meth:`err`. :raises asyncio.CancelledError: if any tasks raise an exception """ tasks = map(asyncio.create_task, coroutines) try: return await asyncio.gather(*tasks) except BaseException as e: self.err(e) try: await self._collect(*tasks) except asyncio.CancelledError: pass raise asyncio.CancelledError() from e
@staticmethod def _sendpaths(vols, par): volpaths = [v['path'] for v in vols] parent = par['path'] if par is not None else None meta = {'volumes': volpaths} if parent is not None: meta['parent'] = parent return volpaths, parent, meta def _recvpath(self, volpaths): if self.replicate_dirs: voldir = posixpath.dirname(volpaths[0]) for vp in volpaths[1:]: assert(posixpath.dirname(vp) == voldir) return posixpath.join(self.recvbase, voldir) else: return self.recvbase
[docs] async def transf(self, vols, par, src, dst): """ Minimal quiet transfer function, as expected by :meth:`.BtrSync.sync`. :param vols: the subvolumes to transfer :param par: the parent subvolume to use for an incremental transfer :param src: the source btrfs root :param dst: the destination btrfs root :raises asyncio.CancelledError: if any error has occured, after logging it with :meth:`err` """ args = (vols, par, src, dst) await self.try_await(self.report(*args)) volpaths, parent, meta = self._sendpaths(vols, par) flow, scoro = await self.try_await(src.send(*volpaths, parent=parent)) dcoro = await self.try_await(dst.receive(flow, self._recvpath(volpaths), meta=meta)) await self.try_gather(scoro, dcoro, flow.pump()) await self.try_await(self.report_done(*args))
[docs]class ProgressTransfer(Transfer): """ Transfer class providing a mechanism for logging transfer progress. :param period: time, in seconds, between progress reporting events :param prog_seq: sequence to be cycled through to indicate activity :param kwargs: additional keyword arguments to pass to the superclass :class:`.Transfer`'s constructor """ def __init__(self, *, period=1, prog_seq=[], **kwargs): super().__init__(**kwargs) self.period = period self.prog_seq = prog_seq
[docs] async def report_progress(self, total, prev, seq): """ Called on every progress reporting event. :param total: currrent total bytes transferred :param prev: value of `total` from previous call :param seq: sequence to indicate activity """
[docs] async def progress(self, flow, seq): """Implement progress reporting every `period` seconds.""" cnt, prev = flow.count, 0 while True: await self.report_progress(cnt, prev, seq) try: await asyncio.sleep(self.period) except: await self.report_progress(flow.count, cnt, seq) raise cnt, prev = flow.count, cnt
[docs] async def transf(self, vols, par, src, dst): """ Progress reporting transfer function, as expected by `.BtrSync.sync`. :param vols: the subvolumes to transfer :param par: the parent subvolume to use for an incremental transfer :param src: the source btrfs root :param dst: the destination btrfs root :raises asyncio.CancelledError: if any error has occurred, after logging it with :meth:`err` """ args = (vols, par, src, dst) seq = itertools.cycle(self.prog_seq) volpaths, parent, meta = self._sendpaths(vols, par) await self.try_await(self.report(*args)) flow, scoro = await self.try_await(src.send(*volpaths, parent=parent)) flow.stats = True dcoro = await self.try_await(dst.receive(flow, self._recvpath(volpaths), meta=meta)) prog = asyncio.create_task(self.progress(flow, seq)) try: await self.try_gather(scoro, dcoro, flow.pump()) finally: await self._collect(prog) await self.try_await(self.report_done(*args))
from . import root
[docs]def default_root(protocol): """ Get the default btrfs root used to handle `protocol`. :param protocol: btrfs protocol to use; valid values are ``'local'``, ``'ssh'``, ``'file'``, and ``'file-dump'``. :returns: appropriate :class:`btrsync.sync.root.BtrfsRoot` subclass factory for `protocol` :raises ValueError: if protocol is unknown """ if protocol == 'local': return root.local.LocalRoot elif protocol == 'ssh': return root.ssh.SSHRoot elif protocol == 'file': return root.file.FileRoot elif protocol == 'file-dump': return root.file.DumpRoot else: raise ValueError('Unknown protocol', protocol)