"""New script to handle solr updates.
Author: Anand Chitipothu
Changes:
2013-02-25: First version
2018-02-11: Use newer config method
"""
from typing import Union
from collections.abc import Iterator
import _init_path # noqa: F401 Imported for its side effect of setting PYTHONPATH
import logging
import json
import datetime
import time
import web
import sys
import re
import socket
import urllib
from openlibrary.solr import update_work
from openlibrary.config import load_config
from infogami import config
from openlibrary.solr.update_work import CommitRequest
logger = logging.getLogger("openlibrary.solr-updater")
# FIXME: Some kind of hack introduced to work around DB connectivity issue
args: dict = {}
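# `args` is populated from the parsed CLI arguments in the __main__ block at the
# bottom of this file; update_keys() reads it to (re)load the OL/Solr configuration.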
def read_state_file(path, initial_state: str = None):
    try:
        return open(path).read()
    except OSError:
        logger.error(
            "State file %s is not found. Reading log from the beginning of today", path
        )
        return initial_state or f"{datetime.date.today().isoformat()}:0"


def get_default_offset():
    return datetime.date.today().isoformat() + ":0"
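
# Log offsets are strings of the form "<ISO date>:<byte offset>", e.g. "2013-02-25:0";
# InfobaseLog.read_records() below splits on ":" to compare days and byte offsets.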
class InfobaseLog:
    def __init__(self, hostname: str, exclude: str = None):
        """
        :param str hostname:
        :param str|None exclude: if specified, excludes records that include the string
        """
        self.base_url = 'http://%s/openlibrary.org/log' % hostname
        self.offset = get_default_offset()
        self.exclude = exclude

    def tell(self):
        return self.offset

    def seek(self, offset):
        self.offset = offset.strip()

    def read_records(self, max_fetches=10):
        """Reads all the available log records from the server."""
        for i in range(max_fetches):
            url = f"{self.base_url}/{self.offset}?limit=100"
            logger.debug("Reading log from %s", url)
            try:
                jsontext = urllib.request.urlopen(url).read()
            except urllib.error.URLError as e:
                logger.error("Failed to open URL %s", url, exc_info=True)
                if e.args and e.args[0].args == (111, 'Connection refused'):
                    logger.error(
                        'make sure infogami server is working, connection refused from %s',
                        url,
                    )
                    sys.exit(1)
                raise

            try:
                d = json.loads(jsontext)
            except json.JSONDecodeError:
                logger.error("Bad JSON: %s", jsontext)
                raise
            data = d['data']
            # no more data is available
            if not data:
                logger.debug("no more records found")
                # There's an infobase bug where we'll sometimes get 0 items, but the
                # binary offset will have incremented...?
                if 'offset' in d:
                    # There's _another_ infobase bug where if you query a future date,
                    # it'll return 2020-12-01. To avoid solrupdater getting stuck in a
                    # loop, only update the offset if it's newer than the current one.
                    old_day, old_boffset = self.offset.split(':')
                    old_boffset = int(old_boffset)
                    new_day, new_boffset = d['offset'].split(':')
                    new_boffset = int(new_boffset)
                    if new_day >= old_day and new_boffset >= old_boffset:
                        self.offset = d['offset']
                return

            for record in data:
                if self.exclude and self.exclude in json.dumps(record):
                    continue
                yield record

            self.offset = d['offset']
def find_keys(d: Union[dict, list]) -> Iterator[str]:
    """
    Find any keys in the given dict or list.

    >>> list(find_keys({'key': 'foo'}))
    ['foo']
    >>> list(find_keys([{}, {'key': 'bar'}]))
    ['bar']
    >>> list(find_keys([{'key': 'blue'}, {'key': 'bar'}]))
    ['blue', 'bar']
    >>> list(find_keys({'title': 'foo'}))
    []
    >>> list(find_keys({ 'works': [ {'key': 'foo'} ] }))
    ['foo']
    >>> list(find_keys({ 'authors': [ { 'author': {'key': 'foo'} } ] }))
    ['foo']
    """
    if isinstance(d, dict):
        if 'key' in d:
            yield d['key']
        for val in d.values():
            yield from find_keys(val)
    elif isinstance(d, list):
        for val in d:
            yield from find_keys(val)
    else:
        # All other types are not recursed
        return
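
# parse_log() walks infobase log records and yields the Open Library keys
# (e.g. /works/..., /books/..., /authors/..., /books/ia:<identifier>) whose Solr
# documents need to be (re)indexed.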
def parse_log(records, load_ia_scans: bool):
    for rec in records:
        action = rec.get('action')
        if action in ('save', 'save_many'):
            changeset = rec['data'].get('changeset', {})
            old_docs = changeset.get('old_docs', [])
            new_docs = changeset.get('docs', [])
            for before, after in zip(old_docs, new_docs):
                yield after['key']
                # before is None if the item is new
                if before:
                    before_keys = set(find_keys(before))
                    after_keys = set(find_keys(after))
                    # If a key was changed or removed, the previously referenced
                    # keys also need to be updated
                    yield from before_keys - after_keys

        elif action == 'store.put':
            # A sample record looks like this:
            # {
            #     "action": "store.put",
            #     "timestamp": "2011-12-01T00:00:44.241604",
            #     "data": {
            #         "data": {"borrowed": "false", "_key": "ebooks/books/OL5854888M", "_rev": "975708", "type": "ebook", "book_key": "/books/OL5854888M"},
            #         "key": "ebooks/books/OL5854888M"
            #     },
            #     "site": "openlibrary.org"
            # }
            data = rec.get('data', {}).get("data", {})
            key = data.get("_key", "")
            if data.get("type") == "ebook" and key.startswith("ebooks/books/"):
                edition_key = data.get('book_key')
                if edition_key:
                    yield edition_key
            elif (
                load_ia_scans
                and data.get("type") == "ia-scan"
                and key.startswith("ia-scan/")
            ):
                identifier = data.get('identifier')
                if identifier and is_allowed_itemid(identifier):
                    yield "/books/ia:" + identifier

            # Hack to force updating something from the admin interface.
            # The admin interface writes the keys to update to a document named
            # 'solr-force-update' in the store, and whatever keys are written to it
            # are picked up by this script.
            elif key == 'solr-force-update':
                keys = data.get('keys')
                yield from keys or []

        elif action == 'store.delete':
            key = rec.get("data", {}).get("key")
            # An ia-scan key is deleted when that book is deleted/darked on IA.
            # Delete it from OL solr by updating that key.
            if key and key.startswith("ia-scan/"):
                ol_key = "/works/ia:" + key.split("/")[-1]
                yield ol_key
def is_allowed_itemid(identifier):
    if not re.match("^[a-zA-Z0-9_.-]*$", identifier):
        return False

    # Items starting with these prefixes are not books. Ignore them.
    ignore_prefixes = config.get("ia_ignore_prefixes", [])
    for prefix in ignore_prefixes:
        if identifier.startswith(prefix):
            return False

    return True
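
# update_keys() filters the incoming keys down to /books/, /authors/ and /works/
# documents and pushes them to Solr in chunks of 100 via update_work.do_updates().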
async def update_keys(keys):
    if not keys:
        return 0

    # FIXME: Some kind of hack introduced to work around DB connectivity issue
    global args
    logger.debug("Args: %s", args)
    update_work.load_configs(args['ol_url'], args['ol_config'], 'default')

    keys = [
        k
        for k in keys
        if k.count("/") == 2 and k.split("/")[1] in ("books", "authors", "works")
    ]

    count = 0
    for chunk in web.group(keys, 100):
        chunk = list(chunk)
        count += len(chunk)
        await update_work.do_updates(chunk)

        # Caches should not persist between different calls to update_keys!
        update_work.data_provider.clear_cache()

    if count:
        logger.info("updated %d documents", count)

    return count
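
# Solr is a small helper that batches commits: a commit is only issued once enough
# documents have accumulated or enough time has passed since the previous commit.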
class Solr:
    def __init__(self):
        self.reset()

    def reset(self):
        self.total_docs = 0
        self.t_start = time.time()

    def commit(self, ndocs):
        """Performs a solr commit only if a sufficient number of documents have
        been updated or enough time has passed since the last commit.
        """
        self.total_docs += ndocs

        # no documents to commit
        if not self.total_docs:
            return

        dt = time.time() - self.t_start
        if self.total_docs > 100 or dt > 60:
            logger.info(
                "doing solr commit (%d docs updated, last commit was %0.1f seconds ago)",
                self.total_docs,
                dt,
            )
            self._solr_commit()
            self.reset()
        else:
            logger.debug(
                "skipping solr commit (%d docs updated, last commit was %0.1f seconds ago)",
                self.total_docs,
                dt,
            )

    def _solr_commit(self):
        logger.info("BEGIN commit")
        update_work.solr_update([CommitRequest()])
        logger.info("END commit")
async def main(
    ol_config: str,
    debugger: bool = False,
    state_file: str = 'solr-update.state',
    exclude_edits_containing: str = None,
    ol_url='http://openlibrary.org/',
    solr_url: str = None,
    solr_next: bool = False,
    socket_timeout: int = 10,
    load_ia_scans: bool = False,
    commit: bool = True,
    initial_state: str = None,
):
    """
    :param debugger: Wait for a debugger to attach before beginning
    :param exclude_edits_containing: Don't index matching edits
    :param solr_url: If wanting to override what's in the config file
    :param solr_next: Whether to assume the new schema/etc. are used
    :param initial_state: State to use if the state file doesn't exist. Defaults to today.
    """
    FORMAT = "%(asctime)-15s %(levelname)s %(message)s"
    logging.basicConfig(level=logging.INFO, format=FORMAT)
    logger.info("BEGIN solr_updater")

    if debugger:
        import debugpy

        logger.info("Enabling debugger attachment (attach if it hangs here)")
        debugpy.listen(address=('0.0.0.0', 3000))
        logger.info("Waiting for debugger to attach...")
        debugpy.wait_for_client()
        logger.info("Debugger attached to port 3000")

    # Sometimes archive.org requests block forever.
    # Setting a timeout will make the request fail instead of waiting forever.
    socket.setdefaulttimeout(socket_timeout)

    # set the OL URL when running on a dev instance
    if ol_url:
        host = web.lstrips(ol_url, "http://").strip("/")
        update_work.set_query_host(host)

    if solr_url:
        update_work.set_solr_base_url(solr_url)

    update_work.set_solr_next(solr_next)

    logger.info("loading config from %s", ol_config)
    load_config(ol_config)

    offset = read_state_file(state_file, initial_state)
    logfile = InfobaseLog(
        config.get('infobase_server'), exclude=exclude_edits_containing
    )
    logfile.seek(offset)

    solr = Solr()

    while True:
        records = logfile.read_records()
        keys = parse_log(records, load_ia_scans)
        count = await update_keys(keys)

        if logfile.tell() != offset:
            offset = logfile.tell()
            logger.info("saving offset %s", offset)
            with open(state_file, "w") as f:
                f.write(offset)

        if commit:
            solr.commit(ndocs=count)
        else:
            logger.info("not doing solr commit as commit is off")

        # Don't sleep after committing some records.
        # While the commit was running, more edits might have happened.
        if count == 0:
            logger.debug("No more log records available, sleeping...")
            time.sleep(5)
if __name__ == "__main__":
    from scripts.solr_builder.solr_builder.fn_to_cli import FnToCLI

    cli = FnToCLI(main)
    args = cli.args_dict()
    cli.run()
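
# Illustrative usage note: the CLI options are generated by FnToCLI from main()'s
# signature and defaults, so run the script with --help to see the exact flags;
# ol_config (the path to an Open Library config file) is the only required argument.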