@@ -57,26 +57,29 @@ def load_oplog(self):
57
57
return None
58
58
59
59
def replay (self , oplog ):
60
- for n , entry in enumerate ( oplog ) :
60
+ for entry in oplog :
61
61
# TODO: log excep
62
62
self .docman .process (entry )
63
63
64
64
self ._last_ts = entry ['ts' ]
65
65
LOG .info ('Current progress={}' .format (ts2localtime (self ._last_ts )))
66
66
67
67
def run (self ):
68
- while self ._running :
69
- LOG .info ('Loading ts={}, last progress={}' .format (
70
- self ._last_ts , ts2localtime (self ._last_ts )))
71
- oplog = self .load_oplog ()
72
-
73
- if oplog is None :
74
- LOG .info ('Loaded None. No more oplog to sync.' )
75
- time .sleep (10 )
76
- else :
77
- LOG .info ('Loaded ts={}, size={}' .format (
78
- self ._last_ts , len (oplog )))
79
- self .replay (oplog )
68
+ try :
69
+ while self ._running :
70
+ LOG .info ('Loading ts={}, last progress={}' .format (
71
+ self ._last_ts , ts2localtime (self ._last_ts )))
72
+ oplog = self .load_oplog ()
73
+
74
+ if oplog is None :
75
+ LOG .info ('Loaded None. No more oplog to sync.' )
76
+ time .sleep (10 )
77
+ else :
78
+ LOG .info ('Loaded ts={}, size={}' .format (
79
+ self ._last_ts , len (oplog )))
80
+ self .replay (oplog )
81
+ except Exception as e :
82
+ LOG .error (str (e ), exc_info = True )
80
83
81
84
LOG .warning ('Oplog syncing stopped.' )
82
85
@@ -85,7 +88,7 @@ def start(self):
85
88
self ._running = True
86
89
self ._thread = threading .Thread (target = self .run )
87
90
self ._thread .start ()
88
- LOG .info ('Started pid={}, syncing thread={}' .format (
91
+ LOG .warning ('Started pid={}, syncing thread={}' .format (
89
92
os .getpid (), self ._thread .ident ))
90
93
91
94
def safe_stop (self ):
0 commit comments