Skip to content

Commit 16c94c6

Browse files
committed
Add tests for proper termination
QubesOS/qubes-issues#9779
1 parent ce3d443 commit 16c94c6

File tree

2 files changed

+178
-0
lines changed

2 files changed

+178
-0
lines changed

debian/pybuild.testfiles

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
qubes.Gpg2.service

splitgpg2/test_termination.py

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
#!/usr/bin/python3
2+
#
3+
# Copyright (C) 2025 Simon Gaiser <simon@invisiblethingslab.com>
4+
#
5+
# This program is free software; you can redistribute it and/or modify
6+
# it under the terms of the GNU General Public License as published by
7+
# the Free Software Foundation; either version 2 of the License, or
8+
# (at your option) any later version.
9+
#
10+
# This program is distributed in the hope that it will be useful,
11+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
# GNU General Public License for more details.
14+
#
15+
# You should have received a copy of the GNU Lesser General Public License along
16+
# with this program; if not, see <http://www.gnu.org/licenses/>.
17+
18+
import unittest
19+
import tempfile
20+
import subprocess
21+
import os
22+
import re
23+
import struct
24+
25+
from typing import Dict, IO
26+
27+
28+
class DidNotTerminate(AssertionError):
29+
def __init__(self) -> None:
30+
super().__init__("splitgpg2 service did not terminate")
31+
32+
33+
# Test that the splitgpg2 service terminates itself as expected. IO happens
34+
# through stdin/-out when called from qrexec. This behaves a bit differently
35+
# than the Unix socket we use for other tests (for example on close). So
36+
# instead start the service script directly.
37+
class TC_Termination(unittest.TestCase):
38+
@staticmethod
39+
def path_prepend(env: Dict[str, str], name: str, value: str) -> None:
40+
if name in env:
41+
env[name] = ":".join([value, env[name]])
42+
else:
43+
env[name] = value
44+
45+
def setUp(self) -> None:
46+
super().setUp()
47+
48+
self.test_env = os.environ.copy()
49+
50+
self.tmp_dir = tempfile.TemporaryDirectory()
51+
52+
gpg_home = self.tmp_dir.name + "/gpg-home"
53+
self.test_env["GNUPGHOME"] = gpg_home
54+
os.mkdir(gpg_home, mode=0o700)
55+
56+
xdg_conf_dir = self.tmp_dir.name + "/xdg-config"
57+
os.mkdir(xdg_conf_dir)
58+
self.test_env["XDG_CONFIG_HOME"] = xdg_conf_dir
59+
60+
splitgpg2_conf_dir = xdg_conf_dir + "/qubes-split-gpg2"
61+
os.mkdir(splitgpg2_conf_dir)
62+
63+
with open(splitgpg2_conf_dir + "/qubes-split-gpg2.conf", "wb") as f:
64+
f.write(b"[DEFAULT]\nsource_keyring_dir = no\n")
65+
66+
path_dir = self.tmp_dir.name + "/path"
67+
os.mkdir(path_dir)
68+
self.path_prepend(self.test_env, "PATH", path_dir)
69+
70+
notify_path = path_dir + "/notify-send"
71+
with open(notify_path, "wb") as f:
72+
f.write(b"#!/bin/sh\n")
73+
os.chmod(notify_path, 0o755)
74+
75+
self.test_env["QREXEC_REMOTE_DOMAIN"] = "testvm"
76+
77+
top_dir = os.path.dirname(os.path.dirname(__file__))
78+
self.path_prepend(self.test_env, "PYTHONPATH", top_dir)
79+
80+
service_path = top_dir + "/qubes.Gpg2.service"
81+
82+
# pybuild copies us somewhere else and while you can specify extra
83+
# files in debian/pybuild.testfiles it executable bit when copying. So
84+
# fix it.
85+
if "PYBUILD_NAME" in os.environ:
86+
os.chmod(service_path, 0o755)
87+
88+
self.service = subprocess.Popen(
89+
[service_path],
90+
env=self.test_env,
91+
stdin=subprocess.PIPE,
92+
stdout=subprocess.PIPE,
93+
)
94+
95+
# Make mypy happy:
96+
assert self.service.stdin is not None
97+
self.service_stdin = self.service.stdin
98+
assert self.service.stdout is not None
99+
self.service_stdout = self.service.stdout
100+
101+
self.addCleanup(self.cleanup_service)
102+
103+
hi = self.read_line()
104+
self.assertTrue(re.match(rb"\AOK\s", hi))
105+
106+
def tearDown(self) -> None:
107+
subprocess.run(["gpgconf", "--kill", "gpg-agent"], env=self.test_env)
108+
self.tmp_dir.cleanup()
109+
super().tearDown()
110+
111+
def cleanup_service(self) -> None:
112+
self.service_stdin.close()
113+
self.service_stdout.close()
114+
self.service.kill()
115+
self.service.wait()
116+
117+
def expect_termination(self) -> int:
118+
try:
119+
return self.service.wait(2)
120+
except subprocess.TimeoutExpired:
121+
raise DidNotTerminate()
122+
123+
def write(self, d: bytes) -> None:
124+
self.service_stdin.write(d)
125+
self.service_stdin.flush()
126+
127+
def read_line(self) -> bytes:
128+
return self.service_stdout.readline()
129+
130+
def test_000_bye(self) -> None:
131+
self.write(b"GETINFO version\n")
132+
self.assertTrue(re.match(rb"\AD\s", self.read_line()))
133+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
134+
135+
self.write(b"BYE\n")
136+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
137+
138+
self.expect_termination()
139+
140+
def test_001_close(self) -> None:
141+
self.service_stdin.close()
142+
143+
self.expect_termination()
144+
145+
def test_002_filterd(self) -> None:
146+
self.write(b"GETINFO asdf\n")
147+
self.assertEqual(
148+
self.read_line(), b"ERR 67109888 Command filtered by split-gpg2.\n"
149+
)
150+
151+
self.expect_termination()
152+
153+
def test_003_agent_kill(self) -> None:
154+
self.write(b"GETINFO version\n")
155+
self.assertTrue(re.match(rb"\AD\s", self.read_line()))
156+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
157+
158+
# Simulate a sudden exit of gpg-agent. (Forcefully killing it is hard,
159+
# since gpg-agent doesn't like to be started in the foreground. So for
160+
# now ask it to terminate itself.)
161+
subprocess.run(["gpgconf", "--kill", "gpg-agent"], env=self.test_env)
162+
163+
# We currently don't detect a disconnected agent until we try to
164+
# communicate with it. So we have to trigger it.
165+
self.write(b"GETINFO version\n")
166+
167+
self.expect_termination()
168+
169+
def test_004_test_self_test(self) -> None:
170+
# Test out test method. With no reason to terminate it should still be
171+
# running.
172+
self.write(b"GETINFO version\n")
173+
self.assertTrue(re.match(rb"\AD\s", self.read_line()))
174+
self.assertTrue(re.match(rb"\AOK\s", self.read_line()))
175+
176+
with self.assertRaises(DidNotTerminate):
177+
self.expect_termination()

0 commit comments

Comments
 (0)