Subscriptions seem to fail until after a service restart #2269

Open
Jarrah-libremfg opened this issue Nov 12, 2024 · 7 comments
Labels
bug Something isn't working ingress_kafka

Comments

@Jarrah-libremfg

Description

When using Restate subscriptions with the Restate Go SDK, messages don't seem to trigger
execution until the deployed application is restarted.

Reproduction

  1. Start Restate and connect it to a Kafka broker
  2. Start a Restate application and register the deployment
  3. Create a subscription, in my case using the following request body:
    {
    	"source": "kafka://redpanda/message-start",
    	"sink": "service://message_start_service/Handle",
    	"options": {}
    }
    
  4. Send a message to Kafka on the above topic.
  5. Note that the message_start_service/Handle function is never triggered.
  6. Restart the application and send another message. Note that the function triggers.
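
For readers who want to script step 3, the following is a minimal Go sketch, not the reporter's actual test code. It assumes the subscription is created with a POST to `/subscriptions` on the Restate admin API; `createSubscription`, `encodeSubscription`, and the `adminURL` parameter (for example `http://localhost:9070`) are hypothetical names introduced for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Subscription mirrors the request body shown in step 3.
type Subscription struct {
	Source  string            `json:"source"`
	Sink    string            `json:"sink"`
	Options map[string]string `json:"options"`
}

// encodeSubscription renders the request body as JSON.
func encodeSubscription(s Subscription) ([]byte, error) {
	if s.Options == nil {
		s.Options = map[string]string{} // send {} rather than null
	}
	return json.Marshal(s)
}

// createSubscription POSTs the body to <adminURL>/subscriptions.
// ASSUMPTION: adminURL is the base URL of the Restate admin API.
func createSubscription(client *http.Client, adminURL string, s Subscription) error {
	body, err := encodeSubscription(s)
	if err != nil {
		return err
	}
	resp, err := client.Post(adminURL+"/subscriptions", "application/json", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("create subscription: unexpected status %s", resp.Status)
	}
	return nil
}

func main() {
	// Only print the body here; calling createSubscription needs a running Restate server.
	body, err := encodeSubscription(Subscription{
		Source: "kafka://redpanda/message-start",
		Sink:   "service://message_start_service/Handle",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

A successful response only means the subscription was accepted; as discussed later in this thread, the Kafka consumer behind it starts asynchronously.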

Attempted fixes

In my case I have a test that does all of these steps automatically. I've tried adding pauses
to the test to ensure messages aren't being sent too early. I've also tried sending multiple
messages in case the first one always gets skipped. Neither of these has resolved the issue.

If I run the test twice without rebuilding the restate/Kafka deployment (effectively step 2 and down)
everything works perfectly.

@slinkydeveloper
Contributor

Do you have more logs of the restate-server? You can get more logs of the kafka component using the following env variable: RUST_LOG="info,restate_ingress_kafka=trace".

@slinkydeveloper slinkydeveloper added bug Something isn't working ingress_kafka labels Nov 12, 2024
@slinkydeveloper
Contributor

Could the problem be that you're trying to create the subscription before the service is actually registered? In that case, creating the subscription will return an error.

@Jarrah-libremfg
Author

Jarrah-libremfg commented Nov 12, 2024

> Do you have more logs of the restate-server? You can get more logs of the kafka component using the following env variable: RUST_LOG="info,restate_ingress_kafka=trace".

Even with that there really aren't a lot of relevant logs. The following is out of a test run. There's a bunch of info level registration logs above.

[runtime]  | 2024-11-12T21:57:07.924005Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime]  |   Executing invocation at deployment
[runtime]  |     invocation.id: inv_17UMsllpxV0Q2QAaOy3gzSuAQKiG4h7wUp
[runtime]  |     deployment.address: http://172.18.0.1:29080/
[runtime]  |     deployment.service_protocol_version: 1
[runtime]  |     path: /invoke/WorkflowSpecification/Set
[runtime]  | on rt:pp-6
[runtime]  | 2024-11-12T21:57:07.948503Z INFO restate_ingress_http::handler::service_handler
[runtime]  |   Processing ingress request
[runtime]  | on rs:ingress-16
[runtime]  |   in restate_ingress_http::handler::service_handler::ingress
[runtime]  |     restate.invocation.id: inv_17UMsllpxV0Q3w5MYku7MqOADl68iEpGg1
[runtime]  |     restate.invocation.target: message_start/{key}/Enable
[runtime]  | 2024-11-12T21:57:07.954056Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime]  |   Executing invocation at deployment
[runtime]  |     invocation.id: inv_17UMsllpxV0Q3w5MYku7MqOADl68iEpGg1
[runtime]  |     deployment.address: http://172.18.0.1:29080/
[runtime]  |     deployment.service_protocol_version: 1
[runtime]  |     path: /invoke/message_start/Enable
[runtime]  | on rt:pp-6
[runtime]  | 2024-11-12T21:57:07.967561Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime]  |   Spawning the consumer task for subscription id sub_16hfvhIEXwevdkyKeNrv1Vn
[runtime]  | on rs:worker-13
[runtime]  | 2024-11-12T21:57:07.967672Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"enable.auto.commit": "true", "enable.auto.offset.store": "false", "group.id": "sub_16hfvhIEXwevdkyKeNrv1Vn", "client.id": "restate", "metadata.broker.list": "PLAINTEXT://redpanda:29092"}, log_level: Debug }
[runtime]  | on rs:worker-13

If I attach my own listener to that Kafka topic, I see a message arrive on it shortly after the subscription is created, but nothing in the logs or in the execution of the service indicates that the Restate consumer received it.

> Could the problem be that you're trying to create the subscription before the service is actually registered? In that case, creating the subscription will return an error.

Not unless the deployment process can take longer than 2 seconds after the HTTP call replies. The very first thing the test code does is start and register the service. I've added pauses in case there is some delay after registration, with no change.

It's probably worth noting that I call other parts of the service to load data in before creating the subscription and that succeeds. So Restate absolutely knows it's there and how to call it.

@Jarrah-libremfg
Author

I've added a bunch of my own logging to try to work out where the issue is. It seems that on the first run no message ever comes out of the rdkafka::consumer::StreamConsumer.

Failing logs:

[runtime]  | 2024-11-13T04:23:08.070524Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime]  |   Executing invocation at deployment
[runtime]  |     invocation.id: inv_17UMsllpxV0Q5cfVpSh5n187gfEkFaMKpX
[runtime]  |     deployment.address: http://172.18.0.1:29080/
[runtime]  |     deployment.service_protocol_version: 1
[runtime]  |     path: /invoke/message_start/Enable
[runtime]  | on rt:pp-6
[runtime]  | 2024-11-13T04:23:08.125171Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime]  |   Spawning the consumer task for subscription id sub_12x4St13qmf4QBVUXCEcdjj
[runtime]  | on rs:worker-0
[runtime]  | 2024-11-13T04:23:08.125240Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"client.id": "restate", "metadata.broker.list": "PLAINTEXT://redpanda:29092", "group.id": "sub_12x4St13qmf4QBVUXCEcdjj", "enable.auto.commit": "true", "enable.auto.offset.store": "false"}, log_level: Debug }
[runtime]  | on rs:worker-0
[runtime]  | 2024-11-13T04:23:08.126405Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Creating consumer with topics [
[runtime]  |     "message-start",
[runtime]  | ]
[runtime]  | on rs:worker-0
[runtime]  | 2024-11-13T04:23:08.126533Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Starting listener loop for topics: [
[runtime]  |     "message-start",
[runtime]  | ]
[runtime]  | on rs:worker-0

Exactly the same test run a second time results in:

[runtime]  | 2024-11-13T04:24:05.121916Z INFO restate_invoker_impl::invocation_task::service_protocol_runner
[runtime]  |   Executing invocation at deployment
[runtime]  |     invocation.id: inv_17UMsllpxV0Q02UYrhb54uozzoaMAjTxqV
[runtime]  |     deployment.address: http://172.18.0.1:29080/
[runtime]  |     deployment.service_protocol_version: 1
[runtime]  |     path: /invoke/message_start/Enable
[runtime]  | on rt:pp-6
[runtime]  | 2024-11-13T04:24:05.133712Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime]  |   Spawning the consumer task for subscription id sub_14NOwbMfb9oSsTPpLxEYhJn
[runtime]  | on rs:worker-10
[runtime]  | 2024-11-13T04:24:05.133772Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"metadata.broker.list": "PLAINTEXT://redpanda:29092", "group.id": "sub_14NOwbMfb9oSsTPpLxEYhJn", "client.id": "restate", "enable.auto.offset.store": "false", "enable.auto.commit": "true"}, log_level: Debug }
[runtime]  | on rs:worker-10
[runtime]  | 2024-11-13T04:24:05.134379Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Creating consumer with topics [
[runtime]  |     "message-start",
[runtime]  | ]
[runtime]  | on rs:worker-10
[runtime]  | 2024-11-13T04:24:05.134529Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Starting listener loop for topics: [
[runtime]  |     "message-start",
[runtime]  | ]
[runtime]  | on rs:worker-10
[runtime]  | 2024-11-13T04:24:05.654514Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Received message
[runtime]  | on rs:worker-0
[runtime]  | 2024-11-13T04:24:05.654551Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Received Message on topic = message-start, partition = 0, offset = 1
[runtime]  | on rs:worker-0
[runtime]  | 2024-11-13T04:24:05.654577Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Received message
[runtime]  | on rs:worker-10
[runtime]  | 2024-11-13T04:24:05.654603Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Received Message on topic = message-start, partition = 0, offset = 1
[runtime]  | on rs:worker-10
[runtime]  | 2024-11-13T04:24:05.654636Z INFO restate_ingress_kafka::consumer_task
[runtime]  |   Processing Kafka ingress request
[runtime]  | on rs:worker-0
[runtime]  |   in restate_ingress_kafka::consumer_task::kafka_ingress_consume
[runtime]  |     otel.name: "kafka_ingress_consume"
[runtime]  |     messaging.system: "kafka"
[runtime]  |     messaging.operation: "receive"
[runtime]  |     messaging.source.name: "message-start"
[runtime]  |     messaging.destination.name: service://message_start_service/Handle

My reading of this is that the subscription is being created the first time but nothing is being sent on it. The second time a subscription is created everything starts flowing correctly. Both runs create new subscriptions, which can be seen in the admin API:

{
  "subscriptions": [
    {
      "id": "sub_12x4St13qmf4QBVUXCEcdjj",
      "source": "kafka://redpanda/message-start",
      "sink": "service://message_start_service/Handle",
      "options": {
        "client.id": "restate",
        "group.id": "sub_12x4St13qmf4QBVUXCEcdjj"
      }
    },
    {
      "id": "sub_14NOwbMfb9oSsTPpLxEYhJn",
      "source": "kafka://redpanda/message-start",
      "sink": "service://message_start_service/Handle",
      "options": {
        "client.id": "restate",
        "group.id": "sub_14NOwbMfb9oSsTPpLxEYhJn"
      }
    }
  ]
}
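
To check programmatically which consumer groups belong to a given source and sink, a test could decode this listing. The sketch below is illustrative only: it parses the JSON shape shown above, and the type and function names (`subscriptionList`, `consumerGroupsFor`) are hypothetical. Fetching the listing from the admin API is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscription matches one entry of the listing shown above.
type subscription struct {
	ID      string            `json:"id"`
	Source  string            `json:"source"`
	Sink    string            `json:"sink"`
	Options map[string]string `json:"options"`
}

type subscriptionList struct {
	Subscriptions []subscription `json:"subscriptions"`
}

// consumerGroupsFor returns the group.id option of every subscription
// that has the given source and sink.
func consumerGroupsFor(listing []byte, source, sink string) ([]string, error) {
	var list subscriptionList
	if err := json.Unmarshal(listing, &list); err != nil {
		return nil, err
	}
	var groups []string
	for _, s := range list.Subscriptions {
		if s.Source == source && s.Sink == sink {
			groups = append(groups, s.Options["group.id"])
		}
	}
	return groups, nil
}

func main() {
	// The listing from this comment, reduced to a single line.
	listing := []byte(`{"subscriptions":[
		{"id":"sub_12x4St13qmf4QBVUXCEcdjj","source":"kafka://redpanda/message-start","sink":"service://message_start_service/Handle","options":{"client.id":"restate","group.id":"sub_12x4St13qmf4QBVUXCEcdjj"}},
		{"id":"sub_14NOwbMfb9oSsTPpLxEYhJn","source":"kafka://redpanda/message-start","sink":"service://message_start_service/Handle","options":{"client.id":"restate","group.id":"sub_14NOwbMfb9oSsTPpLxEYhJn"}}
	]}`)
	groups, err := consumerGroupsFor(listing, "kafka://redpanda/message-start", "service://message_start_service/Handle")
	if err != nil {
		panic(err)
	}
	fmt.Println(groups)
}
```

With the listing above this yields two groups, one per test run, which matches the observation that each run creates a new subscription with its own consumer group.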

@slinkydeveloper
Contributor

I have another hypothesis, that actually comes from our own Kafka tests: https://github.com/restatedev/sdk-test-suite/blob/main/src/main/kotlin/dev/restate/sdktesting/tests/KafkaIngress.kt#L99

The subscription process is completely asynchronous: after Restate accepts the subscription it replies with 200, and it then takes some time before the consumer group is created in Kafka. Because of that, a Kafka record sent before the consumer group starts can be missed (essentially, your test sends the message in step 4 before the subscription from step 3 has created its consumer group). By default a consumer group starts from the latest record rather than from the first one, so you won't see records that were published before the consumer group actually started. Most likely, in the second test attempt you are receiving the record through the first subscription, not through the second one created in that attempt.

Unfortunately, the log lines you posted lack some of the details needed to verify this hypothesis (I need to fix that on our side). One thing you can do, and it is perhaps good practice in tests anyway, is to set auto.offset.reset to earliest (as shown here: https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions).
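
As a rough illustration of that suggestion, the Go sketch below builds a subscription request body that carries `auto.offset.reset` in `options`. The `subscriptionBody` helper and its `replayFromStart` flag are hypothetical; the source, sink, and option name come from this thread.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// request is the subscription body from the reproduction steps.
type request struct {
	Source  string            `json:"source"`
	Sink    string            `json:"sink"`
	Options map[string]string `json:"options"`
}

// subscriptionBody builds the request body. When replayFromStart is true
// it sets auto.offset.reset=earliest, so a new consumer group begins at the
// first record of the topic instead of the latest one.
func subscriptionBody(source, sink string, replayFromStart bool) ([]byte, error) {
	options := map[string]string{}
	if replayFromStart {
		options["auto.offset.reset"] = "earliest"
	}
	return json.Marshal(request{Source: source, Sink: sink, Options: options})
}

func main() {
	body, err := subscriptionBody(
		"kafka://redpanda/message-start",
		"service://message_start_service/Handle",
		true, // test setup only: every existing record on the topic is replayed
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```

Keeping the flag off by default reflects the trade-off raised in the next comment: `earliest` replays the whole topic whenever a subscription is created on an existing topic.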

@Jarrah-libremfg
Author

I've just tried a few things to test that hypothesis. Some good news: adding the auto.offset.reset option does make the test work. Sadly it also has the side effect of replaying all messages whenever a subscription is created on an existing topic, which breaks our runtime goal.

> The subscription process is completely asynchronous: after Restate accepts the subscription it replies with 200, and it then takes some time before the consumer group is created in Kafka. Because of that, a Kafka record sent before the consumer group starts can be missed (essentially, your test sends the message in step 4 before the subscription from step 3 has created its consumer group).

To test this theory I added a 15 second pause between requesting the subscription and sending the message. That did seem to work correctly.

> Most likely, in the second test attempt you are receiving the record through the first subscription, not through the second one created in that attempt.

I don't believe this is what is happening. If it were, I'd expect that deleting the first subscription and then running the test would result in a test failure (no messages received). However, if I start everything, run the test twice so it passes, then delete all subscriptions and re-run the test, it still passes. It seems more like startup is slower the first time a new subscription is created, and I was hitting that on the cold-start attempt.

> Unfortunately, the log lines you posted lack some of the details needed to verify this hypothesis (I need to fix that on our side)

With your additional logs added:

[runtime]  | 2024-11-14T03:25:59.740571Z DEBUG restate_ingress_kafka::subscription_controller::task_orchestrator
[runtime]  |   Spawning the consumer task for subscription id sub_17Jj4g7zV5F3OChvjyNImRz
[runtime]  | on rs:worker-12
[runtime]  | 2024-11-14T03:25:59.740661Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Starting consumer for topics ["message-start"] with configuration ClientConfig { conf_map: {"client.id": "restate", "enable.auto.offset.store": "false", "metadata.broker.list": "PLAINTEXT://redpanda:29092", "group.id": "sub_17Jj4g7zV5F3OChvjyNImRz", "enable.auto.commit": "true"}, log_level: Debug }
[runtime]  |     restate.subscription.id: sub_17Jj4g7zV5F3OChvjyNImRz
[runtime]  |     messaging.consumer.group.name: "sub_17Jj4g7zV5F3OChvjyNImRz"
[runtime]  | on rs:worker-12
[runtime]  | 2024-11-14T03:25:59.741857Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Creating consumer with topics [
[runtime]  |     "message-start",
[runtime]  | ]
[runtime]  | on rs:worker-12
[runtime]  | 2024-11-14T03:25:59.742573Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Assigned topic/partitions/offset: TPL {}
[runtime]  |     restate.subscription.id: sub_17Jj4g7zV5F3OChvjyNImRz
[runtime]  |     messaging.consumer.group.name: "sub_17Jj4g7zV5F3OChvjyNImRz"
[runtime]  | on rs:worker-12
[runtime]  | 2024-11-14T03:25:59.742598Z DEBUG restate_ingress_kafka::consumer_task
[runtime]  |   Starting listener loop for topics: [
[runtime]  |     "message-start",
[runtime]  | ]
[runtime]  | on rs:worker-12

Thanks for your help on this by the way.

@slinkydeveloper
Contributor

> Some good news: adding the auto.offset.reset option does make the test work. Sadly it also has the side effect of replaying all messages whenever a subscription is created on an existing topic, which breaks our runtime goal.

Agreed, for production this probably doesn't make sense. But in tests where two processes are involved, consumer startup is almost always asynchronous; that is true with the "usual" Kafka APIs too. So in that case auto.offset.reset is probably legitimate, as is a long sleep. Even better, the test could wait until the consumer group exists according to the Kafka admin API.

Not sure if there's anything else actionable in this task now :)
