Skip to content

[metrics] Add io-metrics for paimon in Flink #5550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

zhuanshenbsj1
Copy link
Contributor

@zhuanshenbsj1 zhuanshenbsj1 commented Apr 28, 2025

Purpose

  1. Monitor the input/output (IO) of Flink during read and write operations.

Tests

API and Format

Documentation

@wwj6591812
Copy link
Contributor

1、You add lots of metric, I suggest you sent the discuss email to dev@paimon.apache.org first.
2、modify the doc.

@zhuanshenbsj1
Copy link
Contributor Author

1、You add lots of metric, I suggest you sent the discuss email to dev@paimon.apache.org first. 2、modify the doc.

Roger that.

@@ -198,6 +198,12 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this dep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this dep

I moved some metrics classes from paimon-core to paimon-common project, and the class DescriptiveStatisticsHistogram depends on this.

*
* <p>It allows users to monitor and track file I/O operations (e.g., read, write, delete, rename).
*/
public class FileIOWrapper implements FileIO {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetricsFileIO?

public static final String GROUP_NAME = "source";
private final MetricGroup metricGroup;

private final AtomicLong readBytes = new AtomicLong(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only keep:

  String READ_BYTES = "read.bytes";
  String READ_OPERATIONS = "read.operations";
  String WRITE_BYTES = "write.bytes";
  String WRITE_OPERATIONS = "write.operations";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only keep:

  String READ_BYTES = "read.bytes";
  String READ_OPERATIONS = "read.operations";
  String WRITE_BYTES = "write.bytes";
  String WRITE_OPERATIONS = "write.operations";

Other metrics have been removed.

@@ -104,7 +110,7 @@ protected AbstractFileStoreTable(
Path path,
TableSchema tableSchema,
CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.fileIO = new FileIOWrapper(fileIO);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may need to introduce an option to enable MetricsFileIO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

We may need to introduce an option to enable MetricsFileIO.

@JingsongLi
Copy link
Contributor

I'v done refactor for making with MetricRegistry public in #5578

@zhuanshenbsj1
Copy link
Contributor Author

I'v done refactor for making with MetricRegistry public in #5578

rebase on this && add uts & all checks passed, cc~ @JingsongLi

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please only use TableWrite.withMetricRegistry and TableRead.withMetricRegistry, do not introduce any API for metrics.

And please make sure only io metric enabled, the file io can be MetricsFileIO.

@Override
public SeekableInputStream newInputStream(Path path) throws IOException {
SeekableInputStream inputStream = fileIO.newInputStream(path);
return new SeekableInputStreamIOWrapper(inputStream, this.inputMetrics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider SeekableInputStream can be VectoredReadable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider SeekableInputStream can be VectoredReadable.

done.

import java.util.concurrent.atomic.AtomicLong;

/** Collects and monitors outp stream metrics. */
public class OutputMetrics {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge these tow classes into one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge these tow classes into one.

done.


/** Collects and monitors outp stream metrics. */
public class OutputMetrics {
public static final String GROUP_NAME = "sink";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group name should be io.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group name should be io.

done.

@zhuanshenbsj1
Copy link
Contributor Author

Please only use TableWrite.withMetricRegistry and TableRead.withMetricRegistry, do not introduce any API for metrics.

And please make sure only io metric enabled, the file io can be MetricsFileIO.

If we only use TableRead.withMetricRegistry, it would require too may modifications. However, in reality, we're always using and modifying the same AbstractFileStoreTable fileio reference. I've attempted to register metrics directly to the AbstractFileStoreTable within the ReadBuilder. This way, all subsequent operations like new scan, new streamscan, or new read can directly reuse the metrics collected by the AbstractFileStoreTable.

@zhuanshenbsj1 zhuanshenbsj1 force-pushed the metrics branch 4 times, most recently from f2e8fe8 to 99e93e4 Compare May 15, 2025 09:32
@zhuanshenbsj1 zhuanshenbsj1 requested a review from JingsongLi May 16, 2025 01:41
@zhuanshenbsj1 zhuanshenbsj1 force-pushed the metrics branch 4 times, most recently from 20eabb5 to 2b3fd4c Compare May 21, 2025 02:56
@zhuanshenbsj1
Copy link
Contributor Author

@JingsongLi Could you please take a look at this PR again for any remaining issues, thank ~

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants