-
Notifications
You must be signed in to change notification settings - Fork 1.2k
[Flink]add Flink Writer Metric #2193
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Flink]add Flink Writer Metric #2193
Conversation
0ea3f1b
to
64da853
Compare
64da853
to
c934cac
Compare
c934cac
to
776a03e
Compare
|
||
if (writerMetrics != null) { | ||
writerMetrics.incWriteRecordNum(); | ||
writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For most of the time, System.currentTimeMillis() - start
will be 0
because records are only buffered in memory. This metrics is only meaningful when we write files to disks or perform compaction.
Also, calling System.currentTimeMillis()
for each write will introduce performance issue. Please be very careful with code which is called per record.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
public void updatePrepareCommitCostMS(long cost) { | ||
this.prepareCommitCostMS.update(cost); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: prepareCommitCostMillis
or prepareCommitCostMs
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
compactManager | ||
.getCompactionResult(blocking) | ||
.ifPresent( | ||
result -> { | ||
compactBefore.addAll(result.before()); | ||
compactAfter.addAll(result.after()); | ||
}); | ||
writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If blocking
is false this value is certainly 0
because we won't wait for compaction to end. Why do you record this metric?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
if (writerMetrics != null) { | ||
writerMetrics.incWriteRecordNum(); | ||
writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
Optional<CompactResult> result = compactManager.getCompactionResult(blocking); | ||
result.ifPresent(this::updateCompactResult); | ||
writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
@@ -125,4 +126,6 @@ List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier) | |||
|
|||
/** With metrics to measure compaction. */ | |||
FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry); | |||
|
|||
WriterMetrics getWriterMetrics(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you add this method in the interface when it is only used internally by AbstractFileStoreWrite
and its subclasses?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public void setMemoryPreemptCount(Supplier<Long> bufferPreemptNumSupplier) { | ||
this.stats.bufferPreemptCount = bufferPreemptNumSupplier; | ||
} | ||
|
||
public void setUsedWriteBufferSize(Supplier<Long> usedWriteBufferSize) { | ||
this.stats.usedWriteBufferSize = usedWriteBufferSize; | ||
} | ||
|
||
public void setTotaldWriteBufferSize(Supplier<Long> totaldWriteBufferSize) { | ||
this.stats.totalWriteBufferSize = totaldWriteBufferSize; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Call these methods directly from MemoryPoolFactory
and we don't need to introduce the inner Stats
class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public MetricGroup tableMetricGroup(String groupName, String tableName, String commitUser) { | ||
Map<String, String> variables = new LinkedHashMap<>(); | ||
variables.put(KEY_TABLE, tableName); | ||
if (commitUser != null) { | ||
variables.put(COMMIT_USER, commitUser); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why adding commitUser
as a metric group variable? For Flink jobs using job id is enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to associate the Flink job with the commitUser through the tag, making it easier for users to track which Flink job generated the snapshot of the Paimon table. This might be a bit redundant.
8f24a21
to
a517e95
Compare
ba5a338
to
d413373
Compare
public void updateBufferFlushCostMS(long bufferFlushCost) { | ||
bufferFlushCostMillis.update(bufferFlushCost); | ||
} | ||
|
||
public void updatePrepareCommitCostMS(long cost) { | ||
this.prepareCommitCostMillis.update(cost); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should also change method name to Millis
, not MS
.
@@ -77,7 +78,10 @@ public abstract class AbstractFileStoreWrite<T> | |||
private boolean closeCompactExecutorWhenLeaving = true; | |||
private boolean ignorePreviousFiles = false; | |||
protected boolean isStreamingMode = false; | |||
private MetricRegistry metricRegistry = null; | |||
private MetricRegistry metricRegistry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove useless changes.
@Nullable | ||
public WriterMetrics getWriterMetrics() { | ||
if (metricRegistry != null && writerMetrics == null) { | ||
writerMetrics = new WriterMetrics(metricRegistry, tableName); | ||
} | ||
return writerMetrics; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for lazy allocation. Just create a new writeMetrics
in withMetricRegistry
.
private static final String GROUP_NAME = "writer"; | ||
|
||
private static final int WINDOW_SAMPLE_SIZE = 100; | ||
private static final String WRITE_RECORD_NUM = "writeRecordCount"; | ||
|
||
private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount"; | ||
|
||
private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte"; | ||
|
||
private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte"; | ||
|
||
private static final String FLUSH_COST_MILLIS = "flushCostMillis"; | ||
|
||
public static final String PREPARE_COMMIT_COST_MILLIS = "prepareCommitCostMillis"; | ||
|
||
private final Counter writeRecordNumCounter; | ||
|
||
private final Histogram bufferFlushCostMillis; | ||
|
||
private final Histogram prepareCommitCostMillis; | ||
|
||
private MetricGroup metricGroup; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that buffer flushing and prepare commit is related to a certain writer (a certain bucket), while buffer preempt and buffer counts are related to all writers in this subtask. Why do you mix these metrics into one class?
c88c5b5
to
ca0b6be
Compare
private final String tableName; | ||
private final FileStorePathFactory pathFactory; | ||
|
||
protected WriterBufferMetric writerBufferMetric; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this class member ever used?
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier = writeBufferPoolSupplier(); | ||
if (metricRegistry != null) { | ||
writerBufferMetric = | ||
new WriterBufferMetric(memoryPoolFactorySupplier, metricRegistry, tableName); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for a Supplier
. Just override this method in MemoryFileStoreWrite
.
// cost | ||
bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS, WINDOW_SAMPLE_SIZE); | ||
|
||
// prepareCommittime | ||
prepareCommitCostMillis = | ||
metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS, WINDOW_SAMPLE_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't quite understand what these comments mean. You should either delete them or add more details.
public WriterMetrics(MetricRegistry registry, String tableName, String parition, int bucket) { | ||
MetricGroup metricGroup = | ||
registry.bucketMetricGroup(GROUP_NAME, tableName, parition, bucket); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bucket metric groups should be unregistered after the corresponding writers are closed, because a long-running streaming job may modify many many buckets. Add a close
method to this class to avoid OOM.
042b16a
to
4a18b17
Compare
private MetricGroup metricGroup; | ||
|
||
public WriterBufferMetric( | ||
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need a Supplier
? Why not use MemoryPoolFactory
directly?
a013b93
to
332784f
Compare
332784f
to
5de9022
Compare
add writer metrics:
Test