12
12
# limitations under the License.
13
13
14
14
import abc
15
+ import numbers
15
16
import threading
16
17
import time
17
18
@@ -100,13 +101,14 @@ def __init__(self,
100
101
self .logger .error (enums .Errors .INVALID_INPUT .format ('notification_center' ))
101
102
self .notification_center = _notification_center .NotificationCenter ()
102
103
104
+ self .executor = None
103
105
if start_on_init is True :
104
106
self .start ()
105
107
106
108
@property
107
109
def is_running (self ):
108
110
""" Property to check if consumer thread is alive or not. """
109
- return self .executor .isAlive ()
111
+ return self .executor .isAlive () if self . executor else False
110
112
111
113
def _validate_intantiation_props (self , prop , prop_name ):
112
114
""" Method to determine if instantiation properties like batch_size, flush_interval
@@ -121,12 +123,18 @@ def _validate_intantiation_props(self, prop, prop_name):
121
123
False if property name is batch_size and value is a floating point number.
122
124
True otherwise.
123
125
"""
124
- if (prop_name == 'batch_size' and not isinstance (prop , int )) or prop is None or prop <= 0 or \
125
- not validator .is_finite_number (prop ):
126
+ is_valid = True
127
+
128
+ if prop is None or not validator .is_finite_number (prop ) or prop <= 0 :
129
+ is_valid = False
130
+
131
+ if prop_name == 'batch_size' and not isinstance (prop , numbers .Integral ):
132
+ is_valid = False
133
+
134
+ if is_valid is False :
126
135
self .logger .info ('Using default value for {}.' .format (prop_name ))
127
- return False
128
136
129
- return True
137
+ return is_valid
130
138
131
139
def _get_time (self , _time = None ):
132
140
""" Method to return rounded off time as integer in seconds. If _time is None, uses current time.
@@ -279,7 +287,8 @@ def stop(self):
279
287
self .event_queue .put (self ._SHUTDOWN_SIGNAL )
280
288
self .logger .warning ('Stopping Scheduler.' )
281
289
282
- self .executor .join (self .timeout_interval .total_seconds ())
290
+ if self .executor :
291
+ self .executor .join (self .timeout_interval .total_seconds ())
283
292
284
293
if self .is_running :
285
294
self .logger .error ('Timeout exceeded while attempting to close for ' + str (self .timeout_interval ) + ' ms.' )
0 commit comments