[Bug]: Apache Beam FlinkRunner does not process Kafka messages #32709

Open
2 of 17 tasks
Jerryporter opened this issue Oct 9, 2024 · 1 comment

Comments

@Jerryporter

What happened?

I am working on a pipeline that integrates Apache Beam with Kafka, using Flink as the runner for the pipeline. The pipeline works fine for small, bounded datasets (e.g., the WordCount example), but when I switch to Kafka to process unbounded streams, no output is generated. Despite the pipeline executing without obvious errors, it fails to process Kafka messages.

QUESTION:

  • Is there a specific configuration I need to ensure Beam's Kafka integration works with Flink?
  • Do I need to handle the unbounded stream differently, such as with specific Window or Trigger functions, to ensure each Kafka message is processed immediately?
  • What additional debugging steps should I take to determine why Kafka messages are not being consumed?
  • Could you please provide an example of Kafka working correctly with Flink?

Kafka is running inside Docker, and the Flink cluster is started locally using start-cluster.sh. The pipeline runs successfully, but the Kafka messages are not consumed or processed as expected. Below is the relevant code and additional details.

import argparse
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class ParseKafkaMessage(beam.DoFn):
    """Turn a (key, value) Kafka record into a dict holding the image path."""

    def process(self, kafka_message):
        image_path = kafka_message[1].decode('utf-8')
        yield {"image_path": image_path}


def map_message_image_path(result):
    return result['image_path']


def print_message(record):
    # Each record is a (key, value) pair of bytes; decode the value only.
    print(f"Received message: {record[1].decode('utf8')}")
    return record


def run(argv=None, save_main_session=True):
    """
    Args:
      argv: Command line arguments defined for this example.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', dest='bootstrap_servers', required=True,
                        help='Kafka bootstrap servers.')
    parser.add_argument('--topic', dest='topic', required=True,
                        help='Kafka topic to consume from.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    logging.getLogger().setLevel(logging.INFO)
    print('start!')

    CONSUMER_CONFIG = {
        "bootstrap.servers": known_args.bootstrap_servers,
        # "auto.offset.reset": "earliest",
        'group.id': 'temp_group'
    }

    CONSUMER_TOPICS = [known_args.topic]

    # Build the options once; any extra command-line flags are passed through.
    pipeline_options = PipelineOptions(pipeline_args + [
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",
        "--environment_type=LOOPBACK",
    ])
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as pipeline:
        consumed = (
            pipeline
            | ReadFromKafka(
                consumer_config=CONSUMER_CONFIG,
                topics=CONSUMER_TOPICS,
                # max_num_records=3,
                # max_read_time=60,
            )
            # | beam.Map(print))
            # | beam.Map(lambda record: convert_kafka_record_to_dictionary(record)))
            | beam.Map(print_message))


if __name__ == '__main__':
    run("--bootstrap_servers localhost:9092 --topic test".split())

Steps to reproduce:

  1. Set up a local Flink cluster (start-cluster.sh) and a Kafka broker running in Docker.
  2. Create a Kafka topic (test) and publish some messages containing image paths.
  3. Run the Beam pipeline shown above, which consumes and processes messages from Kafka.

Expected behavior:

The Kafka messages should be consumed by the Beam pipeline, and the print_message() function should output the messages to the console.

Environment:
Apache Beam version: [2.59.0]
Flink version: [1.18.1]
Kafka version: [docker:last]
Runner: FlinkRunner

Actual behavior:

The pipeline starts without any errors. The Flink job is successfully submitted, but the Kafka messages are not consumed or processed, and no output is generated. I have verified that Kafka is working correctly and that messages are available in the topic. However, the Beam pipeline does not consume the messages as expected.

What I have tried:

  • Kafka Consumer Testing: Kafka works as expected. I can produce and consume messages using standalone Kafka utilities (kafka-console-producer, kafka-console-consumer).
  • Flink Job Testing: The WordCount example runs successfully with FlinkRunner.
  • Print Statements: Added print_message() to print the Kafka messages, but nothing is printed.
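
One further check that could sit alongside the steps above is whether the broker address is reachable over TCP from the process that needs it. The following is a minimal sketch using only the Python standard library; `can_connect` is a hypothetical helper name, and the host and port are taken from the `bootstrap.servers` value in the pipeline code. It only tests that a TCP connection opens, not that Kafka itself is healthy.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds.

    Hypothetical helper for illustration; it does not speak the Kafka protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names.
        return False


# Example call (address assumed from CONSUMER_CONFIG in the pipeline above):
# can_connect("localhost", 9092)
```

Running the same check from inside a Docker container may give a different answer than running it on the host, because with Docker's default networking `localhost` inside a container refers to the container itself.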

Log Files:

INFO:apache_beam.utils.subprocess_server:Using cached job server jar from https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.59.0/beam-sdks-java-io-expansion-service-2.59.0.jar
INFO:root:Starting a JAR-based expansion service from JAR /home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar 
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar' '51799' '--filesToStage=/home/chy/.apache_beam/cache/jars/beam-sdks-java-io-expansion-service-2.59.0.jar' '--alsoStartLoopbackWorker']
INFO:apache_beam.utils.subprocess_server:WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
......
INFO:apache_beam.utils.subprocess_server:INFO: Source: Impulse (1/1) (359af0cc3ed2f4c625e558bebd61c64b_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FINISHED.
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 0
INFO:apache_beam.runners.worker.sdk_worker:Creating insecure control channel for localhost:43293.
INFO:apache_beam.runners.worker.sdk_worker:Control channel established.
INFO:apache_beam.runners.worker.sdk_worker:Initializing SDKHarness with unbounded number of workers.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService control
INFO:apache_beam.utils.subprocess_server:INFO: Beam Fn Control client connected with id 1-2
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder restoreState
INFO:apache_beam.utils.subprocess_server:INFO: Finished to build heap keyed state-backend.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:25 AM org.apache.flink.runtime.state.heap.HeapKeyedStateBackend <init>
INFO:apache_beam.utils.subprocess_server:INFO: Initializing heap keyed state backend with stream factory.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:43 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:48 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:54 AM org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory createEnvironment
INFO:apache_beam.utils.subprocess_server:INFO: Still waiting for startup of environment apache/beam_java21_sdk:2.59.0 for worker id 1-1
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:30:59 AM 
....
INFO:apache_beam.utils.subprocess_server:SEVERE: Docker container a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5 logs:
INFO:apache_beam.utils.subprocess_server:2024/10/08 03:32:38 Failed to obtain provisioning information: failed to dial server at localhost:34507
INFO:apache_beam.utils.subprocess_server:	caused by:
INFO:apache_beam.utils.subprocess_server:context deadline exceeded
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.taskmanager.Task transitionState
INFO:apache_beam.utils.subprocess_server:WARNING: [3]ReadFromKafka(beam:transform:org.apache.beam:kafka_read_without_metadata:v1)/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/4)#0 (359af0cc3ed2f4c625e558bebd61c64b_24761c05670fa5069ce6a1b3d4c931eb_0_0) switched from INITIALIZING to FAILED with failure cause:
INFO:apache_beam.utils.subprocess_server:org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5
....
INFO:apache_beam.utils.subprocess_server:INFO: Stopping Pekko RPC service.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.pekko.event.slf4j.Slf4jLogger$$anonfun$receive$1 $anonfun$applyOrElse$3
INFO:apache_beam.utils.subprocess_server:INFO: Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService closeAsync
INFO:apache_beam.utils.subprocess_server:INFO: Stopping Pekko RPC service.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService lambda$closeAsync$7
INFO:apache_beam.utils.subprocess_server:INFO: Stopped Pekko RPC service.
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.pekko.event.slf4j.Slf4jLogger$$anonfun$receive$1 $anonfun$applyOrElse$3
INFO:apache_beam.utils.subprocess_server:INFO: Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.AbstractBlobCache close
INFO:apache_beam.utils.subprocess_server:INFO: Shutting down BLOB cache
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.AbstractBlobCache close
INFO:apache_beam.utils.subprocess_server:INFO: Shutting down BLOB cache
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.blob.BlobServer close
INFO:apache_beam.utils.subprocess_server:INFO: Stopped BLOB server at 0.0.0.0:43151
INFO:apache_beam.utils.subprocess_server:Oct 08, 2024 11:32:39 AM org.apache.flink.runtime.rpc.pekko.PekkoRpcService lambda$closeAsync$7
INFO:apache_beam.utils.subprocess_server:INFO: Stopped Pekko RPC service.
start!
Traceback (most recent call last):
  File "/home/chy/beam-llm/kafka-beam.py", line 196, in <module>
    run()
  File "/home/chy/beam-llm/kafka-beam.py", line 176, in run
    with beam.Pipeline(options=pipeline_options) as pipeline:
  File "/home/chy/beam-llm/beam/lib/python3.10/site-packages/apache_beam/pipeline.py", line 621, in __exit__
    self.result.wait_until_finish()
  File "/home/chy/beam-llm/beam/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline BeamApp-chy-1008033013-53c548ce_c23a0db3-f61e-41fb-b989-f867c0923852 failed in state FAILED: java.lang.IllegalStateException: No container running for id a969a3cff5433b0137518e12fa0b197d3390b381d9f66292373dc3ae72c889d5

The final error suggests that the Docker container did not start. However, docker ps shows that a container was started and is running. The container log above reports that it failed to dial localhost:34507 ("context deadline exceeded"), so it appears that the container started but never executed the task.
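
Because the relevant lines are buried in a large amount of INFO output, a small filter can help isolate them. This is a rough sketch, not part of Beam: `extract_failures` and `FAILURE_PATTERN` are hypothetical names, and the markers are simply strings that occur in the log excerpt above.

```python
import re

# Markers copied from the log excerpt in this issue; extend as needed.
FAILURE_PATTERN = re.compile(
    r"SEVERE:"
    r"|switched from \w+ to FAILED"
    r"|Failed to "
    r"|Job state changed to FAILED"
    r"|Exception"
)


def extract_failures(log_lines):
    """Return the lines that match a failure marker, in their original order."""
    return [
        line.rstrip("\n")
        for line in log_lines
        if FAILURE_PATTERN.search(line)
    ]


# Example use, assuming the job output was saved to a file named job.log:
# with open("job.log") as f:
#     for line in extract_failures(f):
#         print(line)
```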

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

@liferoad
Collaborator

Do you see the error mentioned in #32743?
