-
Notifications
You must be signed in to change notification settings - Fork 2
/
mownplow.py
executable file
·333 lines (275 loc) · 11.2 KB
/
mownplow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
#!/usr/bin/env python3
"""
Mow'n'Plow MK2.
An efficient Chia plot mower and mover.
Author: Graeme Seaton <graemes@graemes.com>
SPDX-License-Identifier: GPL-3.0-or-later
Based on 'The Plow' by Luke Macken <phorex@protonmail.com> @ https://github.com/lmacken/plow
Feel free to buy me a drink (only if you want to :)):
xch1vlnelz9ef43z3xa4x6a3zzfm7cezwvmq332p97xlflmxxcgzdrpsqamyee
"""
import asyncio
import logging
import queue
from datetime import datetime
from pathlib import Path
import aiohttp
import aionotify
import asyncssh
from mownplow.config import Config
from mownplow.harvester import HarvesterCert, HarvesterRequest
from mownplow.destman import DestMan
from mownplow.scheduler import PlowScheduler
from mownplow.ssh import SSHClient
#####
# System variables - don't change unless you know what you are doing.
#####
# Short & long sleep durations upon various error conditions
SLEEP_FOR = 60 * 3
SLEEP_FOR_LONG = 60 * 20
#####
# Utility functions / classes
#####
async def get_dest_dirs(config: Config) -> list:
    """Return the list of destination directory names for this run.

    If the config already supplies ``dest_dirs`` they are used verbatim.
    Otherwise the destination host is queried over SSH for filesystems
    mounted under ``config.dest_root``; the mount-point basenames are
    collected, sorted, and written back via ``config.update_dest_dirs``.

    Returns an empty list when the remote mount query fails.
    """
    dest_dirs = config.dest_dirs
    # Only probe the destination host when no explicit list is configured.
    if not dest_dirs:
        dest_dirs = []
        ssh_conn = SSHClient(
            config.dest_host, config.dest_username, config.ssh_private_key_path
        )
        await ssh_conn.connect()
        # try/finally so the SSH connection is closed even on the
        # error-return path (the original leaked it on remote failure).
        try:
            logging.debug(f"Destination root: {config.dest_root}")
            dest_mounts_scr = (
                "mount | grep " + config.dest_root + " | awk '{ print $3 }' | sort"
            )
            remote_result = await ssh_conn.run_command(dest_mounts_scr)
            if remote_result.returncode != 0:
                logging.error(
                    f"⁉️ {dest_mounts_scr!r} exited with {remote_result.returncode}"
                )
                return dest_dirs
            dest_candidates = remote_result.stdout.splitlines()
            logging.info(f"Found destination mounts:\n {dest_candidates}")
            for dest_dir in dest_candidates:
                logging.debug(f"Evaluating {dest_dir}")
                # Keep only the basename; the full mount path is rebuilt later.
                dest_dirs.append(Path(dest_dir).name)
            dest_dirs.sort()
            config.update_dest_dirs(dest_dirs)
        finally:
            await ssh_conn.close()
    return dest_dirs
#####
# This is where the magic happens
#####
async def plotfinder(paths: list, plot_queue: queue, loop):
    """Queue every pre-existing plot under *paths*, then watch for new ones.

    Performs a one-off recursive scan of each source directory so plots
    that were completed before startup are not missed, then hands off to
    plotwatcher() for inotify-based discovery of new arrivals.
    """
    for source in paths:
        for existing_plot in Path(source).glob("**/*.plot"):
            await plot_queue.put(existing_plot)
    # Never returns under normal operation: plotwatcher loops forever.
    await plotwatcher(paths, plot_queue, loop)
async def plotwatcher(paths: list, plot_queue: queue, loop):
    """Watch the source directories and queue each newly arrived plot.

    Registers an inotify MOVED_TO watch on every existing path in *paths*
    (rsync/plotters typically move finished plots into place, which fires
    MOVED_TO), then loops forever feeding ``*.plot`` arrivals into
    *plot_queue*.
    """
    watcher = aionotify.Watcher()
    for source in paths:
        # Skip (but report) configured sources that are absent on disk.
        if not Path(source).exists():
            logging.info(f"! Path does not exist: {source}")
            continue
        watcher.watch(
            alias=source,
            path=source,
            flags=aionotify.Flags.MOVED_TO,
        )
        logging.info(f"Watching {source}")
    await watcher.setup(loop)
    while True:
        event = await watcher.get_event()
        logging.info(event)
        # Ignore anything that is not a finished plot file.
        if event.name.endswith(".plot"):
            await plot_queue.put(Path(event.alias) / event.name)
        # Yield to the event loop between events.
        await asyncio.sleep(0)
async def plow(
    config: Config, dest_dir, plot_queue, dest_schedule, harvester_cert, loop
):
    """Worker coroutine: rsync plots from *plot_queue* to one destination.

    One plow() task runs per destination directory.  Each iteration takes a
    plot off the shared queue; if this destination currently holds the
    scheduler's top priority it (optionally) removes replots, checks free
    space and rsyncs the plot over, otherwise the plot is requeued for a
    higher-priority destination.  The worker exits when the destination is
    full or rsync fails unrecoverably, re-adding the destination to the
    farm on the way out if it had been removed.
    """
    # Plow initialisation: a dedicated SSH connection per destination worker.
    ssh_conn = SSHClient(
        config.dest_host, config.dest_username, config.ssh_private_key_path
    )
    await ssh_conn.connect()
    destman = DestMan(config, dest_dir, ssh_conn)
    if not await destman.init_scripts():
        logging.info(f"Unable to initialise scripts for {dest_dir}")
        return
    # harvester_req stays None when farming continues during plowing;
    # every use below must therefore be guarded on it being non-None.
    harvester_req = None
    if not config.farm_during_plow:
        harvester_req = HarvesterRequest(
            config.harvester_host, config.harvester_port, harvester_cert
        )
    currently_farming_dest = not config.farm_during_plow
    incremental_remove = config.replot
    # Work loop
    logging.info(f"🧑🌾 plowing to {destman.dest}")
    while True:
        try:
            logging.debug(f"{destman.dest} waiting for plot")
            plot = await plot_queue.get()
            current_priority = dest_schedule.get_current_priority()
            logging.debug(f"Current dest priority: {current_priority}")
            if current_priority == dest_dir:
                dest_schedule.remove_current_priority()
                plot_size = plot.stat().st_size
                plot_size_KB = int((plot_size) / (1024))
                # Remove from farm only when actually starting
                # to plow to this destination
                if harvester_req is not None and currently_farming_dest:
                    logging.info(f"Removing {destman.virtual_dest} from farming")
                    await harvester_req.remove_plot_directory(destman.virtual_dest)
                    currently_farming_dest = False
                if incremental_remove and config.remove_all_replots:
                    # One-shot bulk removal of old plots being replaced.
                    logging.info(f"Removing all matching replots on {destman.dest}")
                    remove_success = await destman.remove_all_replots()
                    if not remove_success:
                        # Requeue the plot for another worker and give up.
                        await plot_queue.put(plot)
                        break
                    incremental_remove = False
                    await asyncio.sleep(5)
                if incremental_remove:
                    # Free just enough space for this one plot.
                    remove_success = await destman.remove_next_replot(plot_size_KB)
                    if not remove_success:
                        await plot_queue.put(plot)
                        break
                    await asyncio.sleep(0)
                # Treat all destinations as remote (even if local)
                dest_free = await destman.get_dest_free_space()
                if not dest_free:
                    await plot_queue.put(plot)
                    break
                if dest_free > plot_size_KB:
                    logging.info(
                        f"✅ Destination {destman.dest} has {int(dest_free/(1024*1024))}GiB free"
                    )
                else:
                    logging.info(f"❎ Destination {destman.dest} is full")
                    await plot_queue.put(plot)
                    # Just quit the worker entirely for this destination.
                    break
                logging.info(
                    f"🚜 {plot} ➡️ {destman.dest} - {int(plot_size_KB/(1024*1024))}GiB"
                )
                rsync_cmd = (
                    f"{config.rsync_cmd} {config.rsync_flags} {plot} {destman.dest}"
                )
                # Now rsync the real plot
                proc = await asyncio.create_subprocess_shell(
                    rsync_cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                )
                start = datetime.now()
                stdout, stderr = await proc.communicate()
                finish = datetime.now()
                if proc.returncode == 0:
                    logging.info(f"🏁 {rsync_cmd} ({finish - start})")
                    logging.debug(f"Adding {dest_dir} back to schedule")
                    dest_schedule.add_dest_to_q(dest_dir)
                    await asyncio.sleep(1)
                elif proc.returncode == 10:  # Error in socket I/O
                    # Transient network problem: retry this plot later.
                    logging.warning(
                        f"⁉️ {rsync_cmd!r} exited with {proc.returncode} (error in socket I/O)"
                    )
                    await plot_queue.put(plot)
                    await asyncio.sleep(SLEEP_FOR_LONG)
                elif proc.returncode in (11, 23):  # Error in file I/O
                    # Most likely a full drive: retire this destination.
                    logging.error(
                        f"⁉️ {rsync_cmd!r} exited with {proc.returncode} (error in file I/O)"
                    )
                    dest_schedule.rem_dest_from_priorities(dest_dir)
                    await plot_queue.put(plot)
                    logging.error(f"{destman.dest} plow exiting")
                    break
                else:
                    # Unknown rsync failure: back off, then retire destination.
                    logging.info(f"⁉️ {rsync_cmd!r} exited with {proc.returncode}")
                    await asyncio.sleep(SLEEP_FOR)
                    dest_schedule.rem_dest_from_priorities(dest_dir)
                    await plot_queue.put(plot)
                    logging.error(f"{destman.dest} plow exiting")
                    break
                if stdout:
                    output = stdout.decode().strip()
                    if output:
                        logging.info(f"{stdout.decode()}")
                if stderr:
                    logging.warning(f"⁉️ {stderr.decode()}")
            else:
                # Not our turn: hand the plot back and wait briefly.
                await plot_queue.put(plot)
                await asyncio.sleep(5)
        except Exception as e:
            # Best-effort worker: log and keep plowing.
            logging.error(f"! {e}")
    # Sync destination path before completion
    await destman.sync_dest_mount_path()
    await asyncio.sleep(5)
    # BUG FIX: guard on harvester_req. When config.farm_during_plow is True,
    # harvester_req is None AND currently_farming_dest starts False, so the
    # original unconditionally called add_plot_directory() on None here,
    # raising AttributeError on every worker exit.
    if harvester_req is not None and not currently_farming_dest:
        logging.info(f"Adding {destman.virtual_dest} back to farm")
        await harvester_req.add_plot_directory(destman.virtual_dest)
        currently_farming_dest = True
    await ssh_conn.close()
    await asyncio.sleep(5)
async def main(config, loop):
    """Orchestrate the run: discover destinations and fan out plow workers.

    Builds the shared plot queue and scheduler, optionally fetches the
    harvester TLS cert/key over SSH, resolves the destination list, starts
    one plotfinder task plus one plow() worker per destination, then waits
    until only the plotfinder remains and cancels it.
    """
    plot_queue = asyncio.Queue()
    dest_schedule = PlowScheduler()
    plow_tasks = []
    harvester_cert = None
    if not config.farm_during_plow:
        # Cert material is only needed to talk to the harvester RPC when
        # destinations are removed from / re-added to the farm.
        harvester_cert = HarvesterCert(
            config.dest_host,
            config.dest_username,
            config.ssh_private_key_path,
            config.harvester_cacert_path,
            config.harvester_cert_path,
            config.harvester_key_path,
        )
        try:
            await harvester_cert.retrieve_cert_and_key()
        except (OSError, asyncssh.Error) as e:
            # Fatal: without the cert the harvester cannot be controlled.
            raise Exception(f"Failed to retrieve cert and key files: {e}") from e
    logging.info("🌱 Mow'n'Plow running...")
    create_dests = asyncio.create_task(get_dest_dirs(config))
    dest_dirs = await create_dests
    logging.debug(f"Destinations: {dest_dirs}")
    # Watch for new plots
    plow_tasks.append(asyncio.create_task(plotfinder(config.sources, plot_queue, loop)))
    # Fire up a worker for each destination
    priority = 1
    for dest_dir in dest_dirs:
        # Lower number = higher priority; list order defines fill order.
        dest_schedule.add_dest_priority(dest_dir, priority)
        plow_tasks.append(
            asyncio.create_task(
                plow(config, dest_dir, plot_queue, dest_schedule, harvester_cert, loop)
            )
        )
        # Stagger worker start-up slightly.
        await asyncio.sleep(0.5)
        priority = priority + 1
    # Once all of the destinations are complete (probably full) then
    # plotfinder is the last task running
    while len(plow_tasks) > 1:
        # NOTE(review): asyncio.wait rebinds plow_tasks to the *pending* set
        # each pass — presumably intentional so finished workers drop out.
        done, plow_tasks = await asyncio.wait(
            plow_tasks, return_when=asyncio.FIRST_COMPLETED
        )
    # Only the plotfinder task remains; cancel it so the loop can finish.
    plow_tasks.pop().cancel()
    await asyncio.sleep(0.5)
    logging.info("🌱 Plow destinations complete...")
if __name__ == "__main__":
    # Load config first: its logging level drives basicConfig below.
    config = Config("config.yaml")
    logging.basicConfig(
        format="%(asctime)s %(levelname)-2s %(message)s",
        level=config.logging,
        datefmt="%Y-%m-%d %H:%M:%S",
        force=True,
    )
    # asyncssh is chatty at INFO; keep it to warnings and above.
    logging.getLogger("asyncssh").setLevel(logging.WARNING)
    # asyncio.get_event_loop() is deprecated outside a running loop since
    # Python 3.10; create the loop explicitly (plotwatcher/aionotify needs
    # the loop object passed through) and register it as current.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main(config, loop))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the mower; exit quietly.
        pass
    finally:
        loop.close()