Skip to content

Commit 15c4117

Browse files
committed
update metrics
1 parent f9263a6 commit 15c4117

File tree

10 files changed

+70
-96
lines changed

10 files changed

+70
-96
lines changed

paimon-common/src/main/java/org/apache/paimon/fs/MetricsFileIO.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020

2121
import org.apache.paimon.annotation.VisibleForTesting;
2222
import org.apache.paimon.catalog.CatalogContext;
23-
import org.apache.paimon.fs.metrics.InputMetrics;
24-
import org.apache.paimon.fs.metrics.OutputMetrics;
23+
import org.apache.paimon.fs.metrics.IOMetrics;
2524

2625
import java.io.IOException;
2726

@@ -33,16 +32,14 @@
3332
public class MetricsFileIO implements FileIO {
3433

3534
protected final FileIO fileIO;
36-
protected InputMetrics inputMetrics = null;
37-
protected OutputMetrics outputMetrics = null;
35+
protected IOMetrics ioMetrics = null;
3836

3937
public MetricsFileIO(FileIO fileIO) {
4038
this.fileIO = fileIO;
4139
}
4240

43-
public MetricsFileIO withMetrics(InputMetrics inputMetrics, OutputMetrics outputMetrics) {
44-
this.inputMetrics = inputMetrics;
45-
this.outputMetrics = outputMetrics;
41+
public MetricsFileIO withMetrics(IOMetrics ioMetrics) {
42+
this.ioMetrics = ioMetrics;
4643
return this;
4744
}
4845

@@ -56,7 +53,7 @@ public FileIO getFileIOInternal() {
5653

5754
@VisibleForTesting
5855
public Boolean isMetricsEnabled() {
59-
return inputMetrics != null && outputMetrics != null;
56+
return ioMetrics != null;
6057
}
6158

6259
@Override
@@ -72,13 +69,13 @@ public void configure(CatalogContext context) {
7269
@Override
7370
public SeekableInputStream newInputStream(Path path) throws IOException {
7471
SeekableInputStream inputStream = fileIO.newInputStream(path);
75-
return new SeekableInputStreamIOWrapper(inputStream, this.inputMetrics);
72+
return new SeekableInputStreamIOWrapper(inputStream, this.ioMetrics);
7673
}
7774

7875
@Override
7976
public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
8077
PositionOutputStream outputStream = fileIO.newOutputStream(path, overwrite);
81-
return new PositionOutputStreamIOWrapper(outputStream, outputMetrics);
78+
return new PositionOutputStreamIOWrapper(outputStream, this.ioMetrics);
8279
}
8380

8481
@Override

paimon-common/src/main/java/org/apache/paimon/fs/PositionOutputStreamIOWrapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,19 @@
1818

1919
package org.apache.paimon.fs;
2020

21-
import org.apache.paimon.fs.metrics.OutputMetrics;
21+
import org.apache.paimon.fs.metrics.IOMetrics;
2222

2323
import java.io.IOException;
2424

2525
/** Wrap a {@link PositionOutputStream}. */
2626
public class PositionOutputStreamIOWrapper extends PositionOutputStream {
2727

2828
protected final PositionOutputStream out;
29-
private OutputMetrics metrics;
29+
private IOMetrics metrics;
3030

31-
public PositionOutputStreamIOWrapper(PositionOutputStream out, OutputMetrics outputMetrics) {
31+
public PositionOutputStreamIOWrapper(PositionOutputStream out, IOMetrics metrics) {
3232
this.out = out;
33-
this.metrics = outputMetrics;
33+
this.metrics = metrics;
3434
}
3535

3636
@Override

paimon-common/src/main/java/org/apache/paimon/fs/SeekableInputStreamIOWrapper.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@
1818

1919
package org.apache.paimon.fs;
2020

21-
import org.apache.paimon.fs.metrics.InputMetrics;
21+
import org.apache.paimon.fs.metrics.IOMetrics;
2222

2323
import java.io.IOException;
2424

2525
/** Wrap a {@link SeekableInputStream}. */
26-
public class SeekableInputStreamIOWrapper extends SeekableInputStream {
26+
public class SeekableInputStreamIOWrapper extends SeekableInputStream implements VectoredReadable {
2727

2828
protected final SeekableInputStream in;
29-
private InputMetrics metrics;
29+
private IOMetrics metrics;
3030

31-
public SeekableInputStreamIOWrapper(SeekableInputStream in, InputMetrics metrics) {
31+
public SeekableInputStreamIOWrapper(SeekableInputStream in, IOMetrics metrics) {
3232
this.in = in;
3333
this.metrics = metrics;
3434
}
@@ -73,4 +73,24 @@ public int read(byte[] b, int off, int len) throws IOException {
7373
public void close() throws IOException {
7474
in.close();
7575
}
76+
77+
@Override
78+
public int pread(long position, byte[] buffer, int offset, int length) throws IOException {
79+
int bytesRead = 0;
80+
try {
81+
if (in instanceof VectoredReadable) {
82+
bytesRead = ((VectoredReadable) in).pread(position, buffer, offset, length);
83+
} else {
84+
long originalPos = in.getPos();
85+
in.seek(position);
86+
bytesRead = in.read(buffer, offset, length);
87+
in.seek(originalPos);
88+
}
89+
} finally {
90+
if (metrics != null && bytesRead != -1) {
91+
metrics.recordReadEvent(bytesRead);
92+
}
93+
}
94+
return bytesRead;
95+
}
7696
}

paimon-common/src/main/java/org/apache/paimon/fs/metrics/InputMetrics.java renamed to paimon-common/src/main/java/org/apache/paimon/fs/metrics/IOMetrics.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,31 @@
2424
import java.util.concurrent.atomic.AtomicLong;
2525

2626
/** Collects and monitors input stream metrics. */
27-
public class InputMetrics {
27+
public class IOMetrics {
2828

29-
public static final String GROUP_NAME = "source";
29+
public static final String GROUP_NAME = "io";
3030
private final MetricGroup metricGroup;
3131

3232
public static final String READ_BYTES = "read.bytes";
3333
public static final String READ_OPERATIONS = "read.operations";
34+
public static final String WRITE_BYTES = "write.bytes";
35+
public static final String WRITE_OPERATIONS = "write.operations";
3436

3537
private final AtomicLong readBytes = new AtomicLong(0);
3638
private final AtomicLong readOperations = new AtomicLong(0);
39+
private final AtomicLong writeBytes = new AtomicLong(0);
40+
private final AtomicLong writeOperations = new AtomicLong(0);
3741

38-
public InputMetrics(MetricRegistry registry, String tableName) {
42+
public IOMetrics(MetricRegistry registry, String tableName) {
3943
metricGroup = registry.createTableMetricGroup(GROUP_NAME, tableName);
4044
registerMetrics();
4145
}
4246

4347
private void registerMetrics() {
4448
metricGroup.gauge(READ_BYTES, this::getReadBytes);
4549
metricGroup.gauge(READ_OPERATIONS, this::getReadOperations);
50+
metricGroup.gauge(WRITE_BYTES, this::getWriteBytes);
51+
metricGroup.gauge(WRITE_OPERATIONS, this::getWriteOperations);
4652
}
4753

4854
public void recordReadEvent(long bytes) {
@@ -57,4 +63,17 @@ public long getReadBytes() {
5763
public long getReadOperations() {
5864
return readOperations.get();
5965
}
66+
67+
public void recordWriteEvent(long bytes) {
68+
writeBytes.addAndGet(bytes);
69+
writeOperations.incrementAndGet();
70+
}
71+
72+
public long getWriteBytes() {
73+
return writeBytes.get();
74+
}
75+
76+
public long getWriteOperations() {
77+
return writeOperations.get();
78+
}
6079
}

paimon-common/src/main/java/org/apache/paimon/fs/metrics/OutputMetrics.java

Lines changed: 0 additions & 57 deletions
This file was deleted.

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@
2525
import org.apache.paimon.fs.FileIO;
2626
import org.apache.paimon.fs.MetricsFileIO;
2727
import org.apache.paimon.fs.Path;
28-
import org.apache.paimon.fs.metrics.InputMetrics;
29-
import org.apache.paimon.fs.metrics.OutputMetrics;
28+
import org.apache.paimon.fs.metrics.IOMetrics;
3029
import org.apache.paimon.manifest.IndexManifestEntry;
3130
import org.apache.paimon.manifest.ManifestEntry;
3231
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -94,8 +93,6 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
9493
private static final String WATERMARK_PREFIX = "watermark-";
9594

9695
protected final MetricsFileIO fileIO;
97-
private InputMetrics inputMetrics;
98-
private OutputMetrics outputMetrics;
9996
protected final Path path;
10097
protected final TableSchema tableSchema;
10198
protected final CatalogEnvironment catalogEnvironment;
@@ -127,9 +124,8 @@ public AbstractFileStoreTable withMetricRegistry(MetricRegistry registry) {
127124
catalogEnvironment.identifier() != null
128125
? catalogEnvironment.identifier().getTableName()
129126
: "unknown";
130-
this.inputMetrics = new InputMetrics(registry, tableName);
131-
this.outputMetrics = new OutputMetrics(registry, tableName);
132-
this.fileIO.withMetrics(inputMetrics, outputMetrics);
127+
IOMetrics ioMetrics = new IOMetrics(registry, tableName);
128+
this.fileIO.withMetrics(ioMetrics);
133129
}
134130
return this;
135131
}

paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class ReadBuilderImpl implements ReadBuilder {
5555

5656
private boolean dropStats = false;
5757

58+
private MetricRegistry metricRegistry = null;
59+
5860
public ReadBuilderImpl(InnerTable table) {
5961
this.table = table;
6062
}
@@ -66,6 +68,7 @@ public String tableName() {
6668

6769
@Override
6870
public void withMetricsRegistry(MetricRegistry metricRegistry) {
71+
this.metricRegistry = metricRegistry;
6972
this.table.withMetricRegistry(metricRegistry);
7073
}
7174

@@ -156,7 +159,8 @@ public TableScan newScan() {
156159

157160
@Override
158161
public StreamTableScan newStreamScan() {
159-
return (StreamTableScan) configureScan(table.newStreamScan());
162+
return (StreamTableScan)
163+
configureScan(table.newStreamScan()).withMetricRegistry(metricRegistry);
160164
}
161165

162166
private InnerTableScan configureScan(InnerTableScan scan) {
@@ -188,7 +192,8 @@ private InnerTableScan configureScan(InnerTableScan scan) {
188192

189193
@Override
190194
public TableRead newRead() {
191-
InnerTableRead read = table.newRead().withFilter(filter);
195+
InnerTableRead read =
196+
table.newRead().withFilter(filter).withMetricRegistry(this.metricRegistry);
192197
if (readType != null) {
193198
read.withReadType(readType);
194199
}

paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
import org.apache.paimon.data.BinaryString;
2828
import org.apache.paimon.data.GenericRow;
2929
import org.apache.paimon.data.InternalRow;
30+
import org.apache.paimon.fs.FileIO;
31+
import org.apache.paimon.fs.MetricsFileIO;
3032
import org.apache.paimon.function.Function;
3133
import org.apache.paimon.function.FunctionChange;
3234
import org.apache.paimon.function.FunctionDefinition;
33-
import org.apache.paimon.fs.FileIO;
34-
import org.apache.paimon.fs.MetricsFileIO;
3535
import org.apache.paimon.options.Options;
3636
import org.apache.paimon.partition.Partition;
3737
import org.apache.paimon.partition.PartitionStatistics;

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.paimon.options.Options;
2626
import org.apache.paimon.table.BucketMode;
2727
import org.apache.paimon.table.source.ReadBuilder;
28-
import org.apache.paimon.table.source.StreamDataTableScan;
2928
import org.apache.paimon.table.source.StreamTableScan;
3029

3130
import org.apache.flink.api.connector.source.Boundedness;
@@ -83,10 +82,6 @@ public SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> restoreEnu
8382
readBuilder.withMetricsRegistry(new FlinkMetricRegistry(context.metricGroup()));
8483
}
8584
StreamTableScan scan = readBuilder.newStreamScan();
86-
if (metricGroup(context) != null) {
87-
((StreamDataTableScan) scan)
88-
.withMetricRegistry(new FlinkMetricRegistry(context.metricGroup()));
89-
}
9085
scan.restore(nextSnapshotId);
9186
return buildEnumerator(context, splits, nextSnapshotId, scan);
9287
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ public SourceReader<RowData, FileStoreSourceSplit> createReader(SourceReaderCont
6666
FileStoreSourceReaderMetrics sourceReaderMetrics =
6767
new FileStoreSourceReaderMetrics(metricGroup);
6868
this.readBuilder.withMetricsRegistry(new FlinkMetricRegistry(metricGroup));
69-
TableRead tableRead =
70-
readBuilder.newRead().withMetricRegistry(new FlinkMetricRegistry(metricGroup));
69+
TableRead tableRead = readBuilder.newRead();
7170
return new FileStoreSourceReader(
7271
context,
7372
tableRead,

0 commit comments

Comments
 (0)