Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1146] Session based implementation of IoTDB sink (fix issues in current JDBC based implementation) #1166

Merged
merged 5 commits into from
Jan 27, 2023

Conversation

SteveYurongSu
Copy link
Member

Purpose

Recently, I was trying to play around streampipes and found that iotdb sink could not work properly.

I checked the source code and found that the current JDBC based implementation of iotdb sink is outdated, especially for the current production versions of iotdb (0.13.x and 1.0.x).

I‘d like to contribute a brand new iotdb sink implementation, which has the following features:

  • Support production ready versions of IoTDB (version >= 0.13.0)
  • Support by iotdb-session API, which can provide higher throughput and lower latency
  • Support schema-less write: no need to create storage group (database) before insertion

See #1146 for more detail :D

Remarks

PR introduces (a) breaking change(s): no

PR introduces (a) deprecation(s): yes
Old versions (< v0.13) of IoTDB are no longer supported.

Tests

I setup a pipeline as the following:
image

Then I checked the data that IoTDB received:
image
image

@bossenti bossenti added this to the 1.0.0 milestone Jan 26, 2023
@bossenti bossenti added bug Something isn't working enhancement New feature or request java Pull requests that update Java code refactoring Indicates when a pull request or issue contains larger refactoring aspects. deprecation Assigned to pull requests or issues that introduce a deprecation. pipeline elements Relates to pipeline elements installer Affects the StreamPipes installer labels Jan 26, 2023
@bossenti
Copy link
Contributor

@SteveYurongSu Thank you so much for this awesome contribution 🙏🏼

@bossenti bossenti requested review from tenthe, bossenti and dominikriemer and removed request for tenthe and bossenti January 26, 2023 18:18
@tenthe
Copy link
Contributor

tenthe commented Jan 27, 2023

Hi @SteveYurongSu, thank you so much for this PR and updating the IoTDB implementation.
I tried it locally and everything worked fine. Thanks for updating the IoTDB version in the CLI.

I have a question regarding the "session".
The session is created in the onInvocation and closed in the onDetach and what happens if the connection dies during the execution of the pipeline. Would it be possible to add a reconnection in the onEvent for this case or is this not necessary?
Maybe this can be done when the "IoTDBConnectionException" occurs.

@SteveYurongSu
Copy link
Member Author

Hi @tenthe, thanks a lot for your reviewing.

IoTDB has a native mechanism to handle the case that the connection dies. I will update the PR with the mechanism ASAP :-)

Hi @SteveYurongSu, thank you so much for this PR and updating the IoTDB implementation.
I tried it locally and everything worked fine. Thanks for updating the IoTDB version in the CLI.

I have a question regarding the "session".
The session is created in the onInvocation and closed in the onDetach and what happens if the connection dies during the execution of the pipeline. Would it be possible to add a reconnection in the onEvent for this case or is this not necessary?
Maybe this can be done when the "IoTDBConnectionException" occurs.

@tenthe
Copy link
Contributor

tenthe commented Jan 27, 2023

Awsome, thanks a lot.
I am excited to see how this will be implemented by IoTDB

@SteveYurongSu
Copy link
Member Author

SteveYurongSu commented Jan 27, 2023

Hi @tenthe, here is the update 😊

In streampipes-extensions/streampipes-sinks-databases-jvm/src/main/java/org/apache/streampipes/sinks/databases/jvm/iotdb/IotDb.java

I implemented the IoTDB sink by SessionPool instead of Session.

About SessionPool:

We provide a connection pool (`SessionPool) for Native API. Using the interface, you need to define the pool size.

If you can not get a session connection in 60 seconds, there is a warning log but the program will hang.

If a session has finished an operation, it will be put back to the pool automatically. If a session connection is broken, the session will be removed automatically and the pool will try to create a new session and redo the operation.

@dominikriemer
Copy link
Member

Thank you @SteveYurongSu !

@tenthe
Copy link
Contributor

tenthe commented Jan 27, 2023

@SteveYurongSu thanks a lot for the super fast update.
I really like the new implementation

@tenthe tenthe merged commit 3abfd88 into apache:dev Jan 27, 2023
@bossenti bossenti modified the milestones: 1.0.0, 0.91.0 Jan 30, 2023
@bossenti bossenti removed this from the 0.91.0 milestone Feb 19, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working deprecation Assigned to pull requests or issues that introduce a deprecation. enhancement New feature or request installer Affects the StreamPipes installer java Pull requests that update Java code pipeline elements Relates to pipeline elements refactoring Indicates when a pull request or issue contains larger refactoring aspects.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Session based implementation of IoTDB sink (fix issues in current JDBC based implementation)
4 participants