Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions fbclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,9 @@ def __init__(self, config: Config, start_wait: float = 15.):
if not isinstance(self._update_processor, NullUpdateProcessor):
log.info("FB Python SDK: Waiting for Client initialization in %s seconds" % str(start_wait))

update_processor_ready.wait(start_wait)
if isinstance(self._data_storage, NullDataStorage) or (not self._data_storage.initialized and not self._config.is_offline):
log.warning("FB Python SDK: SDK just returns default variation because of no data found in the given environment")

update_processor_ready.wait(start_wait)
if not self._update_processor.initialized:
log.warning("FB Python SDK: SDK was not successfully initialized")
else:
Expand Down
34 changes: 23 additions & 11 deletions fbclient/event_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue
from threading import BoundedSemaphore, Lock, Thread
from threading import BoundedSemaphore, Condition, Lock, Thread
from typing import List, Optional

from fbclient.common_types import FBEvent
Expand Down Expand Up @@ -76,7 +76,7 @@ def stop(self):
log.info('FB Python SDK: event processor is stopping')
self.__closed = True
self.__flush_task.stop()
self.flush()
self.__put_message_async(MessageType.FLUSH)
self.__put_message_and_wait_terminate(MessageType.SHUTDOWN)


Expand All @@ -94,6 +94,7 @@ def __init__(self, config: Config, sender: Sender, inbox: "Queue[EventMessage]")
self.__events_buffer_to_next_flush = []
self.__flush_workers = ThreadPoolExecutor(max_workers=self.__MAX_FLUSH_WORKERS_NUMBER)
self.__permits = BoundedSemaphore(value=self.__MAX_FLUSH_WORKERS_NUMBER)
self.__lock = Condition(Lock())

# blocks until at least one message is available and then:
# 1: transfer the events to event buffer
Expand Down Expand Up @@ -137,6 +138,11 @@ def __put_events_to_buffer(self, event: FBEvent):
self.__events_buffer_to_next_flush.append(event)

def __trigger_flush(self):
def flush_payload_done(fn):
self.__permits.release()
with self.__lock:
self.__lock.notify_all()

if not self.__closed and len(self.__events_buffer_to_next_flush) > 0:
log.debug('trigger flush')
# get all the current events from event buffer
Expand All @@ -146,21 +152,27 @@ def __trigger_flush(self):
# get an available flush worker to send events
self.__flush_workers \
.submit(FlushPayloadRunner(self.__config, self.__sender, payloads).run) \
.add_done_callback(lambda x: self.__permits.release())
.add_done_callback(flush_payload_done)
# clear the buffer for the next flush
self.__events_buffer_to_next_flush.clear()
# if no available flush worker, keep the events in the buffer

def __shutdown(self):
if not self.__closed:
try:
log.debug('event dispatcher is cleaning up thread and conn pool')
self.__closed = True
log.debug('flush worker pool is stopping...')
self.__flush_workers.shutdown(wait=True)
self.__sender.stop()
except Exception as e:
log.exception('FB Python SDK: unexpected error when closing event dispatcher: %s' % str(e))
with self.__lock:
try:
log.debug('event dispatcher is cleaning up thread and conn pool')
self.__wait_until_flush_playload_worker_down()
self.__closed = True
log.debug('flush worker pool is stopping...')
self.__flush_workers.shutdown(wait=True)
self.__sender.stop()
except Exception as e:
log.exception('FB Python SDK: unexpected error when closing event dispatcher: %s' % str(e))

def __wait_until_flush_playload_worker_down(self):
while self.__permits._value != self.__MAX_FLUSH_WORKERS_NUMBER:
self.__lock.wait()


class FlushPayloadRunner:
Expand Down
2 changes: 1 addition & 1 deletion fbclient/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = "1.1.1"
VERSION = "1.1.2"