# Source code for btrsync.btrfs

#!/usr/bin/env python

# Copyright © 2023 Andrei Tatar <andrei.ttr@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later


"""
Data structures and functions manipulating btrfs subvolumes.
"""

import posixpath
import pathlib

from collections import defaultdict

from .. import util


# Subvolume paths starting with this marker are treated as absolute by
# abspaths() and relpaths(); any other path is handled as relative.
FSTREE = '<FS_TREE>'
"""Path of the btrfs filesystem root, as printed by ``btrfs-progs``."""


class COWTree:
    """
    Build a hierarchy of btrfs subvolumes based on snapshotting (COW) parentage.

    Every input subvolume is shallow-copied and the copy is annotated with
    the following private keys:

    * ``_cowcheck`` -- result of `check` for this subvolume
    * ``_chld`` -- subvolumes whose ``parent_uuid`` is this subvolume's
      ``uuid``, in the order they were linked
    * ``_cowpreq`` -- the COW predecessor in the resulting hierarchy
      (``None`` for a root); absent while it is still unresolved
    * ``_cowsucc`` -- the COW successors in the resulting hierarchy

    After construction, :attr:`vols` maps every ``uuid`` to its annotated
    copy and :attr:`roots` lists the accepted subvolumes that have no
    predecessor. Subvolumes rejected by `check` stay in :attr:`vols` but
    are never linked into ``roots`` or any ``_cowsucc`` list.

    :param subvols: the sequence of subvolumes (:class:`dict` instances keyed
        by subvolume properties) to process; each must provide ``uuid`` and
        ``parent_uuid``
    :param check: a function that takes a subvolume as argument and returns
        a boolean whether to consider it for parentage; by default accept all
    """

    def __init__(self, subvols, check=None):
        def _finish(vol):
            # Link a fully resolved volume into the tree, but only if it
            # passed `check`; rejected volumes are kept out of the hierarchy.
            if vol['_cowcheck']:
                pq = vol['_cowpreq']
                if pq is None:
                    self.roots.append(vol)
                else:
                    pq['_cowsucc'].append(vol)

        def _handle_preq(pq):
            # `pq` has just had its predecessor resolved: every volume that
            # was waiting on it inherits that same predecessor, recursively.
            for vol in preq_waitlist[pq['uuid']]:
                vol['_cowpreq'] = pq['_cowpreq']
                _handle_preq(vol)
                _finish(vol)
            # The lookup above may have created an empty entry; drop it so
            # the final emptiness assertion holds.
            del preq_waitlist[pq['uuid']]

        def _handle_sibtree(vol, *sibs, maxdepth=0):
            # Search `sibs` (latest first) and, up to `maxdepth`, their
            # `_chld` lists (latest first) for the first volume accepted by
            # `check`, and make it the predecessor of `vol`.
            # NOTE(review): the exact traversal order and the meaning of
            # `maxdepth` are defined by util.bfs -- confirm there.
            candidates = util.bfs(
                lambda v: reversed(v['_chld']),
                *reversed(sibs),
                maxdepth=maxdepth,
            )
            for sib in candidates:
                if sib['_cowcheck']:
                    vol['_cowpreq'] = sib
                    return True
            return False

        def _handle_par(vol, par):
            # Resolve the predecessor of `vol` given its snapshot parent.
            # Returns False when no candidate was found and `par` itself is
            # still unresolved, i.e. `vol` has to wait for `par`.
            r = True
            if not _handle_sibtree(vol, par, maxdepth=1):
                try:
                    vol['_cowpreq'] = par['_cowpreq']
                except KeyError:
                    r = False
            # Register the child only after the search, so that `vol` is
            # never considered as its own predecessor.
            par['_chld'].append(vol)
            return r

        def _handle_parwait(par):
            # `par` has just been registered: process the volumes that
            # arrived earlier and name it as their parent.
            uid = par['uuid']
            for orph in parent_waitlist[uid]:
                if _handle_par(orph, par):
                    _handle_preq(orph)
                    _finish(orph)
                else:
                    preq_waitlist[uid].append(orph)
            del parent_waitlist[uid]

        # parent uuid -> volumes seen before their parent was registered
        parent_waitlist = defaultdict(list)
        # parent uuid -> volumes waiting for that parent's predecessor
        preq_waitlist = defaultdict(list)
        self.vols = {}
        self.roots = []
        _check = (lambda v: True) if check is None else check

        for volume in subvols:
            # Work on a copy so the caller's dicts are not annotated.
            vol = volume.copy()
            vol['_cowcheck'] = _check(vol)
            vol['_chld'] = []
            vol['_cowsucc'] = []
            puid = vol['parent_uuid']
            if puid is None:
                vol['_cowpreq'] = None
                _finish(vol)
            elif puid in self.vols:
                if _handle_par(vol, self.vols[puid]):
                    _handle_preq(vol)
                    _finish(vol)
                else:
                    preq_waitlist[puid].append(vol)
            else:
                parent_waitlist[puid].append(vol)
            self.vols[vol['uuid']] = vol
            _handle_parwait(vol)

        # Whatever is left refers to parents that never showed up. Orphans
        # sharing a missing parent are chained: each one hangs off the latest
        # earlier accepted sibling, or becomes a root if there is none.
        for orphans in parent_waitlist.values():
            sibs = []
            for vol in orphans:
                if not _handle_sibtree(vol, *sibs, maxdepth=0):
                    vol['_cowpreq'] = None
                sibs.append(vol)
                _handle_preq(vol)
                _finish(vol)
        # Internal invariant: every deferred volume has been resolved.
        assert not preq_waitlist

    @staticmethod
    def dfs(node):
        """Iterate, in a depth-first search, over `node` and its COW descendants."""
        return util.dfs(lambda v: v['_cowsucc'], node)

    @staticmethod
    def bfs(*nodes, **kwargs):
        """
        Iterate, in a breadth-first search, over `nodes` and their COW
        descendants.

        Keyword arguments are passed through unchanged to ``util.bfs``.
        """
        return util.bfs(lambda v: v['_cowsucc'], *nodes, **kwargs)

    @staticmethod
    def ancestors(node):
        """Iterate over the COW ancestors of `node`, including itself."""
        while node is not None:
            yield node
            node = node['_cowpreq']

    @staticmethod
    def diff(aroots, broots, akeys, bkeys):
        """
        Perform a diff operation over two sets of COW hierarchies, identifying
        common subvolumes based on custom key functions.

        Two subvolumes are considered identical if any non-None results of
        their key functions match.

        :param aroots: the first sequence of COW hierarchies to compare
        :param broots: the second sequence of COW hierarchies to compare
        :param akeys: the sequence of key functions to apply to `aroots`
        :param bkeys: the sequence of key functions to apply to `broots`
        :returns: tuple of dicts (`coma`, `comb`); `coma` maps the uuid of
            each matched subvolume of `aroots` to the list of identical
            subvolumes found in `broots`, and `comb` does the same in the
            opposite direction
        """
        coma = defaultdict(list)
        comb = defaultdict(list)
        # Each element of agrp/bgrp is used below as a mapping from a key
        # value to the volumes producing it.
        # NOTE(review): presumably util.group yields one such mapping per
        # key function -- confirm against util.group.
        agrp = util.group(COWTree.bfs(*aroots), *akeys)
        bgrp = util.group(COWTree.bfs(*broots), *bkeys)
        for ag in agrp:
            for ak in ag:
                # A None key means "no value" and never matches anything.
                if ak is not None:
                    for bg in bgrp:
                        if ak in bg:
                            for avol in ag[ak]:
                                coma[avol['uuid']].extend(bg[ak])
                            for bvol in bg[ak]:
                                comb[bvol['uuid']].extend(ag[ak])
        return coma, comb
def abspaths(vols, rootpath):
    """
    Process a sequence of btrfs volumes, making all relative paths absolute;
    absolute paths are left unchanged.

    Subvolumes that need a new path are shallow-copied before being
    modified; those already absolute are yielded as-is.

    This is a generator function: nothing, including the validation of
    `rootpath`, happens before the first item is requested.

    :param vols: the sequence of btrfs subvolumes to process
    :param rootpath: the absolute path that subvolume paths are relative to
    :returns: iterator over the modified subvolumes
    :raises ValueError: if `rootpath` is not absolute (does not start with
        :data:`.FSTREE`)
    """
    if not rootpath.startswith(FSTREE):
        raise ValueError(f'Root path must start with {FSTREE}')
    for v in vols:
        vp = v['path']
        if vp.startswith(FSTREE):
            # Already absolute: pass the original dict through untouched.
            yield v
        else:
            nv = v.copy()
            nv['path'] = util.path_merge(rootpath, vp, root=FSTREE, path=posixpath)
            yield nv
def relpaths(vols, rootpath):
    """
    Process a sequence of btrfs volumes, making all paths below a chosen root
    relative to that root; other absolute paths are left unchanged.

    A path counts as "below" the root only when it equals `rootpath` or
    continues it at a path-component boundary, so ``<FS_TREE>/foobar`` is
    *not* below ``<FS_TREE>/foo``.

    Paths that do not start with :data:`.FSTREE` are re-expressed relative
    to `rootpath` with its leading :data:`.FSTREE` component removed
    (possibly yielding ``..`` components), unless `rootpath` has no
    components besides :data:`.FSTREE`.

    Subvolumes that get a new path are shallow-copied before being
    modified. This is a generator function: nothing, including the
    validation of `rootpath`, happens before the first item is requested.

    :param vols: the sequence of btrfs subvolumes to process
    :param rootpath: the absolute path that subvolume paths are to be made
        relative to
    :returns: iterator over the modified subvolumes
    :raises ValueError: if `rootpath` is not absolute (does not start with
        :data:`.FSTREE`)
    """
    if not rootpath.startswith(FSTREE):
        raise ValueError(f'Root path must start with {FSTREE}')
    # The root path minus its first component, for paths given without it.
    relparts = pathlib.PurePosixPath(rootpath).parts[1:]
    relrootpath = posixpath.join(*relparts) if relparts else ''
    # Require a separator after the root so that a sibling merely sharing a
    # name prefix (root "a/foo" vs. path "a/foobar") is not taken for a
    # descendant.
    subprefix = rootpath.rstrip('/') + '/'
    for v in vols:
        vp = v['path']
        if vp == rootpath or vp.startswith(subprefix):
            nv = v.copy()
            nv['path'] = posixpath.relpath(vp, rootpath)
            yield nv
        elif relrootpath and not vp.startswith(FSTREE):
            nv = v.copy()
            nv['path'] = posixpath.relpath(vp, relrootpath)
            yield nv
        else:
            yield v
from . import cmd, parse