Description
Hello,
I have been playing around with Strimzi and Camel's Kafka connectors, in particular the AWS S3 sink connector. My setup consists of a Strimzi Kafka cluster and the Confluent Schema Registry, all running in Kubernetes.
The KafkaConnect resource looks like this:
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  namespace: messaging
  name: aws-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 2.5.0
  image: my-image
  logging:
    type: inline
    loggers:
      connect.root.logger.level: "INFO"
  replicas: 1
  bootstrapServers: my-cluster-kafka-brokers.messaging.svc:9092
  externalConfiguration:
    volumes:
      - name: kafka-aws-credentials
        secret:
          secretName: kafka-aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
    key.converter.schemas.enable: false
    value.converter.schemas.enable: true
  template:
    pod:
      imagePullSecrets:
        - name: docker-registry-secret
```
The KafkaConnector resource looks like this:
```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-sink-connector
  labels:
    strimzi.io/cluster: aws-connect
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
    topics: topic-to-export
    camel.sink.path.bucketNameOrArn: s3-bucket-name
    camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
    camel.sink.maxPollDuration: 10000
    camel.component.aws-s3.configuration.autocloseBody: false
    camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.region: S3_REGION
```
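For reference, the `${file:...}` placeholders above are resolved by the FileConfigProvider against a properties file inside the mounted kafka-aws-credentials Secret. A minimal sketch of that Secret (key names taken from the placeholders, values redacted):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafka-aws-credentials
  namespace: messaging
stringData:
  # The file name must match the path segment in the ${file:...} placeholders;
  # Strimzi mounts it under /opt/kafka/external-configuration/kafka-aws-credentials/
  kafka-s3-credentials.properties: |
    aws_access_key_id=REDACTED
    aws_secret_access_key=REDACTED
```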
When I run the connector it starts reading the topic correctly, but just before sending the record to S3 it fails with:

```
org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: org.apache.kafka.connect.data.Struct to the required type: java.io.InputStream
```

From the logs I can see that the Struct message contains the correct data, but it cannot be converted to an InputStream. In the end I just want the message to land in S3 as JSON; no special transformations.
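One thing I have been wondering is whether I need to explicitly marshal the exchange body to JSON before the aws-s3 endpoint tries to turn it into an InputStream, e.g. via the generic camel.sink.marshal option (camel-jackson-3.7.0.jar is already in my plugin directory). This is only a guess on my side, assuming that option applies to this connector and that the json-jackson dataformat can actually serialize a Connect Struct:

```yaml
spec:
  config:
    # Guess: marshal the body with Camel's json-jackson dataformat
    # before the S3 endpoint asks for an InputStream.
    camel.sink.marshal: json-jackson
```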
Here is what my KafkaConnect image looks like:
```dockerfile
FROM strimzi/kafka:0.17.0-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel-aws-s3
COPY ./camel-aws-s3/* /opt/kafka/plugins/camel-aws-s3/
USER 1001
```
and the camel-aws-s3 directory I'm copying from looks like this:
```
annotations-13.0.jar camel-core-engine-3.7.0.jar commons-compress-1.20.jar jackson-mapper-asl-1.9.13.jar medeia-validator-core-1.1.1.jar
apicurio-registry-common-1.3.1.Final.jar camel-core-languages-3.7.0.jar commons-logging-1.2.jar jboss-jaxrs-api_2.1_spec-2.0.1.Final.jar medeia-validator-jackson-1.1.1.jar
apicurio-registry-distro-connect-converter-1.3.0.Final.jar camel-core-model-3.7.0.jar common-utils-5.5.0.jar jmespath-java-1.11.714.jar NOTICE.txt
apicurio-registry-rest-client-1.3.1.Final.jar camel-core-processor-3.7.0.jar connect-json-2.6.0.jar joda-time-2.8.1.jar okhttp-3.14.9.jar
apicurio-registry-utils-converter-1.3.1.Final.jar camel-core-reifier-3.7.0.jar converter-jackson-2.9.0.jar kafka-avro-serializer-5.5.0.jar okio-1.17.2.jar
apicurio-registry-utils-serde-1.3.1.Final.jar camel-direct-3.7.0.jar httpclient-4.5.13.jar kafka-clients-2.6.0.jar protobuf-java-3.13.0.jar
avro-1.10.0.jar camel-jackson-3.7.0.jar httpcore-4.4.14.jar kafka-connect-avro-converter-5.5.0.jar README.adoc
aws-java-sdk-core-1.11.714.jar camel-kafka-3.7.0.jar ion-java-1.0.2.jar kafka-connect-avro-data-5.5.0.jar retrofit-2.9.0.jar
aws-java-sdk-kms-1.11.714.jar camel-kafka-connector-0.7.0.jar jackson-annotations-2.11.3.jar kafka-schema-registry-client-5.5.0.jar slf4j-api-1.7.30.jar
aws-java-sdk-s3-1.11.714.jar camel-main-3.7.0.jar jackson-core-2.11.3.jar kafka-schema-serializer-5.5.0.jar snappy-java-1.1.7.3.jar
camel-api-3.7.0.jar camel-management-api-3.7.0.jar jackson-core-asl-1.9.13.jar kotlin-reflect-1.3.20.jar zstd-jni-1.4.4-7.jar
camel-aws-s3-3.7.0.jar camel-support-3.7.0.jar jackson-databind-2.11.3.jar kotlin-stdlib-1.3.20.jar
camel-aws-s3-kafka-connector-0.7.0.jar camel-util-3.7.0.jar jackson-dataformat-avro-2.11.3.jar kotlin-stdlib-common-1.3.20.jar
camel-base-3.7.0.jar common-config-5.5.0.jar jackson-dataformat-cbor-2.11.3.jar LICENSE.txt
camel-base-engine-3.7.0.jar commons-codec-1.15.jar jackson-datatype-jdk8-2.10.2.jar lz4-java-1.7.1.jar
```
Basically, it contains the unzipped camel-aws-s3-kafka-connector package plus Confluent's libraries for reading from the Avro schema registry. I was following this example to get Confluent's libraries.
Do you have any suggestions on how I can proceed?
PS: If I don't use io.confluent.connect.avro.AvroConverter for the values but use org.apache.kafka.connect.storage.StringConverter instead, everything works great and the data reaches S3 with no issues. The problem is that the message then does not look great, presumably because the raw Avro bytes get written out as a string.