27
27
/* Per-crawl client state: the attached connection plus the output buffer
 * that crawler modules append to before it is written to the socket. */
typedef struct {
28
28
void * c ; /* original connection structure. still with source thread attached. */
29
29
int sfd ; /* client fd. */
30
- bipbuf_t * buf ; /* output buffer */
31
- char * cbuf ; /* current buffer */
30
+ int buflen ; /* allocated size of buf, in bytes */
31
+ int bufused ; /* bytes of pending output currently stored at the start of buf */
32
+ char * buf ; /* output buffer */
32
33
} crawler_client_t ;
33
34
34
35
typedef struct _crawler_module_t crawler_module_t ;
@@ -86,7 +87,7 @@ crawler_module_reg_t *crawler_mod_regs[3] = {
86
87
& crawler_metadump_mod
87
88
};
88
89
89
- static int lru_crawler_client_getbuf (crawler_client_t * c );
90
+ static int lru_crawler_write (crawler_client_t * c );
90
91
crawler_module_t active_crawler_mod ;
91
92
enum crawler_run_type active_crawler_type ;
92
93
@@ -107,26 +108,34 @@ static void *storage;
107
108
108
109
/*** LRU CRAWLER THREAD ***/
109
110
110
- #define LRU_CRAWLER_WRITEBUF 8192
111
+ #define LRU_CRAWLER_MINBUFSPACE 8192
111
112
112
113
/* Drop the crawler's client: pass the connection to sidethread_conn_close(),
 * detach it from the crawler state, and release the output buffer.
 * Afterwards both c->c and c->buf are NULL. */
static void lru_crawler_close_client (crawler_client_t * c ) {
113
114
//fprintf(stderr, "CRAWLER: Closing client\n");
114
115
sidethread_conn_close (c -> c );
115
116
c -> c = NULL ; /* callers test c->c == NULL to detect a closed client */
116
- c -> cbuf = NULL ;
117
- bipbuf_free (c -> buf );
117
+ free (c -> buf );
118
118
c -> buf = NULL ; /* avoid a dangling pointer / double free on a later close */
119
119
}
120
120
121
121
/* Give the client connection back via redispatch_conn() instead of closing
 * it, then detach it from the crawler state and release the output buffer.
 * NOTE(review): presumably redispatch_conn() hands the connection back to a
 * worker thread -- confirm against its definition.
 * NOTE(review): the disabled debug line below still says "Closing client",
 * which looks copied from the close path. */
static void lru_crawler_release_client (crawler_client_t * c ) {
122
122
//fprintf(stderr, "CRAWLER: Closing client\n");
123
123
redispatch_conn (c -> c );
124
124
c -> c = NULL ; /* the crawler no longer owns the connection */
125
- c -> cbuf = NULL ;
126
- bipbuf_free (c -> buf );
125
+ free (c -> buf );
127
126
c -> buf = NULL ; /* avoid a dangling pointer / double free later */
128
127
}
129
128
129
+ static int lru_crawler_expand_buf (crawler_client_t * c ) {
130
+ c -> buflen *= 2 ;
131
+ char * nb = realloc (c -> buf , c -> buflen );
132
+ if (nb == NULL ) {
133
+ return -1 ;
134
+ }
135
+ c -> buf = nb ;
136
+ return 0 ;
137
+ }
138
+
130
139
static int crawler_expired_init (crawler_module_t * cm , void * data ) {
131
140
struct crawler_expired_data * d ;
132
141
if (data != NULL ) {
@@ -236,7 +245,6 @@ static void crawler_expired_eval(crawler_module_t *cm, item *search, uint32_t hv
236
245
}
237
246
238
247
static void crawler_metadump_eval (crawler_module_t * cm , item * it , uint32_t hv , int i ) {
239
- //int slab_id = CLEAR_LRU(i);
240
248
char keybuf [KEY_MAX_URI_ENCODED_LENGTH ];
241
249
int is_flushed = item_is_flushed (it );
242
250
/* Ignore expired content. */
@@ -247,7 +255,7 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i
247
255
}
248
256
// TODO: uriencode directly into the buffer.
249
257
uriencode (ITEM_key (it ), keybuf , it -> nkey , KEY_MAX_URI_ENCODED_LENGTH );
250
- int total = snprintf (cm -> c .cbuf , 4096 ,
258
+ int total = snprintf (cm -> c .buf + cm -> c . bufused , 4096 ,
251
259
"key=%s exp=%ld la=%llu cas=%llu fetch=%s cls=%u size=%lu\n" ,
252
260
keybuf ,
253
261
(it -> exptime == 0 ) ? -1 : (long )(it -> exptime + process_started ),
@@ -257,53 +265,61 @@ static void crawler_metadump_eval(crawler_module_t *cm, item *it, uint32_t hv, i
257
265
ITEM_clsid (it ),
258
266
(unsigned long ) ITEM_ntotal (it ));
259
267
refcount_decr (it );
260
- // TODO: some way of tracking the errors. these are very unlikely though.
261
- if (total >= LRU_CRAWLER_WRITEBUF - 1 || total <= 0 ) {
262
- /* Failed to write, don't push it. */
268
+ // TODO: some way of tracking the errors. these should be impossible given
269
+ // the space requirements.
270
+ if (total >= LRU_CRAWLER_MINBUFSPACE - 1 || total <= 0 ) {
271
+ // Failed to write, don't push it.
263
272
return ;
264
273
}
265
- bipbuf_push ( cm -> c .buf , total ) ;
274
+ cm -> c .bufused += total ;
266
275
}
267
276
268
277
/*
 * Finish a metadump run: flush the client's pending output, then queue the
 * terminating "END\r\n" marker in the output buffer for the caller to send.
 *
 * Does nothing when no client is attached, or when the flush ended with the
 * client being closed (its buffer is freed and NULL in that case).
 */
static void crawler_metadump_finalize(crawler_module_t *cm) {
    static const char end_marker[] = "END\r\n";
    const int end_len = (int)(sizeof(end_marker) - 1);
    crawler_client_t *client = &cm->c;

    if (client->c == NULL) {
        return;
    }

    // empty the write buffer. The return value alone does not say whether the
    // client survived, so inspect the client state afterwards instead.
    (void)lru_crawler_write(client);
    if (client->c == NULL || client->buf == NULL) {
        // Client was closed during the flush; there is no buffer to write to.
        return;
    }

    // lru_crawler_write() can return with data still pending (e.g. on a poll
    // timeout), so append after the pending bytes rather than overwriting the
    // start of the buffer. After a complete flush bufused is 0 and this is
    // the same as writing at the start.
    if (client->buflen - client->bufused < end_len) {
        return;
    }
    memcpy(client->buf + client->bufused, end_marker, (size_t)end_len);
    client->bufused += end_len;
}
276
284
277
- static int lru_crawler_poll (crawler_client_t * c ) {
278
- unsigned char * data ;
279
- unsigned int data_size = 0 ;
285
+ // write the whole buffer out to the client socket.
286
+ static int lru_crawler_write (crawler_client_t * c ) {
287
+ unsigned int data_size = c -> bufused ;
288
+ unsigned int sent = 0 ;
280
289
struct pollfd to_poll [1 ];
281
290
to_poll [0 ].fd = c -> sfd ;
282
291
to_poll [0 ].events = POLLOUT ;
283
292
284
- int ret = poll (to_poll , 1 , 1000 );
285
-
286
- if (ret < 0 ) {
287
- // fatal.
288
- return -1 ;
289
- }
293
+ if (c -> c == NULL ) return -1 ;
294
+ if (data_size == 0 ) return 0 ;
290
295
291
- if (ret == 0 ) return 0 ;
296
+ while (sent < data_size ) {
297
+ int ret = poll (to_poll , 1 , 1000 );
292
298
293
- if (to_poll [0 ].revents & POLLIN ) {
294
- char buf [1 ];
295
- int res = ((conn * )c -> c )-> read (c -> c , buf , 1 );
296
- if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK ))) {
297
- lru_crawler_close_client (c );
299
+ if (ret < 0 ) {
300
+ // fatal.
298
301
return -1 ;
299
302
}
300
- }
301
- if ((data = bipbuf_peek_all (c -> buf , & data_size )) != NULL ) {
303
+
304
+ if (ret == 0 ) return 0 ;
305
+
306
+ // check if socket was closed on us.
307
+ if (to_poll [0 ].revents & POLLIN ) {
308
+ char buf [1 ];
309
+ int res = ((conn * )c -> c )-> read (c -> c , buf , 1 );
310
+ if (res == 0 || (res == -1 && (errno != EAGAIN && errno != EWOULDBLOCK ))) {
311
+ lru_crawler_close_client (c );
312
+ return -1 ;
313
+ }
314
+ }
315
+
302
316
if (to_poll [0 ].revents & (POLLHUP |POLLERR )) {
317
+ // got socket hangup.
303
318
lru_crawler_close_client (c );
304
319
return -1 ;
305
320
} else if (to_poll [0 ].revents & POLLOUT ) {
306
- int total = ((conn * )c -> c )-> write (c -> c , data , data_size );
321
+ // socket is writeable.
322
+ int total = ((conn * )c -> c )-> write (c -> c , c -> buf + sent , data_size - sent );
307
323
if (total == -1 ) {
308
324
if (errno != EAGAIN && errno != EWOULDBLOCK ) {
309
325
lru_crawler_close_client (c );
@@ -312,29 +328,14 @@ static int lru_crawler_poll(crawler_client_t *c) {
312
328
} else if (total == 0 ) {
313
329
lru_crawler_close_client (c );
314
330
return -1 ;
315
- } else {
316
- bipbuf_poll (c -> buf , total );
317
331
}
332
+ sent += total ;
318
333
}
319
- }
320
- return 0 ;
321
- }
334
+ } // while
322
335
323
- /* Grab some space to work with, if none exists, run the poll() loop and wait
324
- * for it to clear up or close.
325
- * Return NULL if closed.
326
- */
327
- static int lru_crawler_client_getbuf (crawler_client_t * c ) {
328
- void * buf = NULL ;
329
- if (c -> c == NULL ) return -1 ;
330
- /* not enough space. */
331
- while ((buf = bipbuf_request (c -> buf , LRU_CRAWLER_WRITEBUF )) == NULL ) {
332
- // TODO: max loops before closing.
333
- int ret = lru_crawler_poll (c );
334
- if (ret < 0 ) return ret ;
335
- }
336
+ // write buffer now empty.
337
+ c -> bufused = 0 ;
336
338
337
- c -> cbuf = buf ;
338
339
return 0 ;
339
340
}
340
341
@@ -349,22 +350,39 @@ static void lru_crawler_class_done(int i) {
349
350
active_crawler_mod .mod -> doneclass (& active_crawler_mod , i );
350
351
}
351
352
353
+ // ensure we build the buffer a little bit to cut down on poll/write syscalls.
354
+ #define MIN_ITEMS_PER_WRITE 16
352
355
static void item_crawl_hash (void ) {
353
356
// get iterator from assoc. can hang for a long time.
354
357
// - blocks hash expansion
355
358
void * iter = assoc_get_iterator ();
356
359
int crawls_persleep = settings .crawls_persleep ;
357
360
item * it = NULL ;
361
+ int items = 0 ;
358
362
359
363
// loop while iterator returns something
360
364
// - iterator func handles bucket-walking
361
365
// - iterator returns with bucket locked.
362
366
while (assoc_iterate (iter , & it )) {
363
367
// if iterator returns true but no item, we're inbetween buckets and
364
- // can do sleep or cleanup work without holding a lock.
368
+ // can do cleanup work without holding an item lock.
365
369
if (it == NULL ) {
370
+ if (active_crawler_mod .c .c != NULL ) {
371
+ if (items > MIN_ITEMS_PER_WRITE ) {
372
+ int ret = lru_crawler_write (& active_crawler_mod .c );
373
+ items = 0 ;
374
+ if (ret != 0 ) {
375
+ // fail out and finalize.
376
+ break ;
377
+ }
378
+ }
379
+ } else if (active_crawler_mod .mod -> needs_client ) {
380
+ // fail out and finalize.
381
+ break ;
382
+ }
383
+
366
384
// - sleep bits from orig loop
367
- if (crawls_persleep -- <= 0 && settings .lru_crawler_sleep ) {
385
+ if (crawls_persleep <= 0 && settings .lru_crawler_sleep ) {
368
386
pthread_mutex_unlock (& lru_crawler_lock );
369
387
usleep (settings .lru_crawler_sleep );
370
388
pthread_mutex_lock (& lru_crawler_lock );
@@ -377,27 +395,29 @@ static void item_crawl_hash(void) {
377
395
continue ;
378
396
}
379
397
380
- /* Get memory from bipbuf, if client has no space, flush. */
381
- if (active_crawler_mod .c .c != NULL ) {
382
- int ret = lru_crawler_client_getbuf (& active_crawler_mod .c );
383
- if (ret != 0 ) {
384
- // fail out and finalize.
385
- break ;
386
- }
387
- } else if (active_crawler_mod .mod -> needs_client ) {
388
- // fail out and finalize.
389
- break ;
390
- }
391
-
392
398
// double check that the item isn't in a transitional state.
393
399
if (refcount_incr (it ) < 2 ) {
394
400
refcount_decr (it );
395
401
continue ;
396
402
}
397
403
404
+ // We're presently holding an item lock, so we cannot flush the
405
+ // buffer to the network socket as the syscall is both slow and could
406
+ // hang waiting for POLLOUT. Instead we must expand the buffer.
407
+ if (active_crawler_mod .c .c != NULL ) {
408
+ crawler_client_t * c = & active_crawler_mod .c ;
409
+ if (c -> buflen - c -> bufused < LRU_CRAWLER_MINBUFSPACE ) {
410
+ if (lru_crawler_expand_buf (c ) != 0 ) {
411
+ // failed to expand buffer, stop.
412
+ break ;
413
+ }
414
+ }
415
+ }
398
416
// FIXME: missing hv and i are fine for metadump eval, but not fine
399
417
// for expire eval.
400
418
active_crawler_mod .mod -> eval (& active_crawler_mod , it , 0 , 0 );
419
+ crawls_persleep -- ;
420
+ items ++ ;
401
421
}
402
422
403
423
// must finalize or we leave the hash table expansion blocked.
@@ -430,12 +450,14 @@ static void *item_crawler_thread(void *arg) {
430
450
continue ;
431
451
}
432
452
433
- /* Get memory from bipbuf, if client has no space, flush. */
434
453
if (active_crawler_mod .c .c != NULL ) {
435
- int ret = lru_crawler_client_getbuf (& active_crawler_mod .c );
436
- if (ret != 0 ) {
437
- lru_crawler_class_done (i );
438
- continue ;
454
+ crawler_client_t * c = & active_crawler_mod .c ;
455
+ if (c -> buflen - c -> bufused < LRU_CRAWLER_MINBUFSPACE ) {
456
+ int ret = lru_crawler_write (c );
457
+ if (ret != 0 ) {
458
+ lru_crawler_class_done (i );
459
+ continue ;
460
+ }
439
461
}
440
462
} else if (active_crawler_mod .mod -> needs_client ) {
441
463
lru_crawler_class_done (i );
@@ -500,8 +522,8 @@ static void *item_crawler_thread(void *arg) {
500
522
if (active_crawler_mod .mod != NULL ) {
501
523
if (active_crawler_mod .mod -> finalize != NULL )
502
524
active_crawler_mod .mod -> finalize (& active_crawler_mod );
503
- while (active_crawler_mod .c .c != NULL && bipbuf_used ( active_crawler_mod .c .buf ) ) {
504
- lru_crawler_poll (& active_crawler_mod .c );
525
+ while (active_crawler_mod .c .c != NULL && active_crawler_mod .c .bufused != 0 ) {
526
+ lru_crawler_write (& active_crawler_mod .c );
505
527
}
506
528
// Double checking in case the client closed during the poll
507
529
if (active_crawler_mod .c .c != NULL ) {
@@ -626,10 +648,14 @@ static int lru_crawler_set_client(crawler_module_t *cm, void *c, const int sfd)
626
648
crawlc -> c = c ;
627
649
crawlc -> sfd = sfd ;
628
650
629
- crawlc -> buf = bipbuf_new (1024 * 128 );
651
+ size_t size = LRU_CRAWLER_MINBUFSPACE * 16 ;
652
+ crawlc -> buf = malloc (size );
653
+
630
654
if (crawlc -> buf == NULL ) {
631
655
return -2 ;
632
656
}
657
+ crawlc -> buflen = size ;
658
+ crawlc -> bufused = 0 ;
633
659
return 0 ;
634
660
}
635
661
0 commit comments