Skip to content
12 changes: 6 additions & 6 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
include:
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.1/gitlab-base.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.1/gitlab-dom0.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.1/gitlab-vm.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.2/gitlab-base.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.2/gitlab-host.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.2/gitlab-vm.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.3/gitlab-base.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.3/gitlab-host.yml'
- project: 'QubesOS/qubes-continuous-integration'
file: '/r4.3/gitlab-vm.yml'

checks:tests:
stage: checks
Expand Down
16 changes: 12 additions & 4 deletions splitgpg2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from typing import Optional, Dict, Callable, Awaitable, Tuple, Pattern, List, \
Union, Any, TypeVar, Set, TYPE_CHECKING, Coroutine, Sequence, cast

from .stdiostream import StdoutWriterProtocol

if TYPE_CHECKING:
from typing_extensions import Protocol
from typing import TypeAlias
Expand Down Expand Up @@ -326,15 +328,21 @@ def setup_subkey_keyring(self) -> None:
self.gnupghome, self.source_keyring_dir)
with subprocess.Popen(export_cmd,
stdout=subprocess.PIPE,
stdin=subprocess.DEVNULL) as exporter, (
stdin=subprocess.DEVNULL,
stderr=subprocess.PIPE) as exporter, (
subprocess.Popen(import_cmd,
stdin=exporter.stdout)) as importer:
stdin=exporter.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)) as importer:
pass
if exporter.returncode or importer.returncode:
self.log.warning('Unable to export keys. If your key has a '
'passphrase, you might want to save it to a '
'file and use passphrase-file and '
'pinentry-mode loopback in gpg.conf')
'pinentry-mode loopback in gpg.conf.')
self.log.warning("Exporter output: %s", exporter.stderr)
self.log.warning("Importer output: %s %s",
importer.stdout, importer.stderr)
self.log.info('Subkey-only keyring %r created',
self.gnupghome)

Expand Down Expand Up @@ -1405,7 +1413,7 @@ def open_stdinout_connection(*,

write_transport, write_protocol = loop.run_until_complete(
loop.connect_write_pipe(
lambda: asyncio.streams.FlowControlMixin(loop),
lambda: StdoutWriterProtocol(loop),
sys.stdout.buffer))
writer = asyncio.StreamWriter(write_transport, write_protocol, None, loop)

Expand Down
76 changes: 76 additions & 0 deletions splitgpg2/stdiostream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#
# based on asyncio library:
# Copyright (C) 2001 Python Software Foundation
#
# Copyright (C) 2024 Marek Marczykowski-Górecki
# <marmarek@invisiblethingslab.com>
#

import collections
from asyncio import protocols, events, Future
from typing import Optional, Any

class StdoutWriterProtocol(protocols.Protocol):
"""Reusable flow control logic for StreamWriter.drain().
This implements the protocol methods pause_writing(),
resume_writing() and connection_lost(). If the subclass overrides
these it must call the super methods.
StreamWriter.drain() must wait for _drain_helper() coroutine.
"""

def __init__(self, loop: Optional[events.AbstractEventLoop] = None) -> None:
if loop is None:
self._loop = events.get_event_loop()
else:
self._loop = loop
self._paused = False
self._drain_waiters: collections.deque[Future[None]] = \
collections.deque()
self._connection_lost = False
self._closed = self._loop.create_future()

def pause_writing(self) -> None:
assert not self._paused
self._paused = True

def resume_writing(self) -> None:
assert self._paused
self._paused = False

for waiter in self._drain_waiters:
if not waiter.done():
waiter.set_result(None)

def connection_lost(self, exc: Optional[BaseException]) -> None:
self._connection_lost = True
# Wake up the writer(s) if currently paused.
if not self._paused:
return

for waiter in self._drain_waiters:
if not waiter.done():
if exc is None:
waiter.set_result(None)
else:
waiter.set_exception(exc)
if not self._closed.done():
if exc is None:
self._closed.set_result(None)
else:
self._closed.set_exception(exc)

async def _drain_helper(self) -> None:
if self._connection_lost:
raise ConnectionResetError('Connection lost')
if not self._paused:
return
waiter = self._loop.create_future()
self._drain_waiters.append(waiter)
try:
await waiter
finally:
self._drain_waiters.remove(waiter)

# pylint: disable=unused-argument
def _get_close_waiter(self, stream: Any) -> Future[None]:
return self._closed
Loading