Skip to content

Commit

Permalink
fix: improve performance with batch
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanfaircloth committed Aug 31, 2023
1 parent e0d0b0e commit 78e44e2
Showing 1 changed file with 13 additions and 15 deletions.
28 changes: 13 additions & 15 deletions microsoft_azure_eventhub_source/LogSourcePlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def add_fields(self, log_record, record, message_dict):

# logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.setLevel(logging.WARNING)

logHandler = logging.StreamHandler()
formatter = CustomJsonFormatter()
Expand Down Expand Up @@ -129,18 +129,15 @@ async def run_async(self):
)
logger.info("client connected")
async with client:
while not self._cancelled:
logger.info("waiting on batch")
await client.receive_batch(
on_event_batch=self.on_event_batch,
max_wait_time=10,
starting_position="-1", # "-1" is from the beginning of the partition.
max_batch_size=300,
prefetch=1000,
track_last_enqueued_event_properties=True,
)
logger.info("ehs: run will sleep")
await asyncio.sleep(1)
logger.info("waiting on batch")
await client.receive_batch(
on_event_batch=self.on_event_batch,
max_wait_time=5,
starting_position="-1", # "-1" is from the beginning of the partition.
max_batch_size=300,
prefetch=1000,
track_last_enqueued_event_properties=True,
)

async def on_event_batch(
self, partition_context: PartitionContext, event_batch: EventData
Expand Down Expand Up @@ -173,10 +170,11 @@ async def on_event_batch(
self.post_message(record_lmsg)
else:
logger.debug(event_obj)

except Exception as argument:
logger.exception(argument)
# exit(0)
await partition_context.update_checkpoint()
if len(event_batch) > 0:
await partition_context.update_checkpoint()


def main():
Expand Down

0 comments on commit 78e44e2

Please sign in to comment.