Unable to produce to AWS MSK from AWS Lambda #513

Closed
scimas opened this issue Nov 7, 2022 · 8 comments

scimas commented Nov 7, 2022

I'm trying to produce some messages to a topic in MSK and I'm having issues. I have verified that this is not a networking issue by creating a Python replica of the Lambda, with the same VPC and security group configuration; the Python code is able to produce to the topic without any issues.

All I've received from the rdkafka logs is the following. There are no errors; the Lambda just times out after 1 minute. Increasing the timeout does not help. Any suggestion on how to proceed from here would be appreciated.

2022-11-07T20:37:37.920Z TRACE [rdkafka::client] Create new librdkafka client 0x55a7306ea0
2022-11-07T20:37:37.920Z INFO  [rust_testing] sending message
2022-11-07T20:37:37.920Z TRACE [rdkafka::producer::base_producer] Polling thread loop started

The Rust code:

use std::time::Duration;

use lambda_runtime::LambdaEvent;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

#[tokio::main]
async fn main() -> Result<(), lambda_runtime::Error> {
    simple_logger::init_with_level(log::Level::Trace)?;
    let lambda_fn = lambda_runtime::service_fn(lambda_handler);
    lambda_runtime::run(lambda_fn).await?;
    Ok(())
}

async fn lambda_handler(_payload: LambdaEvent<serde_json::Value>) -> Result<(), lambda_runtime::Error> {
    let topic = "my-topic";
    let bootstrap_servers = "server1:9092,server2:9092".to_string();
    let producer: FutureProducer = rdkafka::config::ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .set("security.protocol", "plaintext")
        .create()?;
    let payload = r#"{"key": "myval"}"#.to_string();
    log::info!("sending message");
    // Enqueue one record (retrying for up to 15 s if the local queue is
    // full) and await its delivery result.
    match producer
        .send(
            FutureRecord::<String, _>::to(topic).payload(&payload),
            Timeout::After(Duration::from_secs(15)),
        )
        .await
    {
        Ok(_) => (),
        Err(e) => println!("error: {e:?}"),
    }
    Ok(())
}

And the Python code that works:

import json
from kafka import KafkaProducer

def lambda_handler(event, context):
    topic = "my-topic"
    bootstrap_servers = "server1:9092,server2:9092"
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode("utf-8"))
    data = {"key": "myval"}
    producer.send(topic, data)
    producer.flush()
    producer.close()

scimas commented Nov 15, 2022

I ran the same test case with the kafka crate and it works, so I'm pretty sure this issue is with rdkafka and/or the configuration of the client. Again, any hint on how to proceed would be appreciated, since the kafka crate does not seem to be very actively maintained.

gklijs (Contributor) commented Nov 25, 2022

The result is a future, so you will also need to await that. Like here. Currently, it probably quits before really sending the record.


scimas commented Nov 25, 2022

The result is not a future, as far as I can tell. FutureProducer::send returns a future, which I'm awaiting in the match. FutureProducer::send(...).await returns an OwnedDeliveryResult, which is just a Result; I don't see how I could .await it. And that's why I have the

match producer.send(...).await {
    Ok(_) => (),
    Err(e) => println!("error: {e:?}"),
}

block in the first place.

The example you linked is simply wrapping the OwnedDeliveryResult into a Future because of the async move block.
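
For reference, a minimal sketch of the types involved, assuming the producer and a record built as in the snippet above (the type alias is from the rdkafka crate docs):

// Awaiting FutureProducer::send yields an OwnedDeliveryResult, a plain
// Result rather than another future: Ok((partition, offset)) on delivery,
// Err((KafkaError, OwnedMessage)) on failure.
let delivery: rdkafka::producer::OwnedDeliveryResult = producer
    .send(record, Timeout::After(Duration::from_secs(15)))
    .await;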


gklijs commented Nov 26, 2022

Fair enough. You could log the value of the result, though; hopefully that points in the right direction.


scimas commented Nov 29, 2022

There is no result to log; that's the issue. The .await never resolves. Otherwise the function would end either with an error through the Err(e) => println! path or through the "do nothing" Ok(_) => () path.


scimas commented Nov 30, 2022

So it is looking likely that this is an issue between librdkafka and AWS MSK / AWS Lambda.

I tried the same test as in my original post with Python again. The first test I described was done with the kafka-python package, which is a pure-Python implementation of a Kafka client. Today I tried a test with the confluent-kafka package, which, like rust-rdkafka, uses librdkafka. That test failed with the same behavior: the call to Producer.flush() never returns.

I will be reporting this at the librdkafka repo, referencing this issue. I'll leave it up to you whether to keep this issue open or not.

I am still at a loss as to what to report other than "it works without librdkafka, but doesn't with it."

benesch (Collaborator) commented Nov 30, 2022

I recommend that you enable debug logs via config.set("debug", "all"). You should also make sure you have env_logger or similar set up, so that the log messages actually get printed to stderr.
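
A minimal sketch of that suggestion, reusing the setup from the original snippet (swapping in env_logger for simple_logger is an assumption; either should print the forwarded librdkafka messages):

use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::producer::FutureProducer;

// Turn on librdkafka's internal debug output and make sure the client
// doesn't filter it out before it reaches the logger.
env_logger::init();
let producer: FutureProducer = ClientConfig::new()
    .set("bootstrap.servers", "server1:9092,server2:9092")
    .set("security.protocol", "plaintext")
    .set("debug", "all")                   // all librdkafka debug contexts
    .set_log_level(RDKafkaLogLevel::Debug) // pass debug-level messages through
    .create()?;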


scimas commented Dec 1, 2022

Solved.

The issue was that the broker had min.insync.replicas=2, the replication factor of the topic I was testing with was 1, and librdkafka defaults to acks=all. So the produce request kept failing and librdkafka silently retried it.

Two possible solutions:

  1. Set acks explicitly to be equal to min(replication factor, min.insync.replicas).
  2. Set message.timeout.ms to a value acceptable to you. By default it is 5 minutes, which is way longer than I was waiting (1 minute).

I went with 1 for testing purposes; I'll probably go with 2 in actual use. Both options are sketched below.

There is still an issue with approach 2: even though it returns an error after the timeout, it doesn't tell you why the timeout happened, only that it did. Without the debug log you wouldn't be able to tell that the actual problem was the replication and acks configuration.
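
For illustration, here is a sketch of both options as producer settings; the concrete values are examples, not recommendations:

// Option 1: lower acks so a topic with replication factor 1 can satisfy
// the produce request despite the broker's min.insync.replicas=2.
// Option 2: bound the delivery retries so failures surface quickly.
let producer: FutureProducer = rdkafka::config::ClientConfig::new()
    .set("bootstrap.servers", "server1:9092,server2:9092")
    .set("security.protocol", "plaintext")
    .set("acks", "1")                   // option 1
    .set("message.timeout.ms", "30000") // option 2: 30 s instead of 5 min
    .create()?;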

scimas closed this as completed on Dec 1, 2022