Source code for btrsync.cmdex

#!/usr/bin/env python

# Copyright © 2023 Andrei Tatar <andrei.ttr@gmail.com>
#
# SPDX-License-Identifier: GPL-3.0-or-later


"""
Asynchronous execution of subprocess pipelines.
"""

import os
import asyncio
import threading

from . import util


DEVNULL = asyncio.subprocess.DEVNULL
PIPE = asyncio.subprocess.PIPE


def _killall(procs, forced=False):
	for p in procs:
		if p.returncode is None:
			if forced:
				p.kill()
			else:
				p.terminate()

def _waitall(procs):
	return asyncio.gather(*(p.wait() for p in procs))


[docs]async def create_pipeline(*cmds, stdin=None, stdout=None, stderr=None): """ Asynchronous iterator returning started subprocesses connected together as a pipeline. :param cmds: sequence of :class:`btrsync.util.Cmd`-like commands that form the pipeline :param stdin: standard input of the first command in the pipeline; :const:`None` means inherit from caller. If supplied must be file descriptor or file-like object backed by a file descriptor. If supplied, it is guaranteed to be closed on either success or error. :param stdout: standard output of the last command in the pipeline; :const:`None` means inherit from caller If supplied must be file descriptor or file-like object backed by a file descriptor. :param stderr: standard error of all commands in the pipeline; :const:`None` means inherit from caller If supplied must be file descriptor or file-like object backed by a file descriptor. :returns: :class:`asyncio.subprocess.Process` instances of started processes """ def _chkclose(fd): if fd is not None: if not isinstance(fd, int): fd.close() elif fd >= 0: os.close(fd) def _fd(fd): return fd if fd is None or isinstance(fd, int) else fd.fileno() err = _fd(stderr) end = stdin for prg, args in cmds[:-1]: try: nextend, head = os.pipe() except: _chkclose(end) raise try: proc = await asyncio.create_subprocess_exec(prg, *args, stdin=_fd(end), stdout=head, stderr=err) except: os.close(nextend) raise finally: _chkclose(end) os.close(head) yield proc end = nextend for prg, args in cmds[-1:]: try: proc = await asyncio.create_subprocess_exec(prg, *args, stdin=_fd(end), stdout=_fd(stdout), stderr=err) finally: _chkclose(end) yield proc
[docs]async def wait_procs(procs, *, timeout=None, abort=False): """ Wait on multiple processes, optionally timing out or aborting on failure. :param procs: sequence of processes to wait on :param timeout: if not :const:`None`, time out after this many seconds; running processes are left untouched after timeout :param abort: if :const:`True`, terminate remaining running processes after an abnormal completion (exit code not 0) :returns: a list of tuples ``(process, output)``, in order of completion, of process objects and their captured ``(stdout, stderr)`` output, if any """ def _wrapproc(p): async def pwait(): r = await p.communicate() return p, r return pwait() rets = [] for coro in asyncio.as_completed([_wrapproc(p) for p in procs], timeout=timeout): try: res = await coro except TimeoutError: break rets.append(res) if abort and res.returncode: _killall(procs) abort = False return rets
[docs]async def ex(*cmds, stdin=None, stdout=None, stderr=None, timeout=None, hard_timeout=True, **kwargs): """ Execute a series of commands in a pipeline. :param cmds: sequence of :class:`btrsync.util.Cmd`-like commands that form the pipeline :param stdin: standard input of the first command in the pipeline; :const:`None` means inherit from caller :param stdout: standard output of the last command in the pipeline; :const:`None` means inherit from caller :param stderr: standard error of all commands in the pipeline; :const:`None` means inherit from caller :param timeout: if not :const:`None`, time out after this many seconds :param hard_timeout: if :const:`True`, kill spawned processes on timeout, otherwise leave them untouched :param kwargs: additional keyword arguments to be passed to :func:`.wait_procs` :returns: a tuple ``(procs, rets)``: a list of spawned :class:`asyncio.subprocess.Process` objects, in the order specified by `cmds`, and a list of tuples ``(process, output)``, in order of completion, of process objects and their captured ``(stdout, stderr)`` output, if any """ procs = [] try: async for p in create_pipeline(*cmds, stdin=stdin, stdout=stdout, stderr=stderr): procs.append(p) except: _killall(procs) await _waitall(procs) raise try: r = await wait_procs(procs, timeout=timeout, **kwargs) except: _killall(procs) await _waitall(procs) raise if len(r) < len(procs) and hard_timeout: _killall(procs) await _waitall(procs) return procs, r
[docs]async def ex_out(*cmds, stdout=PIPE, **kwargs): """ Execute a series of commands in a pipeline, capturing standard output and error. :param cmds: sequence of :class:`btrsync.util.Cmd`-like commands that form the pipeline :param stdin: standard input of the first command in the pipeline; :const:`None` means inherit from caller :param timeout: if not :const:`None`, time out after this many seconds; remaining running processes are killed on timeout :returns: a list of tuples ``(exit_code, output)``, in the order specified by `cmds`, of the processes' exit code and captured ``(stdout, stderr)`` output """ proc, ret = await ex(*cmds, stdout=stdout, stderr=PIPE, hard_timeout=True, **kwargs) rv = {r[0]: (r[0].returncode, r[1]) for r in ret} return [rv[p] if p in rv else (None, (b'', b'')) for p in proc]