|
115 | 115 | -callback shards(St :: term(), [#shard{}]) -> |
116 | 116 | {[#shard{}], St1 :: term()}. |
117 | 117 |
|
118 | | -% Optional |
| 118 | +% Optional. Called right after a shard file is opened so it gets a Db handle. |
| 119 | +% Should return the change feed start sequence and a list of options along with any changes |
| 120 | +% in a private context. The change feed start sequence should normally be 0 and the list |
| 121 | +% of option can be []. The list of options will be passed directly to couch_db:fold_changes, |
| 122 | +% so any {dir, Dir}, {end_key, EndSeq} could work there. |
| 123 | +% |
119 | 124 | -callback db_opened(St :: term(), Db :: term()) -> |
120 | | - {ok, St :: term()}. |
| 125 | + {ChangesSeq :: non_neg_integer(), ChangesOpts :: [term()], St1 :: term()}. |
121 | 126 |
|
122 | 127 | % Optional. If doc and doc_fdi are not defined, then doc_id default |
123 | 128 | % action is {skip, St}. If it is defined, the default action is {ok, St}. |
|
178 | 183 | cursor, |
179 | 184 | shards_db, |
180 | 185 | db, |
| 186 | + changes_seq = 0, |
| 187 | + changes_opts = [], |
181 | 188 | checkpoint_sec = 0, |
182 | 189 | start_sec = 0, |
183 | 190 | skip_dbs, |
@@ -370,7 +377,8 @@ scan_docs(#st{} = St, #shard{name = ShardDbName}) -> |
370 | 377 | try |
371 | 378 | St2 = St1#st{db = Db}, |
372 | 379 | St3 = db_opened_callback(St2), |
373 | | - {ok, St4} = couch_db:fold_docs(Db, fun scan_docs_fold/2, St3, []), |
| 380 | + #st{changes_seq = Seq, changes_opts = Opts} = St3, |
| 381 | + {ok, St4} = couch_db:fold_changes(Db, Seq, fun scan_docs_fold/2, St3, Opts), |
374 | 382 | St5 = db_closing_callback(St4), |
375 | 383 | erlang:garbage_collect(), |
376 | 384 | St5#st{db = undefined} |
@@ -521,13 +529,13 @@ resume_callback(#{} = Cbks, SId, #{} = EJsonPSt) when is_binary(SId) -> |
521 | 529 |
|
522 | 530 | db_opened_callback(#st{pst = PSt, callbacks = Cbks, db = Db} = St) -> |
523 | 531 | #{db_opened := DbOpenedCbk} = Cbks, |
524 | | - {ok, PSt1} = DbOpenedCbk(PSt, Db), |
525 | | - St#st{pst = PSt1}. |
| 532 | + {Seq, Opts, PSt1} = DbOpenedCbk(PSt, Db), |
| 533 | + St#st{pst = PSt1, changes_seq = Seq, changes_opts = Opts}. |
526 | 534 |
|
527 | 535 | db_closing_callback(#st{pst = PSt, callbacks = Cbks, db = Db} = St) -> |
528 | 536 | #{db_closing := DbClosingCbk} = Cbks, |
529 | 537 | {ok, PSt1} = DbClosingCbk(PSt, Db), |
530 | | - St#st{pst = PSt1}. |
| 538 | + St#st{pst = PSt1, changes_seq = 0, changes_opts = []}. |
531 | 539 |
|
532 | 540 | shards_callback(#st{pst = PSt, callbacks = Cbks} = St, Shards) -> |
533 | 541 | #{shards := ShardsCbk} = Cbks, |
@@ -601,7 +609,7 @@ default_shards(Mod, _F, _A) when is_atom(Mod) -> |
601 | 609 | end. |
602 | 610 |
|
603 | 611 | default_db_opened(Mod, _F, _A) when is_atom(Mod) -> |
604 | | - fun(St, _Db) -> {ok, St} end. |
| 612 | + fun(St, _Db) -> {0, [], St} end. |
605 | 613 |
|
606 | 614 | default_doc_id(Mod, _F, _A) when is_atom(Mod) -> |
607 | 615 | case is_exported(Mod, doc, 3) orelse is_exported(Mod, doc_fdi, 3) of |
|
0 commit comments