Skip to content

Producer timeouts #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 21, 2015
Merged

Conversation

orls
Copy link

@orls orls commented Sep 18, 2015

Support setting timeouts to all produce variants.

Also fixes a bug with an uninitialised obj in the CONFIRM_EXTENDED support

@EVODelavega
Copy link
Owner

Thanks for the PR... I'll go ahead and merge it in already. If you find the time, would you mind awfully looking into the refactored code base (https://github.com/EVODelavega/phpkafka/tree/feature/topic-objects/kafka_clean) and see how we can fit this in there? I'd be tempted to store the timeout value on the object itself, and allow changing it through a setter (setTimeout()), and/or pass it to the relevant methods on the KafkaTopic and/or KafkaQueue objects

EVODelavega added a commit that referenced this pull request Sep 21, 2015
@EVODelavega EVODelavega merged commit 3f24ebb into EVODelavega:master Sep 21, 2015
@orls
Copy link
Author

orls commented Sep 21, 2015

If I find the time I'll take a look...but don't count on it 😆 I'm not much of a C programmer; this was a quick fix I needed.

I've since decided to go with a pure-php library for now (better timeout control, error handling, debuggability etc) until the rewrite is done & i can re-evaluate.

It's worth noting that librdkafka has a bunch of other tunables; quite a few are probably worth exposing.

@EVODelavega
Copy link
Owner

@oris: I know librdkafka offers a lot more tweaking through its rd_kafka_config_t type (and all other config structs). The third stage of the refactoring will probably focus on exposing a number of these parameters. As of now, I'm still not familiar enough with the entire lib to expose everything. Some of the config params will probably never get exposed, because PHP is, at the end of the day, still a stateless runtime, and I wouldn't want this extension to be able to block the RSHUTDOWN sequence.

As for finding the time to look into all of this: I know the feeling 😆

@edenhill
Copy link

@EVODelavega May I suggest that you implement a pass-through config interface that allows users of your PHP bindings to set any librdkafka configuration property?
This allows users to upgrade only librdkafka to acquire new functionality without also needing to upgrade the PHP bindings.
One example is SSL support which was added without any changes to the API but only by adding new configuration properties.

@EVODelavega
Copy link
Owner

@edenhill That's the direction I want this extension to go in. What I want to end up with is a structure like this:

Kafka(string brokers, array config) <-- this would actually be the rd_kafka_conf_t inteface

Kafka::getTopic(string topic, int mode) <-- this holds the rd_kafka_topic_t, the config, actually opens a connection and holds the topic meta. Depending on the mode, the KafkaTopic instance is a producer or a consumer

KafkaTopic::produceBatch(array messages, ...) & KafkaTopic::consumeBatch(...) These methods will return a queue (KafkaQueue), which would, in turn be configurable to some degree. I might split up these classes into KafkaProducer and KafkaConsumer, each linked to a specific topic, for clarity. Same goes for the Queue instances I'm returning.

The way shared resources (rd_kafka_t and its config, meta structs etc...) are managed needs more thought than I had originally put into it. It's more than likely (in fact, it's a must) to share a connection, its config and the metadata over three instances. Exposing the config is really quite risky at times, eg:

$kafka = new Kafka($broker, $someConf);
$produceFoo = $kafka->getTopic('foo', Kafka::MODE_PRODUCER);
$secondProducer = $kafka->getTopic('foo', Kafka::MODE_PRODUCER);
$consumeFoo = $kafka->getTopic('foo', Kafka::MODE_CONSUMER);

All three KafkaTopic instances should share the same meta struct, and the first two, both being producers should share the same resources throughout, but if the user does something like this:

$queue = $produceFoo->produceBatch(
    $hugeArray,
    [
        "queue.buffering.max.messages" => count($hugeArray),
        "queue.buffering.max.ms"            => count($hugeArray)*100,//100ms per msg
    ]
);//async produce call
$secondProducer->produce("simple msg", ["queue.buffering.max.ms" => 1000]);

That might block PHP's RSHUTDOWN sequence, if the connection was set up to empty its buffers before closing...

I hope all of this dribble makes sense. I'll have some spare time on my hands over the next couple of days, and I'm planning to work some more on this project this weekend.

@edenhill
Copy link

Since a single rd_kafka_t handle is bound to either PRODUCER of CONSUMER mode I'd suggest you to make the same interface in PHP, e.g.:

   $consumer = new Kafka::Consumer($brokers, $someConf);
   $consumer->getTopic(...);
   while ($msg = $consumer->poll()) { ... }
   }

In about a month's time I'll be releasing the new consumer API that is somewhat (but not completely!) in line with the new official java consumer (http://kafka.apache.org/083/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html).
This will add support for balanced consumer groups and broker based offset storage.
The new consumer interface will look something like this (in pseudo-PHP but I'm actually describing the C interface):

   $consumer = new Kafka::Consumer($brokers, $someConf);
   // Set default topic config to use for automatically subscribed topics
   $consumer->set_default_topic_conf($default_topic_conf);
   // Subscribe to a bunch of topics, either specific ones (no "^.." prefix) or by a topic name regex.
   $consumer->subscribe(["topic1", "^topic2_regex_\d+", "topic3"]);
   // Get messages, will serve all subscribed topics and return messages for the assigned partitions.
   while (msg = $consumer->poll()) { ... }
   // Close consumer, allowing 5s to commit offsets, etc.
   $consumer->close(5000);

@edenhill
Copy link

cc @rtyler @yungchin on the previous comment

@orls
Copy link
Author

orls commented Sep 24, 2015

I might split up these classes into KafkaProducer and KafkaConsumer
...
I'd suggest you to make the same interface in PHP

I'd strongly encourage this too; from the PHP side, the current extensions's 'mixed-mode' Kafka instance was a bit of a stumbling block for devs in my use-case. So far, we're preferring the explicit Consumer/Producer API separations in kafka-php -- you might want to mimic/emulate their API a bit (speaking selfishly, if you're feeling really adventurous, total compatibility with their API would be awesome for me :D)

@EVODelavega
Copy link
Owner

@orls Splitting up the consumer/producer modes is something that I'm definitely considering. Not doing so from the start was indeed a mistake. It would be nice to be able to write something line:

public function foo(KafkaProducer $producer)
{
}

and not having to write (as is the case now):

public function foo(Kafka $conn)
{
    if ($conn->getMode() !== Kafka::MODE_PRODUCER) {
        throw new \InvalidArgumentException('Connection is not in producer mode');
    }
}

I'll certainly look into the kafka-php API for ideas, but 100% compatibility is probably something that's never going to happen. When I've finished the refactor (which is actually more like a redesign in drag anyway), I might look into writing a compatibility wrapper, though.

@edenhill Is there a branch where I can get a preview of the new consumer API, because that API would map to PHP quite easily, especially in this case. Setting a default topic config on rd_kafka_t looks like it fits right in with my idea of having the main Kafka class(es) act as connection factories. That would make the API of this extension more intuitive. It'd also help the development of this project if the way the extension maps onto your lib isn't as far apart as it is now 😄

Anyway, thanks for the input, I've got my work cut out for me the next couple of days

@edenhill
Copy link

@EVODelavega It is currently work in progress, I'll probably push a functional but not final branch within a week or two.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants