[metrics] Add io-metrics for paimon in Flink #5550
1. You are adding a lot of metrics; I suggest you send a discussion email to dev@paimon.apache.org first.
Roger that.
@@ -198,6 +198,12 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
Please remove this dep
I moved some metrics classes from paimon-core to the paimon-common project, and the class DescriptiveStatisticsHistogram depends on this dependency.
*
* <p>It allows users to monitor and track file I/O operations (e.g., read, write, delete, rename).
*/
public class FileIOWrapper implements FileIO {
MetricsFileIO?
public static final String GROUP_NAME = "source";
private final MetricGroup metricGroup;

private final AtomicLong readBytes = new AtomicLong(0);
only keep:
String READ_BYTES = "read.bytes";
String READ_OPERATIONS = "read.operations";
String WRITE_BYTES = "write.bytes";
String WRITE_OPERATIONS = "write.operations";
Other metrics have been removed.
@@ -104,7 +110,7 @@ protected AbstractFileStoreTable(
Path path,
TableSchema tableSchema,
CatalogEnvironment catalogEnvironment) {
- this.fileIO = fileIO;
+ this.fileIO = new FileIOWrapper(fileIO);
We may need to introduce an option to enable MetricsFileIO.
Done
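To illustrate the idea of gating the wrapper behind an option, here is a minimal sketch. It does not use Paimon's real classes: `SimpleFileIO`, `InMemoryFileIO`, `CountingFileIO`, `FileIOFactory`, and the `ioMetricsEnabled` flag are all hypothetical stand-ins, and the actual option name and wiring in the PR may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a file IO abstraction; not Paimon's actual FileIO interface. */
interface SimpleFileIO {
    InputStream newInputStream(String path) throws IOException;
}

/** In-memory implementation, present only to make the sketch self-contained. */
class InMemoryFileIO implements SimpleFileIO {
    private final byte[] content;

    InMemoryFileIO(byte[] content) {
        this.content = content;
    }

    @Override
    public InputStream newInputStream(String path) {
        return new ByteArrayInputStream(content);
    }
}

/** Decorator that counts read operations before delegating to the wrapped instance. */
class CountingFileIO implements SimpleFileIO {
    private final SimpleFileIO delegate;
    final AtomicLong readOperations = new AtomicLong();

    CountingFileIO(SimpleFileIO delegate) {
        this.delegate = delegate;
    }

    @Override
    public InputStream newInputStream(String path) throws IOException {
        readOperations.incrementAndGet();
        return delegate.newInputStream(path);
    }
}

final class FileIOFactory {
    private FileIOFactory() {}

    /** Wraps only when the (hypothetical) flag is on; otherwise returns the instance untouched. */
    static SimpleFileIO maybeWrap(SimpleFileIO fileIO, boolean ioMetricsEnabled) {
        return ioMetricsEnabled ? new CountingFileIO(fileIO) : fileIO;
    }
}
```

With the flag off, callers get back the exact instance they passed in, so the default path carries no extra indirection.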
I've done a refactor to make withMetricRegistry public in #5578.
Rebased on this, added unit tests, and all checks passed. cc @JingsongLi
Please only use TableWrite.withMetricRegistry and TableRead.withMetricRegistry; do not introduce any new API for metrics.
Also, please make sure the file IO can be a MetricsFileIO only when the IO metrics are enabled.
@Override
public SeekableInputStream newInputStream(Path path) throws IOException {
SeekableInputStream inputStream = fileIO.newInputStream(path);
return new SeekableInputStreamIOWrapper(inputStream, this.inputMetrics);
Please consider that a SeekableInputStream can also be VectoredReadable.
done.
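The concern here is that a plain wrapper hides any optional interface the wrapped stream implements, so `instanceof` checks downstream silently fail. A minimal sketch of one way to keep the capability visible follows. It uses `java.io.InputStream` and a hypothetical `PositionedReadable` interface as stand-ins for `SeekableInputStream` and `VectoredReadable`; all class names below are assumptions for illustration, not the code from this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical optional capability, standing in for an interface like VectoredReadable. */
interface PositionedReadable {
    int pread(long position, byte[] buffer, int offset, int length) throws IOException;
}

/** Plain counting wrapper: counts every byte returned by read(). */
class CountingInputStream extends InputStream {
    protected final InputStream in;
    protected final AtomicLong readBytes;

    CountingInputStream(InputStream in, AtomicLong readBytes) {
        this.in = in;
        this.readBytes = readBytes;
    }

    @Override
    public int read() throws IOException {
        int b = in.read();
        if (b >= 0) {
            readBytes.incrementAndGet();
        }
        return b;
    }

    @Override
    public void close() throws IOException {
        in.close();
    }
}

/** Variant that re-exposes the capability so callers doing instanceof checks still see it. */
class CountingPositionedInputStream extends CountingInputStream implements PositionedReadable {
    CountingPositionedInputStream(InputStream in, AtomicLong readBytes) {
        super(in, readBytes);
    }

    @Override
    public int pread(long position, byte[] buffer, int offset, int length) throws IOException {
        int n = ((PositionedReadable) in).pread(position, buffer, offset, length);
        if (n > 0) {
            readBytes.addAndGet(n);
        }
        return n;
    }
}

final class StreamWrappers {
    private StreamWrappers() {}

    /** Picks the wrapper type that matches the delegate's capabilities. */
    static InputStream wrap(InputStream in, AtomicLong readBytes) {
        return in instanceof PositionedReadable
                ? new CountingPositionedInputStream(in, readBytes)
                : new CountingInputStream(in, readBytes);
    }
}

/** In-memory stream with the capability, present only to exercise the sketch. */
class InMemoryPositionedStream extends ByteArrayInputStream implements PositionedReadable {
    InMemoryPositionedStream(byte[] data) {
        super(data);
    }

    @Override
    public int pread(long position, byte[] buffer, int offset, int length) {
        if (position >= count) {
            return -1;
        }
        int n = Math.min(length, count - (int) position);
        System.arraycopy(buf, (int) position, buffer, offset, n);
        return n;
    }
}
```

Choosing the wrapper type in a factory keeps the decision in one place; the alternative of always implementing the capability would advertise positioned reads on streams that cannot serve them.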
import java.util.concurrent.atomic.AtomicLong;

/** Collects and monitors output stream metrics. */
public class OutputMetrics {
Please merge these two classes into one.
done.
/** Collects and monitors output stream metrics. */
public class OutputMetrics {
public static final String GROUP_NAME = "sink";
The group name should be io.
done.
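Taken together, the review asks for one metrics class, registered under the group name io, exposing only the four metrics listed earlier. A rough sketch of what that shape could look like is below. `SimpleMetricGroup` is a hypothetical minimal registry written for this example, not Paimon's `MetricGroup`, and the `IOMetrics` class name and its `recordRead`/`recordWrite` methods are assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical minimal registry mapping "group.metric" names to gauges. */
class SimpleMetricGroup {
    private final String name;
    private final Map<String, LongSupplier> gauges = new LinkedHashMap<>();

    SimpleMetricGroup(String name) {
        this.name = name;
    }

    void gauge(String metricName, LongSupplier supplier) {
        gauges.put(name + "." + metricName, supplier);
    }

    long value(String fullName) {
        return gauges.get(fullName).getAsLong();
    }
}

/** Single class covering both directions, in place of separate input and output classes. */
class IOMetrics {
    static final String GROUP_NAME = "io";
    static final String READ_BYTES = "read.bytes";
    static final String READ_OPERATIONS = "read.operations";
    static final String WRITE_BYTES = "write.bytes";
    static final String WRITE_OPERATIONS = "write.operations";

    private final AtomicLong readBytes = new AtomicLong();
    private final AtomicLong readOperations = new AtomicLong();
    private final AtomicLong writeBytes = new AtomicLong();
    private final AtomicLong writeOperations = new AtomicLong();

    IOMetrics(SimpleMetricGroup group) {
        // Register the four gauges once; they read the live counter values on demand.
        group.gauge(READ_BYTES, readBytes::get);
        group.gauge(READ_OPERATIONS, readOperations::get);
        group.gauge(WRITE_BYTES, writeBytes::get);
        group.gauge(WRITE_OPERATIONS, writeOperations::get);
    }

    /** Records one read operation that transferred the given number of bytes. */
    void recordRead(long bytes) {
        readOperations.incrementAndGet();
        readBytes.addAndGet(bytes);
    }

    /** Records one write operation that transferred the given number of bytes. */
    void recordWrite(long bytes) {
        writeOperations.incrementAndGet();
        writeBytes.addAndGet(bytes);
    }
}
```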
If we only use TableRead.withMetricRegistry, it would require too many modifications. In practice, however, we always use and modify the same FileIO reference held by AbstractFileStoreTable. I've attempted to register the metrics directly on the AbstractFileStoreTable from within the ReadBuilder. This way, all subsequent operations, such as creating a new scan, a new stream scan, or a new read, can directly reuse the metrics collected by the AbstractFileStoreTable.
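The approach described in this comment can be pictured with a small sketch: the table owns the counters, registration happens once, and every read created from the table reports into the same place. `SketchTable` and `SketchRead` are hypothetical stand-ins and do not reflect the real `AbstractFileStoreTable` or `ReadBuilder` code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a table that owns a single metered file IO reference. */
class SketchTable {
    private final AtomicLong readOperations = new AtomicLong();
    private volatile boolean metricsRegistered;

    /** Called once, for example from a read builder; repeated calls change nothing. */
    void registerMetrics() {
        metricsRegistered = true;
    }

    /** Every read created here shares this table's counters. */
    SketchRead newRead() {
        return new SketchRead(this);
    }

    void onRead() {
        // Count only after registration, so unmetered use stays unaffected.
        if (metricsRegistered) {
            readOperations.incrementAndGet();
        }
    }

    long readOperations() {
        return readOperations.get();
    }
}

/** Hypothetical reader that reports each file read back to its owning table. */
class SketchRead {
    private final SketchTable table;

    SketchRead(SketchTable table) {
        this.table = table;
    }

    void readOneFile() {
        table.onRead();
    }
}
```

The trade-off in this shape is that the metrics become state on the shared table object rather than something passed explicitly to each read.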
f2e8fe8 to 99e93e4
20eabb5 to 2b3fd4c
@JingsongLi Could you please take another look at this PR for any remaining issues? Thanks!