This repository has been archived by the owner on Oct 2, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
httpsdate.py
executable file
·180 lines (149 loc) · 6.37 KB
/
httpsdate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
"""httpsdate.py is a script for secure time synchronisation"""
__version__ = '0.2.0'
import argparse
import http.client
import os
import pwd
import sys
import time
import urllib.request
from concurrent import futures
from datetime import datetime, timezone
from statistics import median

import prctl
# Exit codes
# Insufficient privileges to set the clock (and not a dry run).
E_PRIV = 3
# None of the given hosts returned a usable time.
E_NOTIME = 4
# More hosts failed than --max-failed allows.
E_NOTENOUGHTIME = 5
# Offset between local and remote clock exceeds --max-adjust.
E_LARGEOFFSET = 6
# Set a nicer title than 'python3'.
prctl.set_name('httpsdate.py')
def drop_privileges(user):
    """Drop all capabilities except CAP_SYS_TIME from the permitted and
    the bounding set and switch to the given user.

    The order of the steps matters: keep_caps has to be set before the
    UID changes, and the permitted set is only reduced after the switch.

    :user: name of the user to switch to
    :returns: nothing
    :raises KeyError: if pwd.getpwnam() finds no entry for the user
    """
    # Resolve the account once, so that the UID and GID are guaranteed to
    # come from the same passwd entry.
    account = pwd.getpwnam(user)
    uid = account.pw_uid
    gid = account.pw_gid

    # Keep permitted capabilities when changing the UID.
    prctl.securebits.keep_caps = True

    # Limit the bounding set to CAP_SYS_TIME.
    prctl.capbset.limit(prctl.CAP_SYS_TIME)

    # Change user and group. Supplementary groups are replaced by the
    # user's primary group before the IDs are switched.
    os.setgroups([gid])
    os.setgid(gid)
    os.setuid(uid)

    # Limit the permitted set to CAP_SYS_TIME.
    prctl.cap_permitted.limit(prctl.CAP_SYS_TIME)

    # Activate CAP_SYS_TIME.
    prctl.cap_effective.sys_time = True

    # Disallow gaining new capabilities.
    prctl.set_no_new_privs(1)
def get_date(host, timeout):
    """Send a HEAD request via HTTPS to the given host and return the
    contents of the Date header.

    Any exception raised on the way gets a ``host`` attribute naming the
    host it belongs to, so that callers collecting results from several
    hosts can tell which one failed.

    :host: the FQDN to get the date from
    :timeout: timeout in seconds
    :returns: naive datetime object containing the data from the Date
              header (the header is required to be in GMT)
    :raises ValueError: if the Date header is missing or can not be parsed
    """
    try:
        request = urllib.request.Request('https://{}/'.format(host),
                                         method='HEAD')
        request.add_header('User-Agent', 'httpsdate.py')
        # Close the response (and its socket) as soon as the header is read.
        with urllib.request.urlopen(request, timeout=timeout) as response:
            header = response.info()['Date']
        if header is None:
            # Without this check strptime() would raise a TypeError, which
            # is not one of the "unusable host" errors callers expect.
            raise ValueError('response contains no Date header')
        date = datetime.strptime(header, '%a, %d %b %Y %H:%M:%S GMT')
    except Exception as e:
        # Add the current host name to whatever exception is raised.
        e.host = host
        # A bare raise keeps the original traceback untouched.
        raise
    return date
# Command line interface. Options are registered in the order in which
# they should show up in the --help output.
parser = argparse.ArgumentParser(
    description=(
        'Set the system clock to a date and time obtained from one or more '
        'HTTPS servers. Needs to be run with CAP_SYS_TIME, but drops '
        'unnecessary privileges when started as root.'
    ),
)
parser.add_argument(
    '-n', '--dry-run',
    action='store_true',
    default=False,
    help='do not actually set the system clock',
)
parser.add_argument(
    '-u', '--user',
    default='nobody',
    help=('when started with high privileges, run as this user instead '
          '(default: %(default)s)'),
)
parser.add_argument(
    '-t', '--timeout',
    type=int,
    default=3,
    metavar='seconds',
    help='HTTPS network response timeout (default: %(default)s seconds)',
)
parser.add_argument(
    '--max-adjust',
    type=int,
    metavar='seconds',
    help='do not change the clock more than this many seconds',
)
parser.add_argument(
    '--max-failed',
    type=int,
    metavar='N',
    help=('do not change the clock if more than N servers failed to send '
          'a usable date and time'),
)
parser.add_argument(
    '-q', '--quiet',
    action='store_true',
    default=False,
    help='do not show warnings and adjustment information',
)
parser.add_argument(
    'host',
    nargs='+',
    help='get the time from the Date header of these HTTPS servers',
)

# Parse sys.argv; argparse exits on its own for --help or invalid usage.
args = parser.parse_args()
# Check if we have sufficient privileges. A dry run never sets the clock,
# so it is allowed to proceed without CAP_SYS_TIME.
if not prctl.cap_permitted.sys_time and not args.dry_run:
    # Diagnostics go to stderr, like every other error of this script.
    print('Insufficient privileges to set the clock.', file=sys.stderr)
    print('Please run as root or with CAP_SYS_TIME.', file=sys.stderr)
    sys.exit(E_PRIV)

# Check if we have more privileges than needed.
if prctl.cap_effective.setpcap:
    # This actually only checks for one of the capabilities required to drop
    # privileges. But having any capability besides CAP_SYS_TIME probably
    # means that we have all of them.
    drop_privileges(args.user)
# Get the time from all hosts in parallel.
with futures.ThreadPoolExecutor() as pool:
    threads = [pool.submit(get_date, host, args.timeout) for host in args.host]
    times = []
    for f in futures.as_completed(threads):
        try:
            t = f.result()
        except (OSError, http.client.HTTPException, ValueError) as e:
            # OSError covers URLError/HTTPError as well as socket timeouts
            # and TLS errors that urlopen() does not wrap while reading the
            # response; HTTPException covers malformed HTTP replies.
            # Any of these only makes this one host unusable.
            if not args.quiet:
                print('Warning: Could not get time from {host}: {error}'.
                      format(host=e.host, error=str(e)), file=sys.stderr)
            continue
        times.append(t)

succeeded = len(times)
failed = len(args.host) - len(times)

# Check that at least one host was usable.
if not succeeded:
    print('Error: Could not get time from any host.', file=sys.stderr)
    sys.exit(E_NOTIME)

# Check that no more than --max-failed hosts failed.
if args.max_failed is not None and failed > args.max_failed:
    print('Error: {} hosts failed. No more than {} are allowed.'.
          format(failed, args.max_failed), file=sys.stderr)
    sys.exit(E_NOTENOUGHTIME)
# Sort the times.
times.sort()

# Calculate the median of all received times.
# Since median() can not really handle datetime objects, we convert them
# to UNIX timestamps and convert the result back to datetime.
# The received values are UTC wall-clock times, so they are tagged as UTC
# before the conversion. Interpreting them as local time instead would
# give a wrong result whenever the value falls into a local DST gap.
timestamps = [t.replace(tzinfo=timezone.utc).timestamp() for t in times]
new_time = datetime.fromtimestamp(median(timestamps), tz=timezone.utc)
now = datetime.now(tz=timezone.utc)
adjustment = new_time - now
interval = times[-1] - times[0]

# Check if the new time is close enough to the current time.
# Compare against None so that an explicit "--max-adjust 0" is honoured
# instead of being treated like "no limit".
if (args.max_adjust is not None
        and args.max_adjust < abs(adjustment.total_seconds())):
    print('Error: Offset between local and remote clock is {:.2f} seconds. '
          'Only {} seconds are allowed.'.
          format(adjustment.total_seconds(), args.max_adjust),
          file=sys.stderr)
    sys.exit(E_LARGEOFFSET)

if not args.quiet:
    # Display a short summary about how much the time changes and
    # how much the remote clocks agree.
    print('Time adjustment: {:.2f} seconds'.format(adjustment.total_seconds()))
    print('{} remote clocks returned usable time information, {} did not.'.
          format(succeeded, failed))
    if succeeded > 1:
        print('Remote clocks deviate by {}'.format(interval))

if not args.dry_run:
    # Actually set the system clock to the new time.
    time.clock_settime(time.CLOCK_REALTIME, new_time.timestamp())