forked from antirez/disque-module
-
Notifications
You must be signed in to change notification settings - Fork 0
/
job.c
1439 lines (1281 loc) · 54.2 KB
/
job.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Jobs handling and commands.
*
* Copyright (c) 2014-2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved. This code is under the AGPL license, check the
* LICENSE file for more info. */
#include "disque.h"
/* ------------------------- Low level jobs functions ----------------------- */
/* Generate a new Job ID and write it to the buffer pointed by 'id'
 * (NOT including a null term), which must be JOB_ID_LEN bytes or more.
 *
 * An ID is a 40 bytes string composed as such:
 *
 * +--+-----------------+-+------------------------------+-+-----+
 * |D-| 8 bytes Node ID |-| 144-bit ID (base64: 24 bytes)|-| TTL |
 * +--+-----------------+-+------------------------------+-+-----+
 *
 * "D-" is just a fixed string. All Disque job IDs start with these
 * two bytes.
 *
 * Node ID is the first 8 bytes of the hexadecimal Node ID where the
 * message was created. The main use for this is that a consumer receiving
 * messages from a given queue can collect stats about where the producers
 * are connected, and switch to improve the cluster efficiency.
 *
 * The 144 bit ID is the unique message ID, encoded in base 64 with
 * the standard charset "A-Za-z0-9+/". The bits are the first 18 bytes of
 * SHA1(JobIDSeed || counter), where 'counter' is a static 64 bit integer
 * incremented at every call.
 *
 * The TTL is a big endian 16 bit unsigned number capped to 2^16-1
 * if greater than that, and is only used in order to expire ACKs
 * when the job is no longer available. It represents the TTL of the
 * original job in *minutes*, not seconds, and is encoded as a
 * 4 digits hexadecimal number.
 *
 * The TTL is even if the job retry value is 0 (at most once jobs),
 * otherwise is odd, so the actual precision of the value is 2 minutes.
 * This is useful since the receiver of an ACKJOB command can avoid
 * creating a "dummy ack" for unknown job IDs for at most once jobs.
 *
 * Arguments: 'ttl' is expressed in seconds, 'retry' is the job retry
 * value (only its being zero or greater than zero matters here). */
void generateJobID(char *id, int ttl, int retry) {
    static const char b64cset[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                                  "abcdefghijklmnopqrstuvwxyz"
                                  "0123456789+/";
    static const char hexcset[] = "0123456789abcdef";
    SHA1_CTX ctx;
    unsigned char ttlbytes[2], hash[20];
    int j;
    static uint64_t counter;

    /* Get the pseudo random bytes using SHA1 in counter mode. */
    counter++;
    SHA1Init(&ctx);
    SHA1Update(&ctx,(unsigned char*)JobIDSeed,sizeof(JobIDSeed));
    SHA1Update(&ctx,(unsigned char*)&counter,sizeof(counter));
    SHA1Final(hash,&ctx);

    ttl /= 60; /* Store TTL in minutes. */
    if (ttl > 65535) ttl = 65535;
    if (ttl < 0) ttl = 1;

    /* Force the TTL to be odd if retry > 0, even if retry == 0. */
    ttl = (retry > 0) ? (ttl|1) : (ttl & ~1);

    ttlbytes[0] = (ttl&0xff00)>>8;
    ttlbytes[1] = ttl&0xff;

    *id++ = 'D';
    *id++ = '-';

    /* 8 bytes from Node ID + separator */
    const char *myself = RedisModule_GetMyClusterID();
    for (j = 0; j < 8; j++) *id++ = myself[j];
    *id++ = '-';

    /* Pseudorandom Message ID + separator. We encode 4 base64 chars
     * per loop (3 digest bytes), and each char encodes 6 bits, so we have
     * to loop 6 times to encode all the 144 bits into 24 destination chars. */
    unsigned char *h = hash;
    for (j = 0; j < 6; j++) {
        id[0] = b64cset[h[0]>>2];
        id[1] = b64cset[(h[0]<<4|h[1]>>4)&63];
        id[2] = b64cset[(h[1]<<2|h[2]>>6)&63];
        /* The last char of the group carries the low 6 bits of the THIRD
         * byte of the group (h[2]). Using h[3] here would discard those
         * bits and encode bits of the next group twice. */
        id[3] = b64cset[h[2]&63];
        id += 4;
        h += 3;
    }
    *id++ = '-';

    /* 4 TTL bytes in hex. */
    id[0] = hexcset[(ttlbytes[0]&0xf0)>>4];
    id[1] = hexcset[ttlbytes[0]&0xf];
    id[2] = hexcset[(ttlbytes[1]&0xf0)>>4];
    id[3] = hexcset[ttlbytes[1]&0xf];
}
/* Helper function for setJobTTLFromID() in order to extract the TTL stored
 * as hex big endian number in the Job ID. The function is only used for this
 * but is more generic. 'p' points to the first digit for 'count' hex digits.
 * The number is assumed to be stored in big endian format: the first hex
 * char is the most significant. Both upper and lower case digits are
 * accepted.
 *
 * errno is always reset to 0 on entry. If invalid digits are found they are
 * considered to be zero, and errno is set to EINVAL. Note that a NUL byte
 * inside the 'count' bytes is an invalid digit like any other: exactly
 * 'count' bytes are always consumed.
 *
 * The return value is the decoded number. With 'count' greater than 16 the
 * most significant digits are shifted out of the 64 bit result. */
uint64_t hexToInt(const char *p, size_t count) {
    uint64_t value = 0;

    errno = 0;
    while(count--) {
        /* Work on the unsigned byte value so that bytes >= 0x80 are never
         * seen as negative numbers. */
        unsigned char c = (unsigned char)*p++;
        int v;

        if (c >= '0' && c <= '9') {
            v = c - '0';
        } else if (c >= 'a' && c <= 'f') {
            v = c - 'a' + 10;
        } else if (c >= 'A' && c <= 'F') {
            v = c - 'A' + 10;
        } else {
            /* Includes the NUL byte: a strchr() based lookup would match
             * the charset terminator and yield the bogus value 16. */
            errno = EINVAL;
            v = 0;
        }
        value = (value << 4) | (uint64_t)v;
    }
    return value;
}
/* Disque tries hard not to deliver duplicated messages, so ideally a given
 * message is never queued by multiple owners at the same time. Partitions
 * make this impossible to guarantee, but as a best-effort measure, when a
 * node receives a QUEUED message about a job, the node IDs of sender and
 * receiver are compared: if the sender has the greater node ID, we drop the
 * message from our queue (retaining a copy to retry again later).
 *
 * Comparing plain node IDs would make a given node always win against
 * another one. So before the comparison both IDs are XORed with the
 * pseudorandom part of the Job ID: this way who wins depends on the job.
 *
 * The return value is the memcmp() result of the two mixed IDs: negative,
 * zero or positive if 'nodea' is respectively smaller, equal or greater
 * than 'nodeb' for the job 'j'. */
int compareNodeIDsByJob(const char *nodea, const char *nodeb, job *j) {
    char mixed_a[REDISMODULE_NODE_ID_LEN];
    char mixed_b[REDISMODULE_NODE_ID_LEN];
    int pos;

    for (pos = 0; pos < REDISMODULE_NODE_ID_LEN; pos++) {
        /* The Job ID has 24 bytes of pseudo random bits starting at
         * offset 11: cycle over them if the node ID is longer. */
        char mask = j->id[11 + pos%24];

        mixed_a[pos] = nodea[pos] ^ mask;
        mixed_b[pos] = nodeb[pos] ^ mask;
    }
    return memcmp(mixed_a,mixed_b,REDISMODULE_NODE_ID_LEN);
}
/* Return the raw TTL (in minutes) stored in a well-formed Job ID, that is,
 * the 4 hex digits found at offset 36 of the ID.
 * The caller should sanity check the job ID before calling this
 * function. Note that the 'id' field of a job structure is always valid. */
int getRawTTLFromJobID(const char *id) {
    const char *ttl_digits = id + 36;

    return (int)hexToInt(ttl_digits,4);
}
/* Set the job expire time from the TTL encoded in its ID. This is useful
 * when a new job is created just to remember it was acknowledged: thanks to
 * the TTL encoded in the ID we can set an expire time for the job even if
 * we have no other info about it.
 *
 * The resulting 'etime' is the current unix time plus the TTL in minutes
 * found in the ID, converted to seconds. */
void setJobTTLFromID(job *job) {
    int ttl_seconds = getRawTTLFromJobID(job->id) * 60;

    /* Convert back to absolute unix time. */
    job->etime = time(NULL) + ttl_seconds;
}
/* Validate the string 'id' as a job ID. 'len' is the number of bytes the
 * string is composed of. Only the length, the "D-" prefix and the two '-'
 * separators at offsets 10 and 35 are checked: it's pretty pointless to
 * spend more CPU to validate it better since the lookup will fail anyway.
 *
 * Returns C_OK if the ID looks valid, C_ERR otherwise. */
int validateJobID(const char *id, size_t len) {
    if (len != JOB_ID_LEN) return C_ERR;

    int prefix_ok = (id[0] == 'D' && id[1] == '-');
    int separators_ok = (id[10] == '-' && id[35] == '-');

    return (prefix_ok && separators_ok) ? C_OK : C_ERR;
}
/* Like validateJobID() but if the ID is invalid a -BADID error is sent as
 * reply using the context 'ctx', if not NULL. The return value is the same
 * as validateJobID(): C_OK or C_ERR. */
int validateJobIdOrReply(RedisModuleCtx *ctx, const char *id, size_t len) {
    if (validateJobID(id,len) == C_OK) return C_OK;

    if (ctx) RedisModule_ReplyWithError(ctx,"-BADID Invalid Job ID format");
    return C_ERR;
}
/* Create a new job in a given state. If 'id' is NULL, a new ID is
 * generated and assigned, otherwise the specified ID (JOB_ID_LEN bytes)
 * is copied into the job.
 * The 'ttl' and 'retry' arguments are only used if 'id' is NULL, since
 * they are just passed to generateJobID() to be encoded in the new ID.
 *
 * This function only creates the job without any body: the ID, the state,
 * and an empty 'nodes_delivered' radix tree are populated. Every other
 * field is zero (the structure is allocated zeroed, so that fields the
 * caller does not set later, like 'ctime' which is used to order jobs in
 * the awakeme skiplist, never hold indeterminate values).
 *
 * The returned job must be released with freeJob(). */
job *createJob(const char *id, int state, int ttl, int retry) {
    job *j = RedisModule_Calloc(1,sizeof(*j));

    /* Generate a new Job ID if not specified by the caller. */
    if (id == NULL)
        generateJobID(j->id,ttl,retry);
    else
        memcpy(j->id,id,JOB_ID_LEN);

    j->queue = NULL;
    j->state = state;
    j->gc_retry = 0;
    j->flags = 0;
    j->body = NULL;
    j->nodes_delivered = raxNew();
    j->nodes_confirmed = NULL; /* Only created later on-demand. */
    j->awakeme = 0; /* Not yet registered in awakeme skiplist. */
    /* Number of NACKs and additional deliveries start at zero and
     * are incremented as QUEUED messages are received or sent. */
    j->num_nacks = 0;
    j->num_deliv = 0;
    j->bc = NULL;
    return j;
}
/* Release a job and everything it owns: queue name, body and the two radix
 * trees of node IDs. Passing NULL is a no-op.
 * The job is NOT unregistered automatically: see unregisterJob(). */
void freeJob(job *j) {
    if (!j) return;

    /* Radix trees. */
    if (j->nodes_confirmed != NULL) raxFree(j->nodes_confirmed);
    if (j->nodes_delivered != NULL) raxFree(j->nodes_delivered);

    /* SDS strings. */
    if (j->body != NULL) sdsfree(j->body);
    if (j->queue != NULL) sdsfree(j->queue);

    RedisModule_Free(j);
}
/* Add the job to the 'Jobs' radix tree, so that it can be found later by
 * job ID using lookupJob(). If a node knows about a job, the job must be
 * registered and retrievable via lookupJob(), regardless of its state.
 * Once inserted, the job awake time is computed via updateJobAwakeTime().
 *
 * On success C_OK is returned. If there is already a job with the
 * specified ID, no operation is performed and the function returns
 * C_ERR. */
int registerJob(job *j) {
    unsigned char *key = (unsigned char*)j->id;
    int already_known = raxFind(Jobs, key, JOB_ID_LEN) != raxNotFound;

    if (already_known) return C_ERR;

    raxInsert(Jobs, key, JOB_ID_LEN, j, NULL);
    updateJobAwakeTime(j,0);
    return C_OK;
}
/* Lookup a job by ID ('id' must point to JOB_ID_LEN bytes). Returns the
 * registered job, or NULL if no job with such an ID is registered. */
job *lookupJob(const char *id) {
    void *found = raxFind(Jobs, (unsigned char*)id, JOB_ID_LEN);

    if (found == raxNotFound) return NULL;
    return found;
}
/* Remove job references from the system, without freeing the job itself:
 * the caller is still responsible for calling freeJob().
 *
 * The job is looked up again by ID, and every following operation is
 * performed on the instance found in the 'Jobs' radix tree. In order:
 * a DELJOB is emitted to the AOF if needed, the job is removed from the
 * awakeme skiplist and from its queue, a client blocked waiting for the
 * replication of the job (if any) gets a -NOREPL error and is unblocked,
 * and finally the job is removed from 'Jobs'.
 *
 * If the job was already unregistered, C_ERR is returned, otherwise
 * C_OK is returned. */
int unregisterJob(RedisModuleCtx *ctx, job *j) {
/* From now on 'j' is whatever is registered under this ID. */
j = lookupJob(j->id);
if (!j) return C_ERR;
/* Emit a DELJOB command for all the job states but WAITREPL (no
 * ADDJOB emitted yet), and ACKED (DELJOB already emitted). */
if (j->state >= JOB_STATE_ACTIVE && j->state != JOB_STATE_ACKED)
AOFDelJob(j);
/* Remove from awake skip list. An awakeme of zero means the job was
 * never inserted there. */
if (j->awakeme) RedisModule_Assert(skiplistDelete(AwakeList,j));
/* If the job is queued, remove from queue. */
if (j->state == JOB_STATE_QUEUED) dequeueJob(j);
/* If there is a client blocked for this job, inform it that the job
 * got deleted, and unblock it. This should only happen when the job
 * gets expired before the requested replication level is reached. */
if (j->state == JOB_STATE_WAIT_REPL) {
RedisModuleBlockedClient *bc = j->bc;
if (bc) {
RedisModuleCtx *tsc = RedisModule_GetThreadSafeContext(bc);
/* Detach the client from the job before replying and unblocking. */
j->bc = NULL;
RedisModule_ReplyWithError(tsc,
"-NOREPL job removed (expired?) before the requested "
"replication level was achieved");
/* Change job state otherwise unblockClientWaitingJobRepl() will
 * try to remove the job itself. */
j->state = JOB_STATE_ACTIVE;
RedisModule_FreeThreadSafeContext(tsc);
RedisModule_UnblockClient(bc,NULL);
}
/* Executed for every job that entered this branch in WAIT_REPL state,
 * with or without a blocked client. */
clusterBroadcastDelJob(ctx,j);
}
/* Remove the job from the dictionary of jobs. */
raxRemove(Jobs, (unsigned char*)j->id, JOB_ID_LEN, NULL);
return C_OK;
}
/* Return the job state as a C string pointer. This is mainly useful for
 * reporting / debugging tasks. States outside the 0..JOB_STATE_ACKED range
 * are reported as "unknown". The returned string must not be modified
 * nor freed. */
char *jobStateToString(int state) {
    static char *names[] = {"wait-repl","active","queued","acked"};

    if (state >= 0 && state <= JOB_STATE_ACKED) return names[state];
    return "unknown";
}
/* Return the state number for the specified C string, or -1 if
 * there is no match. The comparison is case insensitive. */
int jobStateFromString(char *state) {
    const struct {
        const char *name;
        int value;
    } known[] = {
        {"wait-repl", JOB_STATE_WAIT_REPL},
        {"active",    JOB_STATE_ACTIVE},
        {"queued",    JOB_STATE_QUEUED},
        {"acked",     JOB_STATE_ACKED}
    };
    size_t i;

    for (i = 0; i < sizeof(known)/sizeof(known[0]); i++) {
        if (strcasecmp(state,known[i].name) == 0) return known[i].value;
    }
    return -1;
}
/* ----------------------------- Awakeme list ------------------------------
* Disque needs to perform periodic tasks on registered jobs, for example
* we need to remove expired jobs (TTL reached), requeue existing jobs that
* were not acknowledged in time, schedule the job garbage collection after
* the job is acknowledged, and so forth.
*
* To simplify the handling of periodic operations without adding multiple
* timers for each job, jobs are put into a skip list that order jobs for
* the unix time we need to take some action about them.
*
* Every registered job is in this list. After we update some job field
* that is related to scheduled operations on the job, or when its state
* is updated, we need to call updateJobAwakeTime() again in order to move
* the job into the appropriate place in the awakeme skip list.
*
* processJobs() takes care of handling the part of the awakeme list which
* has an awakeme time <= to the current time. As a result of processing a
* job, we expect it to likely be updated to be processed in the future
* again, or deleted at all. */
/* Ask the system to update the time at which the job will be processed
 * again in order to handle delayed tasks for this job.
 *
 * If 'at' is zero, the function computes the next time we should check
 * the job status as the earliest among: the expire time (plus one second),
 * the next garbage collection attempt if the job is ACKED, or the next
 * queue time (qtime) if the job is ACTIVE or QUEUED and has a qtime set.
 *
 * Otherwise if 'at' is non-zero, it's up to the caller to set the time
 * (in milliseconds) at which the job will be awake again.
 *
 * The job is repositioned inside the awakeme skiplist only if its awake
 * time actually changes. */
void updateJobAwakeTime(job *j, mstime_t at) {
    if (at == 0) {
        /* Best case is to handle it for eviction. One second more is added
         * in order to make sure when the job is processed we found it to
         * be already expired. */
        mstime_t candidate = (mstime_t)j->etime*1000+1000;
        int may_requeue = (j->state == JOB_STATE_ACTIVE ||
                           j->state == JOB_STATE_QUEUED);

        if (j->state == JOB_STATE_ACKED) {
            /* Try to garbage collect this ACKed job again in the future. */
            mstime_t gc_at = getNextGCRetryTime(j);

            if (gc_at < candidate) candidate = gc_at;
        } else if (may_requeue && j->qtime) {
            /* Schedule the job to be queued, and if the job is flagged
             * BCAST_WILLQUEUE, make sure to awake the job a bit earlier
             * to broadcast a WILLQUEUE message. */
            mstime_t requeue_at = j->qtime;

            if (j->flags & JOB_FLAG_BCAST_WILLQUEUE)
                requeue_at -= JOB_WILLQUEUE_ADVANCE;
            if (requeue_at < candidate) candidate = requeue_at;
        }
        at = candidate;
    }

    /* Nothing to do if the job is already scheduled at this time. */
    if (at == j->awakeme) return;

    /* Remove from the skip list (a zero awakeme means the job is not
     * there yet), then insert it back with the new awake time. */
    if (j->awakeme) {
        RedisModule_Assert(skiplistDelete(AwakeList,j));
    }
    j->awakeme = at;
    skiplistInsert(AwakeList,j);
}
/* Set the specified unix time (in milliseconds) at which a job will be
 * queued again in the local node, and reschedule the job accordingly.
 *
 * Nothing is done if the job has a retry of zero or a current qtime of
 * zero: this avoids violating the at-most-once (retry == 0) contract in
 * case of bugs. */
void updateJobRequeueTime(job *j, mstime_t qtime) {
    if (j->retry != 0 && j->qtime != 0) {
        j->qtime = qtime;
        updateJobAwakeTime(j,0);
    }
}
/* Job comparison inside the awakeme skiplist: by awakeme time first. Jobs
 * with the same awakeme time are ordered by ctime, and if that is the same
 * too, by job ID. Returns a negative, zero, or positive value if 'a' sorts
 * respectively before, together with, or after 'b'. */
int skiplistCompareJobsToAwake(const void *a, const void *b) {
    const job *left = a;
    const job *right = b;

    if (left->awakeme != right->awakeme)
        return (left->awakeme > right->awakeme) ? 1 : -1;
    if (left->ctime != right->ctime)
        return (left->ctime > right->ctime) ? 1 : -1;
    return memcmp(left->id,right->id,JOB_ID_LEN);
}
/* Used to show jobs info for debugging or under unexpected conditions.
 *
 * A single line is logged at the specified 'level', prefixed by 'msg' and
 * the job ID, reporting: state, retry, delay, replicate, flags, the current
 * time in milliseconds, the awake time both relative to now and absolute
 * (in parentheses), and the queue and expire times relative to now.
 *
 * The clock is sampled only once, so that all the relative times in the
 * same line are computed against the same "now". */
void logJobsDebugInfo(RedisModuleCtx *ctx, char *level, char *msg, job *j) {
    long long now = (long long)mstime();
    long long awakeme = (long long)j->awakeme;

    RedisModule_Log(ctx,level,
        "%s %.*s: state=%d retry=%d delay=%d replicate=%d flags=%d now=%lld awake=%lld (%lld) qtime=%lld etime=%lld",
        msg,
        JOB_ID_LEN, j->id,
        (int)j->state,
        (int)j->retry,
        (int)j->delay,
        (int)j->repl,
        (int)j->flags,
        now,
        awakeme-now,
        awakeme,
        (long long)j->qtime-now,
        (long long)j->etime*1000-now
    );
}
/* Process the specified job to perform asynchronous operations on it.
 * Check processJobs() for more info.
 *
 * The steps, in order, are: eviction of expired jobs (in this case the job
 * is unregistered and freed, and the function returns immediately),
 * broadcast of WILLQUEUE, (re)queueing of ACTIVE jobs, refresh of the
 * requeue time of QUEUED jobs, and garbage collection of ACKED jobs.
 *
 * Since the job may be freed here, the caller must not access 'j' after
 * this function returns. */
void processJob(RedisModuleCtx *ctx, job *j) {
/* Remember the original awake time: if nobody changed it at the end of the
 * function, processing the job had no effect on its schedule. */
mstime_t old_awakeme = j->awakeme;
mstime_t now_ms = mstime();
time_t now = now_ms / 1000;
logJobsDebugInfo(ctx,"verbose","PROCESSING",j);
/* Remove expired jobs. */
if (j->etime <= now) {
RedisModule_Log(ctx,"verbose","EVICT %.*s", JOB_ID_LEN, j->id);
unregisterJob(ctx,j);
freeJob(j);
return;
}
/* Broadcast WILLQUEUE to inform other nodes we are going to re-queue
 * the job shortly. */
if ((j->state == JOB_STATE_ACTIVE ||
j->state == JOB_STATE_QUEUED) &&
j->flags & JOB_FLAG_BCAST_WILLQUEUE &&
j->qtime-JOB_WILLQUEUE_ADVANCE <= now_ms)
{
/* The message is only sent if the job is not already in our queue. */
if (j->state != JOB_STATE_QUEUED) clusterSendWillQueue(ctx,j);
/* Clear the WILLQUEUE flag, so that the job will be rescheduled
 * for when we need to queue it (otherwise it is scheduled
 * JOB_WILLQUEUE_ADVANCE milliseconds before). */
j->flags &= ~JOB_FLAG_BCAST_WILLQUEUE;
updateJobAwakeTime(j,0);
}
/* Requeue job if needed. This will also care about putting the job
 * into the queue for the first time for delayed jobs, including the
 * ones with retry=0. */
if (j->state == JOB_STATE_ACTIVE && j->qtime <= now_ms) {
queue *q;
/* We need to check if the queue is paused in input. If that's
 * the case, we do:
 *
 * If retry != 0, postpone the enqueue-time of "retry" time.
 *
 * If retry == 0 (at most once job), this is a job with a delay that
 * will never be queued again, and we are the only owner.
 * In such a case, put it into the queue, or the job will be leaked. */
if (j->retry != 0 &&
(q = lookupQueue(j->queue,sdslen(j->queue))) != NULL &&
q->flags & QUEUE_FLAG_PAUSED_IN)
{
updateJobRequeueTime(j,now_ms+
j->retry*1000+
randomTimeError(DISQUE_TIME_ERR));
} else {
enqueueJob(ctx,j,0);
}
}
/* Update job re-queue time if job is already queued.
 * NOTE(review): presumably enqueueJob() above moves the job to the QUEUED
 * state, so this may also run for a job queued just now -- confirm. */
if (j->state == JOB_STATE_QUEUED && j->qtime <= now_ms &&
j->retry)
{
j->flags |= JOB_FLAG_BCAST_WILLQUEUE;
j->qtime = now_ms +
j->retry*1000 +
randomTimeError(DISQUE_TIME_ERR);
updateJobAwakeTime(j,0);
}
/* Try a job garbage collection. */
if (j->state == JOB_STATE_ACKED) {
tryJobGC(ctx,j);
updateJobAwakeTime(j,0);
}
/* If the awake time did not change, the job will be found to be processed
 * again at the next cycle without any progress: log it as a warning. */
if (old_awakeme == j->awakeme)
logJobsDebugInfo(ctx,"warning", "~~~WARNING~~~ NOT PROCESSABLE JOB", j);
}
/* Timer callback processing the jobs in the awakeme skiplist that have an
 * awake time <= the current time, calling processJob() on each of them
 * (at most 10000 jobs per call).
 *
 * Before returning, the function calls handleDelayedJobReplication() and
 * re-arms itself with RedisModule_CreateTimer(), using a period between 1
 * and 100 milliseconds depending on how far in the future the first job of
 * the skiplist has to be awakened.
 *
 * 'clientData' is not used. */
void processJobs(RedisModuleCtx *ctx, void *clientData) {
    int period = 100; /* 100 ms default period. */
    int max = 10000; /* 10k jobs * 1000 milliseconds = 10M jobs/sec max. */
    mstime_t now_ms = mstime();
    skiplistNode *current, *next;
    UNUSED(clientData);

#ifdef DEBUG_SCHEDULER
    static time_t last_log = 0;
    int canlog = 0;
    if (time(NULL) != last_log) {
        last_log = time(NULL);
        canlog = 1;
    }
    if (canlog) RedisModule_Log(ctx,"verbose","--- LEN: %d ---",
        (int) skiplistLength(AwakeList));
#endif

    current = AwakeList->header->level[0].forward;
    while(current && max--) {
        job *j = current->obj;

#ifdef DEBUG_SCHEDULER
        if (canlog) {
            RedisModule_Log(ctx,"verbose","%.*s %d (in %d) [%s]",
                JOB_ID_LEN, j->id,
                (int) j->awakeme,
                (int) (j->awakeme-now_ms),
                jobStateToString(j->state));
        }
#endif

        /* The list is ordered by awake time: stop at the first job that
         * is scheduled in the future. */
        if (j->awakeme > now_ms) break;

        /* Fetch the next node before processing, since processJob() may
         * remove the current job from the skiplist or free it. */
        next = current->level[0].forward;
        processJob(ctx,j);
        current = next;
    }

    /* Try to block between 1 and 100 milliseconds depending on how near
     * in time is the next async event to process. Note that because of
     * received commands or change in state jobs state may be modified so
     * we set a max time of 100 milliseconds to wakeup anyway. */
    current = AwakeList->header->level[0].forward;
    if (current) {
        job *j = current->obj;
        /* Milliseconds from now to the awake time of the first job: zero
         * or negative if the job is already due. The difference is kept
         * in mstime_t and clamped before being narrowed to int. */
        mstime_t wait = j->awakeme - now_ms;

        if (wait < 1) period = 1;
        else if (wait > 100) period = 100;
        else period = (int)wait;
    }

#ifdef DEBUG_SCHEDULER
    if (canlog) RedisModule_Log(ctx,"verbose","---");
#endif

    /* Add nodes to jobs that are slow to get replicated. */
    handleDelayedJobReplication(ctx);

    /* Get scheduled to be called again after 'period' milliseconds. */
    RedisModule_CreateTimer(ctx,period,processJobs,NULL);
}
/* --------------------------- Jobs serialization -------------------------- */
/* Serialize an SDS string as a little endian 32 bit count followed
 * by the bytes representing the string. A NULL string is serialized like
 * an empty one: just a zero count. The serialized string is written to the
 * memory pointed by 'p'. The return value of the function is the original
 * 'p' advanced by 4 + sdslen(s) bytes, in order to be ready to store the
 * next value to serialize. */
char *serializeSdsString(char *p, sds s) {
    size_t len = 0;
    if (s) len = sdslen(s);

    /* Length prefix. */
    uint32_t lenfield = intrev32ifbe(len);
    memcpy(p,&lenfield,sizeof(lenfield));
    p += sizeof(lenfield);

    /* Payload, if any. */
    if (s) memcpy(p,s,len);
    return p + len;
}
/* Serialize the job pointed by 'j' appending the serialized version of
 * the job into the passed SDS string 'jobs'.
 *
 * The serialization may be performed in two slightly different ways
 * depending on the 'sertype' argument:
 *
 * If the type is SER_MESSAGE the expire time field is serialized using
 * the relative TTL still remaining for the job. This serialization format
 * is suitable for sending messages to other nodes that may have non
 * synchronized clocks. If instead SER_STORAGE is used as type, the expire
 * time field is serialized using an absolute unix time (as it is normally
 * in the job structure representation). This makes the job suitable to be
 * loaded at a later time from disk, and is used in order to emit
 * LOADJOB commands in the AOF file.
 *
 * Moreover if SER_MESSAGE is used, the JOB_FLAG_DELIVERED is cleared in
 * the serialized copy, since this is a local node flag and should not be
 * propagated.
 *
 * When the job is deserialized with deserializeJob() function call, the
 * appropriate type must be passed, depending on how the job was serialized.
 *
 * Serialization format
 * ---------------------
 *
 * len | struct | queuename | job | nodes
 *
 * len: The first 4 bytes are a little endian 32 bit unsigned
 * integer that announces the size of the serialized job that follows,
 * not including the 4 bytes of the length itself.
 *
 * struct: JOB_STRUCT_SER_LEN bytes of the 'job' structure
 * with fields fixed to be little endian regardless of the arch of the
 * system.
 *
 * queuename: uint32_t little endian len + actual bytes of the queue
 * name string.
 *
 * job: uint32_t little endian len + actual bytes of the job body.
 *
 * nodes: List of nodes that may have a copy of the message. uint32_t
 * little endian with the count of N node names following. Then N
 * fixed length node names of REDISMODULE_NODE_ID_LEN characters each.
 *
 * The message is concatenated to the existing sds string 'jobs'.
 * Just use sdsempty() as first argument to get a single job serialized.
 *
 * The return value is the (possibly reallocated) SDS string, so the
 * caller must use it instead of the pointer originally passed as 'jobs'.
 *
 * ----------------------------------------------------------------------
 *
 * Since each job has a prefixed length it is possible to glue multiple
 * jobs one after the other in a single string. */
sds serializeJob(sds jobs, job *j, int sertype) {
size_t len;
struct job *sj;
char *p, *msg;
uint32_t count;
/* Compute the total length of the serialized job. */
len = 4; /* Prefixed length of the serialized bytes. */
len += JOB_STRUCT_SER_LEN; /* Structure header directly serializable. */
len += 4; /* Queue name length field. */
len += j->queue ? sdslen(j->queue) : 0; /* Queue name bytes. */
len += 4; /* Body length field. */
len += j->body ? sdslen(j->body) : 0; /* Body bytes. */
len += 4; /* Node IDs (that may have a copy) count. */
len += raxSize(j->nodes_delivered) * REDISMODULE_NODE_ID_LEN;
/* Make room at the end of the SDS buffer to hold our message. */
jobs = sdsMakeRoomFor(jobs,len);
msg = jobs + sdslen(jobs); /* Concatenate to the end of buffer. */
sdsIncrLen(jobs,len); /* Adjust SDS string final length. */
/* Total serialized length prefix, not including the length itself. */
count = intrev32ifbe(len-4);
memcpy(msg,&count,sizeof(count));
/* The serializable part of the job structure is copied, and fields
 * fixed to be little endian (no op in little endian CPUs).
 * NOTE(review): 'sj' aliases bytes inside the SDS buffer, so its fields
 * are accessed at whatever alignment msg+4 happens to have -- confirm
 * this is acceptable on the supported targets. */
sj = (job*) (msg+4);
memcpy(sj,j,JOB_STRUCT_SER_LEN);
memrev16ifbe(&sj->repl);
memrev64ifbe(&sj->ctime);
/* Use a relative expire time for serialization, but only for the
 * type SER_MESSAGE. When we want to target storage, it's better to use
 * absolute times in every field. */
time_t now = time(NULL);
if (sertype == SER_MESSAGE) {
/* The relative TTL is never less than 1: already expired jobs are
 * serialized with a TTL of exactly 1. */
if (sj->etime >= now)
sj->etime = sj->etime - now + 1;
else
sj->etime = 1;
sj->flags &= ~JOB_FLAG_DELIVERED;
}
memrev32ifbe(&sj->etime);
memrev32ifbe(&sj->delay);
memrev32ifbe(&sj->retry);
memrev16ifbe(&sj->num_nacks);
memrev16ifbe(&sj->num_deliv);
/* p now points to the start of the variable part of the serialization. */
p = msg + 4 + JOB_STRUCT_SER_LEN;
/* Queue name is 4 bytes prefixed len in little endian + actual bytes. */
p = serializeSdsString(p,j->queue);
/* Body is 4 bytes prefixed len in little endian + actual bytes. */
p = serializeSdsString(p,j->body);
/* Node IDs that may have a copy of the message: 4 bytes count in little
 * endian plus (count * REDISMODULE_NODE_ID_LEN) bytes. */
count = intrev32ifbe(raxSize(j->nodes_delivered));
memcpy(p,&count,sizeof(count));
p += sizeof(count);
raxIterator ri;
raxStart(&ri,j->nodes_delivered);
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
/* NOTE(review): key_len bytes are copied but the pointer advances by
 * REDISMODULE_NODE_ID_LEN: assumes every key has exactly that length --
 * confirm against the code inserting into nodes_delivered. */
memcpy(p,ri.key,ri.key_len);
p += REDISMODULE_NODE_ID_LEN;
}
raxStop(&ri);
/* Make sure we wrote exactly the intended number of bytes. */
RedisModule_Assert(len == (size_t)(p-msg));
return jobs;
}
/* Deserialize a job serialized with serializeJob(). Note that this only
 * deserializes the first job even if the input buffer contains multiple
 * jobs, but it stores the pointer to the next job (if any) into
 * '*next'. If there are no more jobs, '*next' is set to NULL.
 * '*next' is not updated if 'next' is a NULL pointer, or if a
 * deserialization error happens.
 *
 * The return value is the job structure populated with all the fields
 * present in the serialized structure: the caller owns it and must release
 * it with freeJob(). On deserialization error (wrong format or truncated
 * input) NULL is returned.
 *
 * Arguments: 'p' is the pointer to the start of the job (the 4 bytes
 * where the job serialized length is stored). While 'len' is the total
 * number of bytes the buffer contains (that may be larger than the
 * serialized job 'p' is pointing to).
 *
 * The 'sertype' field specifies the serialization type the job was
 * serialized with, by serializeJob() call.
 *
 * When the serialization type is SER_STORAGE, the job state is loaded
 * as it is, otherwise when SER_MESSAGE is used, the job state is set
 * to JOB_STATE_ACTIVE and the relative expire time is converted back to
 * an absolute unix time.
 *
 * In both cases the gc retry field is reset to 0, and the next queue time
 * is recomputed from the delay and retry fields.
 *
 * Only node IDs known to the cluster (according to
 * RedisModule_GetClusterNodeInfo()) are added to 'nodes_delivered'. */
job *deserializeJob(RedisModuleCtx *ctx, unsigned char *p, size_t len, unsigned char **next, int sertype) {
    job *j = RedisModule_Calloc(1,sizeof(*j));
    unsigned char *start = p; /* To check total processed bytes later. */
    uint32_t joblen, aux;

    /* Min len is: 4 (joblen) + JOB_STRUCT_SER_LEN + 4 (queue name len) +
     * 4 (body len) + 4 (Node IDs count) */
    if (len < 4+JOB_STRUCT_SER_LEN+4+4+4) goto fmterr;

    /* Get total length. */
    memcpy(&joblen,p,sizeof(joblen));
    p += sizeof(joblen);
    len -= sizeof(joblen);
    joblen = intrev32ifbe(joblen);
    if (len < joblen) goto fmterr;

    /* Deserialize the static part just copying and fixing endianness.
     * The memrev*ifbe() helpers take the ADDRESS of the field to fix. */
    memcpy(j,p,JOB_STRUCT_SER_LEN);
    memrev16ifbe(&j->repl);
    memrev64ifbe(&j->ctime);
    memrev32ifbe(&j->etime);
    if (sertype == SER_MESSAGE) {
        /* Convert back to absolute time if needed. */
        j->etime = time(NULL) + j->etime;
    }
    memrev32ifbe(&j->delay);
    memrev32ifbe(&j->retry);
    memrev16ifbe(&j->num_nacks);
    memrev16ifbe(&j->num_deliv);
    p += JOB_STRUCT_SER_LEN;
    len -= JOB_STRUCT_SER_LEN;

    /* GC attempts are always reset, while the state will be likely set by
     * the caller, but otherwise, we assume the job is active if this message
     * is received from another node. When loading a message from disk instead
     * (SER_STORAGE serialization type), the state is left untouched. */
    if (sertype == SER_MESSAGE) j->state = JOB_STATE_ACTIVE;
    j->gc_retry = 0;

    /* Compute next queue time from known parameters. */
    if (j->retry) {
        j->flags |= JOB_FLAG_BCAST_WILLQUEUE;
        j->qtime = mstime() +
                   j->delay*1000 +
                   j->retry*1000 +
                   randomTimeError(DISQUE_TIME_ERR);
    } else {
        j->qtime = 0;
    }

    /* Every variable length section below starts with a 4 bytes count:
     * always check that the count itself is inside the buffer before
     * reading it, otherwise 'len' (unsigned) would wrap around and all the
     * following bound checks would be useless. */

    /* Queue name. */
    if (len < sizeof(aux)) goto fmterr;
    memcpy(&aux,p,sizeof(aux));
    p += sizeof(aux);
    len -= sizeof(aux);
    aux = intrev32ifbe(aux);
    if (len < aux) goto fmterr;
    j->queue = sdsnewlen((char*)p,aux);
    p += aux;
    len -= aux;

    /* Job body. */
    if (len < sizeof(aux)) goto fmterr;
    memcpy(&aux,p,sizeof(aux));
    p += sizeof(aux);
    len -= sizeof(aux);
    aux = intrev32ifbe(aux);
    if (len < aux) goto fmterr;
    j->body = sdsnewlen(p,aux);
    p += aux;
    len -= aux;

    /* Nodes IDs. */
    if (len < sizeof(aux)) goto fmterr;
    memcpy(&aux,p,sizeof(aux));
    p += sizeof(aux);
    len -= sizeof(aux);
    aux = intrev32ifbe(aux);
    /* Compare using a division: the product of the count by the node ID
     * length may overflow for bogus counts. */
    if (aux > len/REDISMODULE_NODE_ID_LEN) goto fmterr;
    j->nodes_delivered = raxNew();
    while(aux--) {
        if (RedisModule_GetClusterNodeInfo(ctx,(char*)p,NULL,NULL,NULL,NULL) !=
            REDISMODULE_ERR)
        {
            raxInsert(j->nodes_delivered,p,REDISMODULE_NODE_ID_LEN,NULL,NULL);
        }
        p += REDISMODULE_NODE_ID_LEN;
        len -= REDISMODULE_NODE_ID_LEN;
    }

    /* The bytes consumed after the length prefix must match exactly the
     * announced job length. */
    if ((uint32_t)(p-start)-sizeof(joblen) != joblen) goto fmterr;

    /* Report where the next job starts, or NULL if the buffer is over. */
    if (next) *next = len ? p : NULL;
    return j;

fmterr:
    /* 'j' was allocated zeroed, so freeJob() only releases what was
     * actually created before the error. */
    freeJob(j);
    return NULL;
}
/* This function is called when the job id at 'j' may be duplicated and we
* likely already have the job, but we want to update the list of nodes
* that may have the message by taking the union of our list with the
* job 'j' list. */
void updateJobNodes(job *j) {
    /* Nothing to merge unless lookupJob() finds a job with the same ID. */
    job *existing = lookupJob(j->id);
    if (existing == NULL) return;

    /* Visit every key of j->nodes_delivered, starting from the first one,
     * and insert it into the job we already have: the resulting set is the
     * union of the two node lists. */
    raxIterator it;
    raxStart(&it,j->nodes_delivered);
    for (raxSeek(&it,"^",NULL,0); raxNext(&it); ) {
        raxInsert(existing->nodes_delivered,it.key,it.key_len,NULL,NULL);
    }
    raxStop(&it);
}
/* ---------------------------- Utility functions -------------------------- */
/* Validate a set of job IDs. Return C_OK if all the IDs are valid,
* otherwise C_ERR is returned.
*
* When C_ERR is returned, an error is sent to the client through 'ctx'
* if not NULL. */
int validateJobIDs(RedisModuleCtx *ctx, RedisModuleString **ids, int count) {
    /* All the IDs are checked up front: the first invalid one stops the
     * validation with C_ERR, so that the caller never starts processing a
     * partially valid set. */
    int idx = 0;
    while (idx < count) {
        size_t len;
        const char *ptr = RedisModule_StringPtrLen(ids[idx],&len);
        if (validateJobIdOrReply(ctx,ptr,len) == C_ERR) return C_ERR;
        idx++;
    }
    return C_OK;
}
/* -------------------------- Jobs related commands ------------------------ */
/* This is called when a client blocked in ADDJOB is unblocked. Our private
* data in this case is just a string holding the job ID. */
void addjobClientFree(RedisModuleCtx *ctx, void *privdata) {
    REDISMODULE_NOT_USED(ctx);
    /* No private data attached to the client: nothing to release. */
    if (privdata == NULL) return;
    RedisModule_Free(privdata);
}
/* Called when a client blocked on ADDJOB disconnected before the timeout
* or the successful replication was reached. This is not called if the
* client timed out or was explicitly unblocked by the module.
*
* This is also called by the timeout handler in order to evict the job
* when the replication was not reached. */
void addjobDisconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
    /* BlockedOnRepl is keyed by the raw bytes of the blocked client
     * pointer, and maps it to the job waiting for replication. */
    job *blocked = raxFind(BlockedOnRepl,(unsigned char*)&bc,sizeof(bc));
    if (blocked == raxNotFound) return;

    /* Drop the mapping first, then detach the client from the job so that
     * releasing the job does not try to unblock the client. */
    raxRemove(BlockedOnRepl,(unsigned char*)&blocked->bc,
              sizeof(blocked->bc),NULL);
    blocked->bc = NULL;
    unregisterJob(ctx,blocked);
    freeJob(blocked);
}
/* Reply to the ADDJOB client after it was successfully unblocked or timed out. */
int addjobClientReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);

    RedisModuleBlockedClient *handle = RedisModule_GetBlockedClientHandle(ctx);
    const char *jobid = RedisModule_GetBlockedClientPrivateData(ctx);

    if (!RedisModule_IsBlockedTimeoutRequest(ctx)) {
        /* Explicit unblock: the job state is JOB_STATE_ACTIVE. Reply with
         * the ID received as private data (when there is one) and forget
         * the blocked client. */
        if (jobid != NULL) {
            RedisModule_ReplyWithStringBuffer(ctx,jobid,JOB_ID_LEN);
        }
        raxRemove(BlockedOnRepl,(unsigned char*)&handle,sizeof(handle),NULL);
        return REDISMODULE_OK;
    }

    /* Timeout: the job state is still JOB_STATE_WAIT_REPL. Reply with a
     * null and release the job like in the disconnection case. */
    RedisModule_ReplyWithNull(ctx);
    addjobDisconnected(ctx,handle);
    return REDISMODULE_OK;
}
/* Reply to the client with the Job ID, sent as a bulk string of JOB_ID_LEN bytes. */
void addReplyJobID(RedisModuleCtx *ctx, job *j) {
    /* The ID is emitted as a buffer of exactly JOB_ID_LEN bytes. */
    const char *id = j->id;
    RedisModule_ReplyWithStringBuffer(ctx,id,JOB_ID_LEN);
}
/* Send an ENQUEUE message to a random node among the ones that we believe
* have a copy. This is used when we want to discard a job but want it to
* be processed in a short time by another node, without waiting for the
* retry. */
void clusterSendEnqueueToRandomNode(RedisModuleCtx *ctx, job *j) {
    /* Pick one random node among j->nodes_confirmed and send it an ENQUEUE
     * message for 'j', using the job delay. Nothing is sent when no node
     * is available.
     *
     * nodes_confirmed may be NULL (other code in this file checks it before
     * use), so guard against it before asking for its size. */
    if (j->nodes_confirmed == NULL || raxSize(j->nodes_confirmed) == 0) return;

    raxIterator ri;
    raxStart(&ri,j->nodes_confirmed);
    /* ri.key is only meaningful if the iterator was actually positioned on
     * an element: both calls return 0 on failure, and in that case we must
     * not use the key as a node ID. */
    if (raxSeek(&ri,"^",NULL,0) && raxRandomWalk(&ri,0)) {
        clusterSendEnqueue(ctx,(char*)ri.key,j,j->delay);
    }
    raxStop(&ri);
}
/* This function is called by cluster.c when the job was replicated
* and the replication acknowledged at least job->repl times.
*
* Here we need to queue the job, and unblock the client waiting for the job
* if it still exists.
*
* This function is only called if the job is in JOB_STATE_WAIT_REPL state.
* The function also assumes that there is a client waiting to be
* unblocked if this function is called, since if the blocked client is
* released, the job is deleted (and a best effort try is made to remove
* copies from other nodes), to avoid non acknowledged jobs to be active
* when possible.
*
* Return value: if the job is retained after the function is called
* (normal replication) then C_OK is returned. Otherwise if the
* function removes the job from the node, since the job is externally
* replicated, C_ERR is returned, in order to signal the caller that further
* accesses to the job are not allowed. */
int jobReplicationAchieved(RedisModuleCtx *ctx, job *j) {
    /* Returns C_OK when the job is retained by this node, C_ERR when the
     * job was externally replicated and has been freed here: in the latter
     * case the caller must not access 'j' anymore. */
    RedisModule_Log(ctx,"verbose","Replication ACHIEVED %.*s",JOB_ID_LEN,j->id);

    /* Change the job state to active. This is critical to avoid the job
     * will be freed by unblockClient() if found still in the old state. */
    j->state = JOB_STATE_ACTIVE;

    /* Detach the blocked client from the job and drop its BlockedOnRepl
     * entry right now. The entry is keyed by the client pointer and points
     * to the job: if it were removed only later by the reply callback, it
     * would outlive the job when the job is freed below, and
     * handleDelayedJobReplication() dereferences the jobs it finds in
     * BlockedOnRepl. The raxRemove() performed later by the reply callback
     * then simply finds nothing to remove. */
    RedisModuleBlockedClient *bc = j->bc;
    j->bc = NULL;
    raxRemove(BlockedOnRepl,(unsigned char*)&bc,sizeof(bc),NULL);

    /* Reply to the blocked client with the Job ID and unblock the client.
     * The ID is copied since the job may be freed before the reply is sent;
     * the copy travels as the client private data (expected to be released
     * by addjobClientFree()). */
    char *id = RedisModule_Alloc(JOB_ID_LEN);
    memcpy(id,j->id,JOB_ID_LEN);
    RedisModule_UnblockClient(bc,id);

    /* If the job was externally replicated, send a QUEUE message to one of
     * the nodes that acknowledged to have a copy, and forget about it ASAP. */
    const char *myself = RedisModule_GetMyClusterID();
    if (raxFind(j->nodes_delivered,(unsigned char*)myself,
                REDISMODULE_NODE_ID_LEN) == raxNotFound)
    {
        clusterSendEnqueueToRandomNode(ctx,j);
        unregisterJob(ctx,j);
        freeJob(j);
        return C_ERR;
    }

    /* If set, cleanup nodes_confirmed to free memory. We'll reuse this
     * hash table again for ACKs tracking in order to garbage collect the
     * job once processed. */
    if (j->nodes_confirmed) {
        raxFree(j->nodes_confirmed);
        j->nodes_confirmed = NULL;
    }

    /* Queue the job locally. */
    if (j->delay == 0) {
        enqueueJob(ctx,j,0); /* Will change the job state. */
    } else {
        updateJobAwakeTime(j,0); /* Queue with delay. */
    }

    AOFLoadJob(j);
    return C_OK;
}
/* This function is called periodically by Disque. Its goal is to
* check if a job synchronous replication is taking too much time, and add a
* new node to the set of nodes contacted in order to replicate the job.
* This way, if some of the nodes initially contacted are not reachable, are
* slow, or are out of memory (and are not accepting our job), we have a
* chance to make the ADDJOB call succeed using other nodes. */
#define DELAYED_JOB_ADD_NODE_MIN_PERIOD 50 /* 50 milliseconds. */
#define DELAYED_JOB_MAX_ITERATION 100 /* Don't check too many jobs per cycle. */
void handleDelayedJobReplication(RedisModuleCtx *ctx) {
mstime_t now = mstime();
static unsigned char cursor[sizeof(RedisModuleBlockedClient*)];
raxIterator ri;
raxStart(&ri,BlockedOnRepl);
raxSeek(&ri,">",cursor,sizeof(cursor));
int maxiter = DELAYED_JOB_MAX_ITERATION;
while(maxiter > 0 && raxNext(&ri)) {
maxiter--;
job *j = ri.data;
mstime_t elapsed = now - j->added_node_time;