From S3 to Kafka: when using an idempotent filter, do we still fetch the S3 file content? #314

Closed
@akhileshacc

Hi,

I am using Camel's AWS and Kafka components.

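// Poll the bucket, skip objects whose ETag has already been seen, send the rest to Kafka
from("aws-s3://checkoutk1?autocloseBody=true&deleteAfterRead=false&args=....")
        // Seen ETags are kept in a file-backed repository (cache size 250, max file size 512000 bytes)
        .idempotentConsumer(header("CamelAwsS3ETag"),
                FileIdempotentRepository.fileIdempotentRepository(new File("target/checkoutk1.data"), 250, 512000))
        .to("kafka:test1?brokers=localhost:9092");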

I have added one text file to S3. It works well, and the file content is added to Kafka only once.
But from the logs below, it seems that the S3 object is fetched on every poll and only filtered out before it is sent to Kafka.

Am I correct in understanding this behaviour?

If yes, is there a workaround so that the file is not fetched again?
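
To make the question concrete, here is a minimal plain-Java sketch of the two behaviours I am comparing. It does not use Camel or the AWS SDK: `FakeBucket`, `pollFetchThenFilter` and `pollFilterThenFetch` are hypothetical names made up for illustration, and the file content is a placeholder. The first method is what the logs suggest is happening; the second is what I am hoping for.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for a bucket; not Camel or AWS SDK code.
class FakeBucket {
    // key -> {etag, content}
    private final Map<String, String[]> objects = new LinkedHashMap<>();
    int fetchCount = 0; // how many times an object body was downloaded

    void put(String key, String etag, String content) {
        objects.put(key, new String[] {etag, content});
    }

    // Simulates a listing: returns key -> ETag without reading any content.
    Map<String, String> listEtags() {
        Map<String, String> etags = new LinkedHashMap<>();
        objects.forEach((key, value) -> etags.put(key, value[0]));
        return etags;
    }

    // Simulates downloading one object body.
    String fetch(String key) {
        fetchCount++;
        return objects.get(key)[1];
    }
}

class IdempotentPollSketch {
    // What the logs suggest: fetch every object, then drop already-seen ETags.
    static int pollFetchThenFilter(FakeBucket bucket, Set<String> seenEtags) {
        int forwarded = 0;
        for (Map.Entry<String, String> entry : bucket.listEtags().entrySet()) {
            bucket.fetch(entry.getKey());
            if (seenEtags.add(entry.getValue())) {
                forwarded++; // would be sent on to Kafka
            }
        }
        return forwarded;
    }

    // What I am asking for: check the ETag first, fetch only unseen objects.
    static int pollFilterThenFetch(FakeBucket bucket, Set<String> seenEtags) {
        int forwarded = 0;
        for (Map.Entry<String, String> entry : bucket.listEtags().entrySet()) {
            if (seenEtags.add(entry.getValue())) {
                bucket.fetch(entry.getKey());
                forwarded++; // would be sent on to Kafka
            }
        }
        return forwarded;
    }

    // Two objects with the keys and ETags from my logs; the content is a placeholder.
    static FakeBucket sampleBucket() {
        FakeBucket bucket = new FakeBucket();
        bucket.put("data/", "d41d8cd98f00b204e9800998ecf8427e", "");
        bucket.put("data/text2.txt", "f638e3b03d75d5d1d11984585c23bb8d", "placeholder text");
        return bucket;
    }

    public static void main(String[] args) {
        FakeBucket a = sampleBucket();
        FakeBucket b = sampleBucket();
        Set<String> seenA = new HashSet<>();
        Set<String> seenB = new HashSet<>();
        for (int poll = 0; poll < 3; poll++) {
            pollFetchThenFilter(a, seenA);
            pollFilterThenFetch(b, seenB);
        }
        System.out.println("fetch-then-filter downloads: " + a.fetchCount);
        System.out.println("filter-then-fetch downloads: " + b.fetchCount);
    }
}
```

With two objects and three polls, the first variant downloads a body six times while the second downloads only twice, although both forward each object exactly once.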

Logs

If you check the text file's CamelAwsS3ContentLength header, it is always CamelAwsS3ContentLength=88.

2020-07-03 14:41:13,793 [s3.Basic.main()] INFO  AppInfoParser                  - Kafka version: 2.5.0
2020-07-03 14:41:13,793 [s3.Basic.main()] INFO  AppInfoParser                  - Kafka commitId: 66563e712b0b9f84
2020-07-03 14:41:13,793 [s3.Basic.main()] INFO  AppInfoParser                  - Kafka startTimeMs: 1593767473790
2020-07-03 14:41:13,806 [s3.Basic.main()] INFO  InternalRouteStartupManager    - Route: route1 started and consuming from: aws-s3://checkoutk1
2020-07-03 14:41:13,806 [s3.Basic.main()] INFO  AbstractCamelContext           - Total 1 routes, of which 1 are started
2020-07-03 14:41:13,807 [s3.Basic.main()] INFO  AbstractCamelContext           - Apache Camel 3.5.0-SNAPSHOT (camel-1) started in 1.174 seconds
2020-07-03 14:41:14,037 [ad | producer-1] INFO  Metadata                       - [Producer clientId=producer-1] Cluster ID: 2NaFZgsTQg-4L3vQzwmlkQ
2020-07-03 14:41:15,004 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:15,005 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:15,012 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:15,015 [s3://checkoutk1] INFO  route1                         - This is new message
2020-07-03 14:41:15,050 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:15,050 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/text2.txt
2020-07-03 14:41:15,051 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:15,052 [s3://checkoutk1] INFO  route1                         - This is new message
2020-07-03 14:41:15,068 [Producer[test1]] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null, org.apache.kafka.clients.producer.RecordMetadata=[test1-0@93]}
2020-07-03 14:41:15,069 [Producer[test1]] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null, org.apache.kafka.clients.producer.RecordMetadata=[test1-0@94]}
2020-07-03 14:41:20,208 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:20,209 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:20,209 [s3://checkoutk1] INFO  route1 - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:20,210 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:20,210 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/text2.txt
2020-07-03 14:41:20,210 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
Jul 03, 2020 2:41:20 PM com.amazonaws.services.s3.internal.S3AbortableInputStream close
WARNING: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
2020-07-03 14:41:25,519 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:25,519 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:25,520 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
2020-07-03 14:41:25,520 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:25,520 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/text2.txt
2020-07-03 14:41:25,521 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=88, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=text/plain, CamelAwsS3ETag=f638e3b03d75d5d1d11984585c23bb8d, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=88, Content-Type=text/plain, ETag=f638e3b03d75d5d1d11984585c23bb8d, Last-Modified=Fri Jul 03 14:28:25 IST 2020}, CamelAwsS3Key=data/text2.txt, CamelAwsS3LastModified=Fri Jul 03 14:28:25 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}
Jul 03, 2020 2:41:25 PM com.amazonaws.services.s3.internal.S3AbortableInputStream close
WARNING: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
2020-07-03 14:41:30,770 [s3://checkoutk1] INFO  consuming                      - Consumer Fired!
2020-07-03 14:41:30,771 [s3://checkoutk1] INFO  route1                         - Replay Message Sent to file:s3out data/
2020-07-03 14:41:30,771 [s3://checkoutk1] INFO  route1                         - {CamelAwsS3BucketName=checkoutk1, CamelAwsS3ContentControl=null, CamelAwsS3ContentDisposition=null, CamelAwsS3ContentEncoding=null, CamelAwsS3ContentLength=0, CamelAwsS3ContentMD5=null, CamelAwsS3ContentType=application/x-directory, CamelAwsS3ETag=d41d8cd98f00b204e9800998ecf8427e, CamelAwsS3ExpirationTime=null, CamelAwsS3Headers={Accept-Ranges=bytes, Content-Length=0, Content-Type=application/x-directory, ETag=d41d8cd98f00b204e9800998ecf8427e, Last-Modified=Thu Jul 02 16:30:15 IST 2020}, CamelAwsS3Key=data/, CamelAwsS3LastModified=Thu Jul 02 16:30:15 IST 2020, CamelAwsS3ReplicationStatus=null, CamelAwsS3ServerSideEncryption=null, CamelAwsS3StorageClass=null, CamelAwsS3UserMetadata={}, CamelAwsS3VersionId=null}   

...
...
It keeps going like this.
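
As far as I understand the SDK warning, "drain the input stream after use" means reading the remaining bytes before closing the stream. A minimal sketch of that in plain Java follows; `DrainSketch` and `drain` are hypothetical names of my own, and a `ByteArrayInputStream` stands in for the real S3 stream, so this only illustrates the idea and is not the route's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

class DrainSketch {
    // Reads and discards whatever is left in the stream; returns the number of bytes discarded.
    static long drain(InputStream in) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for an 88-byte object body (same size as the file in my logs).
        try (InputStream in = new ByteArrayInputStream(new byte[88])) {
            in.read(new byte[10]); // pretend only the first 10 bytes were used
            System.out.println("drained bytes: " + drain(in));
        }
    }
}
```

Draining still transfers the whole body, so it would only silence the warning and would not avoid the download itself.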

I didn't send this to the mailing list, since I got no reply last time.
