Skip to content

Commit 5a86d02

Browse files
committed
Apply style fix
1 parent ea444a3 commit 5a86d02

File tree

8 files changed

+993
-328
lines changed

8 files changed

+993
-328
lines changed

src/confluent_kafka/src/Consumer.c

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ static int Consumer_traverse(Handle *self, visitproc visit, void *arg) {
116116
****************************************************************************/
117117

118118

119-
static PyObject *Consumer_subscribe(Handle *self, PyObject *args,
120-
PyObject *kwargs) {
119+
static PyObject *
120+
Consumer_subscribe(Handle *self, PyObject *args, PyObject *kwargs) {
121121

122122
rd_kafka_topic_partition_list_t *topics;
123123
static char *kws[] = {"topics", "on_assign", "on_revoke", "on_lost",
@@ -970,7 +970,7 @@ Consumer_offsets_for_times(Handle *self, PyObject *args, PyObject *kwargs) {
970970

971971
/**
972972
* @brief Poll for a single message from the subscribed topics.
973-
*
973+
*
974974
* Instead of a single blocking call to rd_kafka_consumer_poll() with the
975975
* full timeout, this function:
976976
* 1. Splits the timeout into 200ms chunks
@@ -988,14 +988,13 @@ Consumer_offsets_for_times(Handle *self, PyObject *args, PyObject *kwargs) {
988988
* @return PyObject* Message object, None if timeout, or NULL on error
989989
* (raises KeyboardInterrupt if signal detected)
990990
*/
991-
static PyObject *Consumer_poll(Handle *self, PyObject *args,
992-
PyObject *kwargs) {
993-
double tmout = -1.0f;
994-
static char *kws[] = {"timeout", NULL};
991+
static PyObject *Consumer_poll(Handle *self, PyObject *args, PyObject *kwargs) {
992+
double tmout = -1.0f;
993+
static char *kws[] = {"timeout", NULL};
995994
rd_kafka_message_t *rkm = NULL;
996995
PyObject *msgobj;
997996
CallState cs;
998-
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
997+
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
999998
int total_timeout_ms;
1000999
int chunk_timeout_ms;
10011000
int chunk_count = 0;
@@ -1021,15 +1020,16 @@ static PyObject *Consumer_poll(Handle *self, PyObject *args,
10211020
} else {
10221021
while (1) {
10231022
/* Calculate timeout for this chunk */
1024-
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1025-
CHUNK_TIMEOUT_MS);
1023+
chunk_timeout_ms = calculate_chunk_timeout(
1024+
total_timeout_ms, chunk_count, CHUNK_TIMEOUT_MS);
10261025
if (chunk_timeout_ms == 0) {
10271026
/* Timeout expired */
10281027
break;
10291028
}
10301029

10311030
/* Poll with chunk timeout */
1032-
rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms);
1031+
rkm =
1032+
rd_kafka_consumer_poll(self->rk, chunk_timeout_ms);
10331033

10341034
/* If we got a message, exit the loop */
10351035
if (rkm) {
@@ -1099,7 +1099,7 @@ Consumer_memberid(Handle *self, PyObject *args, PyObject *kwargs) {
10991099
* Instead of a single blocking call to rd_kafka_consume_batch_queue() with the
11001100
* full timeout, this function:
11011101
* 1. Splits the timeout into 200ms chunks
1102-
* 2. Calls rd_kafka_consume_batch_queue() with chunk timeout
1102+
* 2. Calls rd_kafka_consume_batch_queue() with chunk timeout
11031103
* 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals()
11041104
* 4. If signal detected, returns NULL (raises KeyboardInterrupt)
11051105
* 5. Continues until messages received, timeout expired, or signal detected.
@@ -1111,11 +1111,11 @@ Consumer_memberid(Handle *self, PyObject *args, PyObject *kwargs) {
11111111
* consume per call. Default: 1. Maximum: 1000000.
11121112
* - timeout (float, optional): Timeout in seconds.
11131113
* Default: -1.0 (infinite timeout)
1114-
* @return PyObject* List of Message objects, empty list if timeout, or NULL on error
1115-
* (raises KeyboardInterrupt if signal detected)
1114+
* @return PyObject* List of Message objects, empty list if timeout, or NULL on
1115+
* error (raises KeyboardInterrupt if signal detected)
11161116
*/
1117-
static PyObject *Consumer_consume(Handle *self, PyObject *args,
1118-
PyObject *kwargs) {
1117+
static PyObject *
1118+
Consumer_consume(Handle *self, PyObject *args, PyObject *kwargs) {
11191119
unsigned int num_messages = 1;
11201120
double tmout = -1.0f;
11211121
static char *kws[] = {"num_messages", "timeout", NULL};
@@ -1124,7 +1124,7 @@ static PyObject *Consumer_consume(Handle *self, PyObject *args,
11241124
rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu;
11251125
CallState cs;
11261126
Py_ssize_t i, n = 0;
1127-
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
1127+
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
11281128
int total_timeout_ms;
11291129
int chunk_timeout_ms;
11301130
int chunk_count = 0;
@@ -1160,37 +1160,40 @@ static PyObject *Consumer_consume(Handle *self, PyObject *args,
11601160
* ThreadPool. Only use wakeable poll for
11611161
* blocking calls that need to be interruptible. */
11621162
if (total_timeout_ms >= 0 && total_timeout_ms < CHUNK_TIMEOUT_MS) {
1163-
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, total_timeout_ms,
1164-
rkmessages, num_messages);
1163+
n = (Py_ssize_t)rd_kafka_consume_batch_queue(
1164+
rkqu, total_timeout_ms, rkmessages, num_messages);
11651165

11661166
if (n < 0) {
11671167
/* Error - need to restore GIL before setting error */
11681168
PyEval_RestoreThread(cs.thread_state);
11691169
free(rkmessages);
1170-
cfl_PyErr_Format(rd_kafka_last_error(),
1171-
"%s", rd_kafka_err2str(rd_kafka_last_error()));
1170+
cfl_PyErr_Format(
1171+
rd_kafka_last_error(), "%s",
1172+
rd_kafka_err2str(rd_kafka_last_error()));
11721173
return NULL;
11731174
}
11741175
} else {
11751176
while (1) {
11761177
/* Calculate timeout for this chunk */
1177-
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
1178-
CHUNK_TIMEOUT_MS);
1178+
chunk_timeout_ms = calculate_chunk_timeout(
1179+
total_timeout_ms, chunk_count, CHUNK_TIMEOUT_MS);
11791180
if (chunk_timeout_ms == 0) {
11801181
/* Timeout expired */
11811182
break;
11821183
}
11831184

11841185
/* Consume with chunk timeout */
1185-
n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms,
1186-
rkmessages, num_messages);
1186+
n = (Py_ssize_t)rd_kafka_consume_batch_queue(
1187+
rkqu, chunk_timeout_ms, rkmessages, num_messages);
11871188

11881189
if (n < 0) {
1189-
/* Error - need to restore GIL before setting error */
1190+
/* Error - need to restore GIL before setting
1191+
* error */
11901192
PyEval_RestoreThread(cs.thread_state);
11911193
free(rkmessages);
1192-
cfl_PyErr_Format(rd_kafka_last_error(),
1193-
"%s", rd_kafka_err2str(rd_kafka_last_error()));
1194+
cfl_PyErr_Format(
1195+
rd_kafka_last_error(), "%s",
1196+
rd_kafka_err2str(rd_kafka_last_error()));
11941197
return NULL;
11951198
}
11961199

src/confluent_kafka/src/Producer.c

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ Producer_produce(Handle *self, PyObject *args, PyObject *kwargs) {
339339

340340
/**
341341
* @brief Poll for producer events with wakeable pattern for interruptibility.
342-
*
342+
*
343343
* This function:
344344
* 1. Splits the timeout into 200ms chunks
345345
* 2. Calls rd_kafka_poll() with chunk timeout
@@ -349,14 +349,14 @@ Producer_produce(Handle *self, PyObject *args, PyObject *kwargs) {
349349
*
350350
* @param self Producer handle
351351
* @param tmout Timeout in milliseconds (-1 for infinite)
352-
* @returns -1 if callback crashed, signal detected, or poll() failed, else the number
353-
* of events served.
352+
* @returns -1 if callback crashed, signal detected, or poll() failed, else the
353+
* number of events served.
354354
*/
355355
static int Producer_poll0(Handle *self, int tmout) {
356356
int r = 0;
357357
CallState cs;
358-
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
359-
int total_timeout_ms = tmout;
358+
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
359+
int total_timeout_ms = tmout;
360360
int chunk_timeout_ms;
361361
int chunk_count = 0;
362362

@@ -371,27 +371,28 @@ static int Producer_poll0(Handle *self, int tmout) {
371371
} else {
372372
while (1) {
373373
/* Calculate timeout for this chunk */
374-
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
375-
CHUNK_TIMEOUT_MS);
374+
chunk_timeout_ms = calculate_chunk_timeout(
375+
total_timeout_ms, chunk_count, CHUNK_TIMEOUT_MS);
376376
if (chunk_timeout_ms == 0) {
377377
/* Timeout expired */
378378
break;
379379
}
380380

381381
/* Poll with chunk timeout */
382-
int chunk_result = rd_kafka_poll(self->rk, chunk_timeout_ms);
382+
int chunk_result =
383+
rd_kafka_poll(self->rk, chunk_timeout_ms);
383384
/* Error from poll */
384385
if (chunk_result < 0) {
385386
r = chunk_result;
386387
break;
387388
}
388-
r += chunk_result; /* Accumulate events processed */
389+
r += chunk_result; /* Accumulate events processed */
389390

390391
chunk_count++;
391392

392393
/* Check for signals between chunks */
393394
if (check_signals_between_chunks(self, &cs)) {
394-
return -1; /* Signal detected */
395+
return -1; /* Signal detected */
395396
}
396397
}
397398
}
@@ -426,8 +427,9 @@ static PyObject *Producer_poll(Handle *self, PyObject *args, PyObject *kwargs) {
426427

427428

428429
/**
429-
* @brief Flush all messages in the producer queue with wakeable pattern for interruptibility.
430-
*
430+
* @brief Flush all messages in the producer queue with wakeable pattern for
431+
* interruptibility.
432+
*
431433
* Instead of a single blocking call to rd_kafka_flush() with the
432434
* full timeout, this function:
433435
* 1. Splits the timeout into 200ms chunks
@@ -451,7 +453,7 @@ Producer_flush(Handle *self, PyObject *args, PyObject *kwargs) {
451453
static char *kws[] = {"timeout", NULL};
452454
rd_kafka_resp_err_t err;
453455
CallState cs;
454-
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
456+
const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */
455457
int total_timeout_ms;
456458
int chunk_timeout_ms;
457459
int chunk_count = 0;
@@ -474,15 +476,16 @@ Producer_flush(Handle *self, PyObject *args, PyObject *kwargs) {
474476
if (total_timeout_ms >= 0 && total_timeout_ms < CHUNK_TIMEOUT_MS) {
475477
err = rd_kafka_flush(self->rk, total_timeout_ms);
476478
} else {
477-
/* For infinite timeout, we need to keep looping and checking for signals.
478-
* rd_kafka_flush() waits for messages that were in the queue when it's called.
479-
* When flush() returns NO_ERROR, it means all messages that were queued at
480-
* that point have been delivered.
481-
* Note: Messages produced after flush() starts are not included in the current flush. */
479+
/* For infinite timeout, we need to keep looping and checking
480+
* for signals. rd_kafka_flush() waits for messages that were in
481+
* the queue when it's called. When flush() returns NO_ERROR, it
482+
* means all messages that were queued at that point have been
483+
* delivered. Note: Messages produced after flush() starts are
484+
* not included in the current flush. */
482485
while (1) {
483486
/* Calculate timeout for this chunk */
484-
chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count,
485-
CHUNK_TIMEOUT_MS);
487+
chunk_timeout_ms = calculate_chunk_timeout(
488+
total_timeout_ms, chunk_count, CHUNK_TIMEOUT_MS);
486489
if (chunk_timeout_ms == 0) {
487490
/* Timeout expired */
488491
err = RD_KAFKA_RESP_ERR__TIMED_OUT;
@@ -492,10 +495,11 @@ Producer_flush(Handle *self, PyObject *args, PyObject *kwargs) {
492495
/* Flush with chunk timeout */
493496
err = rd_kafka_flush(self->rk, chunk_timeout_ms);
494497

495-
/* Always check for signals between chunks (critical for interruptibility) */
498+
/* Always check for signals between chunks (critical for
499+
* interruptibility) */
496500
chunk_count++;
497501
if (check_signals_between_chunks(self, &cs)) {
498-
return NULL; /* Signal detected */
502+
return NULL; /* Signal detected */
499503
}
500504

501505
if (err == RD_KAFKA_RESP_ERR_NO_ERROR) {

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -545,26 +545,29 @@ static CFL_UNUSED CFL_INLINE int cfl_timeout_ms(double tmout) {
545545
* - Infinite timeouts (-1) use the chunk size repeatedly
546546
* - Finite timeouts are properly divided and don't exceed the total
547547
* - The final chunk uses any remaining time (may be < chunk_size)
548-
548+
549549
*
550550
* @param total_timeout_ms Total timeout in milliseconds (-1 for infinite)
551551
* @param chunk_count Current chunk iteration count (0-based)
552552
* @param chunk_timeout_ms Chunk size in milliseconds (200ms by default)
553553
* @return int Chunk timeout in milliseconds, or 0 if total timeout expired
554554
*/
555-
static CFL_UNUSED CFL_INLINE int calculate_chunk_timeout(int total_timeout_ms, int chunk_count,
556-
int chunk_timeout_ms) {
555+
static CFL_UNUSED CFL_INLINE int calculate_chunk_timeout(int total_timeout_ms,
556+
int chunk_count,
557+
int chunk_timeout_ms) {
557558
if (total_timeout_ms < 0) {
558559
/* Infinite timeout - use chunk size */
559560
return chunk_timeout_ms;
560561
} else {
561562
/* Finite timeout - calculate remaining */
562-
int remaining_ms = total_timeout_ms - (chunk_count * chunk_timeout_ms);
563+
int remaining_ms =
564+
total_timeout_ms - (chunk_count * chunk_timeout_ms);
563565
if (remaining_ms <= 0) {
564566
/* Timeout expired */
565567
return 0;
566568
}
567-
return (remaining_ms < chunk_timeout_ms) ? remaining_ms : chunk_timeout_ms;
569+
return (remaining_ms < chunk_timeout_ms) ? remaining_ms
570+
: chunk_timeout_ms;
568571
}
569572
}
570573

@@ -581,27 +584,31 @@ static CFL_UNUSED CFL_INLINE int calculate_chunk_timeout(int total_timeout_ms, i
581584
* - SIGINT (Ctrl+C): Raises KeyboardInterrupt exception
582585
* - SIGTERM: Can raise SystemExit or be handled by user code
583586
* - Other signals: If the user has registered handlers via Python's `signal`
584-
* module, those will also be checked (e.g., signal.signal(signal.SIGUSR1, handler)).
585-
* User code will need to handle these signals accordingly.
587+
* module, those will also be checked (e.g., signal.signal(signal.SIGUSR1,
588+
* handler)). User code will need to handle these signals accordingly.
586589
*
587590
*
588591
* @param self Handle (Producer or Consumer)
589592
* @param cs CallState structure (thread state will be updated)
590-
* @return int 0 if no signal detected (continue), 1 if signal detected (should return NULL)
593+
* @return int 0 if no signal detected (continue), 1 if signal detected (should
594+
* return NULL)
591595
*/
592-
static CFL_UNUSED CFL_INLINE int check_signals_between_chunks(Handle *self, CallState *cs) {
596+
static CFL_UNUSED CFL_INLINE int check_signals_between_chunks(Handle *self,
597+
CallState *cs) {
593598
/* Re-acquire GIL */
594599
PyEval_RestoreThread(cs->thread_state);
595600

596601
/* Check for pending signals (KeyboardInterrupt, etc.) */
597602
/* PyErr_CheckSignals() already set the exception */
598603
if (PyErr_CheckSignals() == -1) {
599-
/* Note: GIL is already held, but CallState_end expects to restore it */
600-
/* Save thread state again so CallState_end can restore it properly */
604+
/* Note: GIL is already held, but CallState_end expects to
605+
* restore it */
606+
/* Save thread state again so CallState_end can restore it
607+
* properly */
601608
cs->thread_state = PyEval_SaveThread();
602609
if (!CallState_end(self, cs)) {
603610
/* CallState_end detected signal and cleaned up */
604-
return 1; /* Signal detected */
611+
return 1; /* Signal detected */
605612
}
606613
return 1;
607614
}

tests/integration/consumer/test_consumer_wakeable_poll_consume.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,14 @@ def test_poll_message_delivery_with_wakeable_pattern(kafka_cluster):
3535
producer.flush(timeout=1.0)
3636

3737
# Create consumer with wakeable poll pattern settings
38-
consumer_conf = kafka_cluster.client_conf({
39-
'group.id': 'test-poll-message-available',
40-
'socket.timeout.ms': 100,
41-
'session.timeout.ms': 6000,
42-
'auto.offset.reset': 'earliest'
43-
})
38+
consumer_conf = kafka_cluster.client_conf(
39+
{
40+
'group.id': 'test-poll-message-available',
41+
'socket.timeout.ms': 100,
42+
'session.timeout.ms': 6000,
43+
'auto.offset.reset': 'earliest',
44+
}
45+
)
4446
consumer = TestConsumer(consumer_conf)
4547
consumer.subscribe([topic])
4648

@@ -77,12 +79,14 @@ def test_consume_message_delivery_with_wakeable_pattern(kafka_cluster):
7779
producer.flush(timeout=1.0)
7880

7981
# Create consumer with wakeable poll pattern settings
80-
consumer_conf = kafka_cluster.client_conf({
81-
'group.id': 'test-consume-messages-available',
82-
'socket.timeout.ms': 100,
83-
'session.timeout.ms': 6000,
84-
'auto.offset.reset': 'earliest'
85-
})
82+
consumer_conf = kafka_cluster.client_conf(
83+
{
84+
'group.id': 'test-consume-messages-available',
85+
'socket.timeout.ms': 100,
86+
'session.timeout.ms': 6000,
87+
'auto.offset.reset': 'earliest',
88+
}
89+
)
8690
consumer = TestConsumer(consumer_conf)
8791
consumer.subscribe([topic])
8892

@@ -106,8 +110,7 @@ def test_consume_message_delivery_with_wakeable_pattern(kafka_cluster):
106110
assert msg.value() is not None, f"Message {i} has no value"
107111
# Verify we got the expected messages
108112
expected_value = f'test-message-{i}'.encode()
109-
expected_msg = (f"Message {i} value mismatch: expected {expected_value}, "
110-
f"got {msg.value()}")
113+
expected_msg = f"Message {i} value mismatch: expected {expected_value}, " f"got {msg.value()}"
111114
assert msg.value() == expected_value, expected_msg
112115

113116
consumer.close()

0 commit comments

Comments
 (0)