
[Question] How to process avro message in S3 connector #843

Closed
@gohanbg


Hello,

I have been playing around with Strimzi and Camel's Kafka connectors, in particular the AWS S3 sink connector.

My setup consists of a Strimzi Kafka cluster and the Confluent Schema Registry, all running in Kubernetes.

The KafkaConnect resource looks like this:

```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  namespace: messaging
  name: aws-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.5.0
  image: my-image
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  replicas: 1
  bootstrapServers: my-cluster-kafka-brokers.messaging.svc:9092
  externalConfiguration:
    volumes:
      - name: kafka-aws-credentials
        secret:
          secretName: kafka-aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
    key.converter.schemas.enable: false
    value.converter.schemas.enable: true
  template:
    pod:
      imagePullSecrets:
        - name: docker-registry-secret
```

The KafkaConnector resource looks like this:

```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: aws-connect
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
    topics: topic-to-export
    camel.sink.path.bucketNameOrArn: s3-bucket-name
    camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: S3_REGION
```

When I run the connector it starts reading from the topic correctly, but just before sending the data to S3 it fails with the following exception:

```
org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: org.apache.kafka.connect.data.Struct to the required type: java.io.InputStream
```

I can see from the logs that the Struct message contains the correct data, but it cannot be converted to an InputStream.
I just want the message to end up in S3 as JSON; no special transformations are needed.
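One direction I am considering: camel-kafka-connector has a `camel.sink.marshal` option that applies a Camel data format to the exchange body before it reaches the endpoint, which might serialize the Struct to JSON before the S3 sink needs an InputStream. A hedged sketch of the connector config with that option added (whether `camel.sink.marshal` accepts `json-jackson` in 0.7.0 is my assumption and would need checking against the docs; `camel-jackson-3.7.0.jar` is at least already in the plugin directory):

```yaml
# Sketch only: attempt to marshal the Connect Struct to JSON before the
# aws-s3 endpoint sees it. The option value "json-jackson" is an assumption,
# not verified against camel-kafka-connector 0.7.0.
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
  config:
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
    camel.sink.marshal: json-jackson
```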

Here is what my KafkaConnect image looks like:

```dockerfile
FROM strimzi/kafka:0.17.0-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel-aws-s3
COPY ./camel-aws-s3/* /opt/kafka/plugins/camel-aws-s3/
USER 1001
```

and the camel-aws-s3 directory I'm copying from looks like this:

```text
annotations-13.0.jar                                        camel-core-engine-3.7.0.jar      commons-compress-1.20.jar           jackson-mapper-asl-1.9.13.jar             medeia-validator-core-1.1.1.jar
apicurio-registry-common-1.3.1.Final.jar                    camel-core-languages-3.7.0.jar   commons-logging-1.2.jar             jboss-jaxrs-api_2.1_spec-2.0.1.Final.jar  medeia-validator-jackson-1.1.1.jar
apicurio-registry-distro-connect-converter-1.3.0.Final.jar  camel-core-model-3.7.0.jar       common-utils-5.5.0.jar              jmespath-java-1.11.714.jar                NOTICE.txt
apicurio-registry-rest-client-1.3.1.Final.jar               camel-core-processor-3.7.0.jar   connect-json-2.6.0.jar              joda-time-2.8.1.jar                       okhttp-3.14.9.jar
apicurio-registry-utils-converter-1.3.1.Final.jar           camel-core-reifier-3.7.0.jar     converter-jackson-2.9.0.jar         kafka-avro-serializer-5.5.0.jar           okio-1.17.2.jar
apicurio-registry-utils-serde-1.3.1.Final.jar               camel-direct-3.7.0.jar           httpclient-4.5.13.jar               kafka-clients-2.6.0.jar                   protobuf-java-3.13.0.jar
avro-1.10.0.jar                                             camel-jackson-3.7.0.jar          httpcore-4.4.14.jar                 kafka-connect-avro-converter-5.5.0.jar    README.adoc
aws-java-sdk-core-1.11.714.jar                              camel-kafka-3.7.0.jar            ion-java-1.0.2.jar                  kafka-connect-avro-data-5.5.0.jar         retrofit-2.9.0.jar
aws-java-sdk-kms-1.11.714.jar                               camel-kafka-connector-0.7.0.jar  jackson-annotations-2.11.3.jar      kafka-schema-registry-client-5.5.0.jar    slf4j-api-1.7.30.jar
aws-java-sdk-s3-1.11.714.jar                                camel-main-3.7.0.jar             jackson-core-2.11.3.jar             kafka-schema-serializer-5.5.0.jar         snappy-java-1.1.7.3.jar
camel-api-3.7.0.jar                                         camel-management-api-3.7.0.jar   jackson-core-asl-1.9.13.jar         kotlin-reflect-1.3.20.jar                 zstd-jni-1.4.4-7.jar
camel-aws-s3-3.7.0.jar                                      camel-support-3.7.0.jar          jackson-databind-2.11.3.jar         kotlin-stdlib-1.3.20.jar
camel-aws-s3-kafka-connector-0.7.0.jar                      camel-util-3.7.0.jar             jackson-dataformat-avro-2.11.3.jar  kotlin-stdlib-common-1.3.20.jar
camel-base-3.7.0.jar                                        common-config-5.5.0.jar          jackson-dataformat-cbor-2.11.3.jar  LICENSE.txt
camel-base-engine-3.7.0.jar                                 commons-codec-1.15.jar           jackson-datatype-jdk8-2.10.2.jar    lz4-java-1.7.1.jar
```

Basically, it contains the unzipped camel-aws-s3-kafka-connector package plus the Confluent libraries for reading Avro via the Schema Registry. I was following this example to get the Confluent libraries.

Do you have any suggestions on how I can proceed?

PS: If I use org.apache.kafka.connect.storage.StringConverter for the values instead of io.confluent.connect.avro.AvroConverter, everything works and the data reaches S3 with no issues. The problem is that the resulting message is not very readable.
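For reference, the working-but-ugly variant described in the PS differs from the failing config only in the value converter, roughly like this:

```yaml
# Variant from the PS: works end-to-end, but the object written to S3 is the
# stringified form of the record rather than proper JSON.
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: topic-to-export
```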
