Description
I am trying to use Kafka Connect to transfer files from bucketA to bucketB using the aws2-s3 source and sink connectors. The source appears to be working, but the sink does not seem to be writing into bucketB. After attaching a console consumer to the topic, I can see the same message being re-added to the topic over and over, as if the sink were writing into the topic instead of into S3.
After investigating further and increasing the log levels of both camel and software.amazon.awssdk, I saw that the sink is writing to the source bucket instead of the target bucket. So what's happening is this:
bucketA -> camelSource -> kafkatopic -> camelSink -> bucketA
but should be:
bucketA -> camelSource -> kafkatopic -> camelSink -> bucketB
I can also confirm it's writing to bucketA by constantly refreshing the console: I can see the file disappear (the source picking it up) and then reappear in the source bucket (the sink writing it back).
After some further Googling I came across this page, which basically says to remove the bucket name header from the incoming exchange before writing to another bucket: https://camel.apache.org/components/latest/aws2-s3-component.html#_moving_stuff_between_a_bucket_and_another_bucket
I guess this is not happening in the Kafka connector, and that is what causes this behavior?
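If that is the cause, one thing that might work around it is dropping that header on the sink side. A minimal sketch of the sink config, assuming the connector version in use supports a `camel.remove.headers.pattern` option and that the header carrying the source bucket is named `CamelAwsS3BucketName` (both are assumptions I have not verified against this setup):

```json
{
  "name": "testsink",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector",
    "topics": "test-topic",
    "camel.sink.path.bucketNameOrArn": "bucketB",
    "camel.remove.headers.pattern": "CamelAwsS3BucketName",
    "camel.component.aws2-s3.region": "eu-central-1",
    "camel.component.aws2-s3.useIAMCredentials": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
```

With the bucket header removed, the sink should fall back to the bucket configured in `camel.sink.path.bucketNameOrArn`, assuming the header is what overrides it.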
For reference, here are my source and sink configs in JSON format, as POSTed to the Kafka Connect REST API. A few parameters are still in there only because I was trying them out to see if they had any effect (what does pojoRequest actually mean from the connector's perspective?), but I don't think they did.
(Also, the org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter doesn't work unless you set camel.component.aws2-s3.includeBody and camel.component.aws2-s3.autocloseBody to false, but I guess that's for another issue.)
{
  "name": "testsource",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector",
    "tasks.max": 1,
    "topics": "test-topic",
    "camel.source.path.bucketNameOrArn": "bucketA",
    "camel.source.endpoint.delay": "5000",
    "camel.component.aws2-s3.autoCreateBucket": "false",
    "camel.component.aws2-s3.region": "eu-central-1",
    "camel.component.aws2-s3.useIAMCredentials": "true",
    "camel.component.aws2-s3.includeBody": "false",
    "camel.component.aws2-s3.autocloseBody": "false",
    "camel.component.aws2-s3.pojoRequest": "true",
    "camel.source.endpoint.prefix": "sourcefolder/upload",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.camel.kafkaconnector.aws2s3.converters.S3ObjectConverter"
  }
}
{
  "name": "testsink",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector",
    "topics": "test-topic",
    "camel.sink.path.bucketNameOrArn": "bucketB",
    "camel.component.aws2-s3.autoCreateBucket": "false",
    "camel.component.aws2-s3.region": "eu-central-1",
    "camel.component.aws2-s3.useIAMCredentials": "true",
    "camel.component.aws2-s3.pojoRequest": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}