
feat(mongo-connector): support copy.existing in source and topics/topic.override in sink #156

Open
ericsyh opened this issue Dec 20, 2021 · 8 comments


@ericsyh
Member

ericsyh commented Dec 20, 2021

Motivation

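Compared to mongodb-kafka-connector, pulsar-kafka-connector is quite simple, so it would be better to add some useful features to pulsar-kafka-connector to enhance it:

- support copy.existing in the source connector
- support topics / topic.override in the sink connector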

Please refer to the open-source code of mongo-kafka.

@lzqdename

Okay, I will read the code of mongo-kafka first!

@lzqdename

I will read the Pulsar IO source code next week.

@lzqdename

lzqdename commented Dec 25, 2021

In the Pulsar IO connector for MongoDB, the offset is not saved. Should this feature be supported in this issue? @ericsyh

@ericsyh
Member Author

ericsyh commented Dec 26, 2021

> In the Pulsar IO connector for MongoDB, the offset is not saved. Should this feature be supported in this issue? @ericsyh

@lzqdename Sorry, I am not the maintainer of the MongoDB Pulsar IO connector. Can you give more details about the offset you mentioned is not being saved? Is it in the source or the sink connector? And in which process is it not saved?

@lzqdename

lzqdename commented Dec 27, 2021

Okay, I see. The offset is the position from which the consumer resumes consuming after it is restarted.
See com.mongodb.kafka.connect.source.MongoSourceTask.createCursor (please refer to the open-source code of mongo-kafka).
This function calls getResumeToken(sourceConfig), which reads the last consumed offset position.
In a production environment, we should save the offset of each consumed record to avoid consuming the same records again.
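The resume-and-save loop described above can be sketched as a small, self-contained Python simulation. This is not mongo-kafka's code: change_stream is a hypothetical stand-in for a MongoDB change stream (with the real PyMongo driver, a saved token would be passed back through collection.watch(resume_after=token)), and ResumeTokenStore is a hypothetical file-backed store for the last processed token.

```python
import json
import os
import tempfile
from typing import Iterator, List, Optional


def change_stream(log: List[dict], resume_after: Optional[dict] = None) -> Iterator[dict]:
    """Hypothetical stand-in for a change stream: yields events after `resume_after`."""
    start = 0
    if resume_after is not None:
        ids = [event["_id"] for event in log]
        start = ids.index(resume_after) + 1  # continue just past the saved token
    yield from log[start:]


class ResumeTokenStore:
    """Hypothetical file-backed store holding the last processed resume token."""

    def __init__(self, path: str) -> None:
        self.path = path

    def load(self) -> Optional[dict]:
        if not os.path.exists(self.path):
            return None  # first start: nothing saved yet
        with open(self.path) as f:
            return json.load(f)

    def save(self, token: dict) -> None:
        tmp = self.path + ".tmp"
        with open(tmp, "w") as f:
            json.dump(token, f)
        os.replace(tmp, self.path)  # atomic swap: a crash never leaves a partial file


def run_task(log: List[dict], store: ResumeTokenStore, out: List[dict],
             max_events: Optional[int] = None) -> None:
    """Forward events to `out`, saving each event's token after it is handled.

    `max_events` simulates the task being stopped part-way through.
    """
    for n, event in enumerate(change_stream(log, store.load())):
        if max_events is not None and n >= max_events:
            return
        out.append(event["fullDocument"])
        store.save(event["_id"])  # in MongoDB, a change event's _id is its resume token


# Usage: stop after two events, then "restart" and resume from the saved token.
log = [{"_id": {"_data": str(i)}, "fullDocument": {"x": i}} for i in range(4)]
out: List[dict] = []
with tempfile.TemporaryDirectory() as tmpdir:
    store = ResumeTokenStore(os.path.join(tmpdir, "resume_token.json"))
    run_task(log, store, out, max_events=2)
    run_task(log, store, out)
print(out)  # [{'x': 0}, {'x': 1}, {'x': 2}, {'x': 3}]
```

Saving the token after handling each event gives at-least-once behaviour: if the task crashes between handling an event and saving its token, that one event is replayed on restart, but nothing is skipped.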

Okay, due to lack of time, I will write a basic implementation of "copy.existing". @ericsyh
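A basic "copy.existing" could look roughly like the Python sketch below. It assumes the behaviour described in the mongo-kafka documentation (existing documents are emitted as insert-style change events, and changes made during the copy are applied once the copy completes); FakeCollection and copy_existing are hypothetical names for an in-memory simulation, not a real driver or connector API.

```python
from typing import Dict, Iterator, List


class FakeCollection:
    """Hypothetical in-memory collection that records every write as a change event."""

    def __init__(self) -> None:
        self.docs: Dict[int, dict] = {}
        self.changes: List[dict] = []  # append-only; list index acts as stream position

    def insert(self, _id: int, doc: dict) -> None:
        self.docs[_id] = doc
        self.changes.append({"operationType": "insert",
                             "documentKey": {"_id": _id},
                             "fullDocument": doc})


def copy_existing(coll: FakeCollection) -> Iterator[dict]:
    """Emit existing documents as insert events, then the changes made since."""
    start = len(coll.changes)            # 1. note the stream position before copying
    snapshot = list(coll.docs.items())   # 2. copy the documents that already exist
    for _id, doc in snapshot:
        yield {"operationType": "insert", "documentKey": {"_id": _id}, "fullDocument": doc}
    pos = start                          # 3. replay writes that happened during the copy
    while pos < len(coll.changes):
        yield coll.changes[pos]
        pos += 1


# Usage: two documents exist up front; a third is written while the copy runs.
coll = FakeCollection()
coll.insert(1, {"name": "a"})
coll.insert(2, {"name": "b"})
events = copy_existing(coll)
first = next(events)                     # the copy has started
coll.insert(3, {"name": "c"})            # a write that lands during the copy
emitted = [first] + list(events)
print([e["documentKey"]["_id"] for e in emitted])  # [1, 2, 3]
```

Recording the stream position before taking the snapshot means no write made during the copy is lost; the trade-off is that a write landing between steps 1 and 2 can be emitted twice (once in the copy, once in the replay).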

@ericsyh
Member Author

ericsyh commented Dec 27, 2021

@lzqdename A Pulsar consumer does not need to save the offset, because the offset information is stored by the Pulsar broker. For example, suppose a Pulsar consumer uses the subscription name sub-1, consumes for a while, and then exits for some reason. The consumer only needs to reconnect to Pulsar with the same subscription name sub-1, and the Pulsar broker will send it the next message after the last consumed position.
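The idea that the position lives on the broker, keyed by subscription name, can be illustrated with a toy Python model. ToyTopic and consume are hypothetical names; this is not the Pulsar client or broker API. For simplicity the toy starts a new subscription at the first message, whereas in Pulsar the initial position of a new subscription is configurable.

```python
from typing import Dict, List, Optional


class ToyTopic:
    """Hypothetical broker-side topic that tracks one cursor per subscription name."""

    def __init__(self, messages: List[str]) -> None:
        self.messages = messages
        self.cursors: Dict[str, int] = {}  # subscription name -> next message index

    def receive(self, subscription: str) -> Optional[str]:
        # A new subscription starts at the first message in this toy model.
        pos = self.cursors.setdefault(subscription, 0)
        return self.messages[pos] if pos < len(self.messages) else None

    def acknowledge(self, subscription: str) -> None:
        self.cursors[subscription] += 1  # the broker, not the consumer, advances the position


def consume(topic: ToyTopic, subscription: str, limit: int) -> List[str]:
    """A stateless consumer: it keeps no offset of its own between runs."""
    received: List[str] = []
    for _ in range(limit):
        msg = topic.receive(subscription)
        if msg is None:
            break
        received.append(msg)
        topic.acknowledge(subscription)
    return received


topic = ToyTopic(["m0", "m1", "m2", "m3"])
first = consume(topic, "sub-1", 2)     # consumer exits after two messages
resumed = consume(topic, "sub-1", 10)  # reconnects with the same subscription name
other = consume(topic, "sub-2", 10)    # a different subscription has its own cursor
print(first, resumed, other)
```

The consumer function holds no state between calls; reconnecting with sub-1 picks up at m2 purely because the topic remembered that subscription's cursor.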

@lzqdename

lzqdename commented Dec 27, 2021

I mean consuming from MongoDB, not from Pulsar.
See https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Line 397: getResumeToken(sourceConfig).
The resume token is a MongoDB token. @ericsyh

@ericsyh
Member Author

ericsyh commented Dec 28, 2021

@lzqdename OK, I see. I think there is no need for you to support saving the MongoDB offset in this task.

2 participants