Skip to content

Conversation

wg1026688210
Copy link
Contributor

@wg1026688210 wg1026688210 commented Oct 29, 2023

add writer metrics:

  • writeRecordCount
  • bufferPreemptCount
  • usedWriteBufferSizeByte
  • totalWriteBufferSizeByte
  • flushCostMillis
  • prepareCommitCostMillis

Test

  • WriterOperatorTestBase.java
  • AppendOnlyWriterOperatorTest.java
  • PrimaryKeyWriterOperatorTest

@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch from 0ea3f1b to 64da853 Compare October 30, 2023 02:31
@wg1026688210 wg1026688210 changed the title [WIP]add write metric [WIP]add Flink Write Metric Oct 30, 2023
@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch from 64da853 to c934cac Compare November 12, 2023 15:08
@wg1026688210 wg1026688210 marked this pull request as ready for review November 12, 2023 15:09
@wg1026688210
Copy link
Contributor Author

PTAL @tsreaper @schnappi17 @JingsongLi

@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch from c934cac to 776a03e Compare November 12, 2023 15:16
@wg1026688210 wg1026688210 changed the title [WIP]add Flink Write Metric [Flink]add Flink Writer Metric Nov 13, 2023

if (writerMetrics != null) {
writerMetrics.incWriteRecordNum();
writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For most of the time, System.currentTimeMillis() - start will be 0 because records are only buffered in memory. This metrics is only meaningful when we write files to disks or perform compaction.

Also, calling System.currentTimeMillis() for each write will introduce performance issue. Please be very careful with code which is called per record.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

public void updatePrepareCommitCostMS(long cost) {
this.prepareCommitCostMS.update(cost);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: prepareCommitCostMillis or prepareCommitCostMs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

compactManager
.getCompactionResult(blocking)
.ifPresent(
result -> {
compactBefore.addAll(result.before());
compactAfter.addAll(result.after());
});
writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If blocking is false this value is certainly 0 because we won't wait for compaction to end. Why do you record this metric?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


if (writerMetrics != null) {
writerMetrics.incWriteRecordNum();
writerMetrics.updateWriteCostMS(System.currentTimeMillis() - start);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Optional<CompactResult> result = compactManager.getCompactionResult(blocking);
result.ifPresent(this::updateCompactResult);
writerMetrics.updateSyncLastestCompactionCostMS(System.currentTimeMillis() - start);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

@@ -125,4 +126,6 @@ List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)

/** With metrics to measure compaction. */
FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);

WriterMetrics getWriterMetrics();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you add this method in the interface when it is only used internally by AbstractFileStoreWrite and its subclasses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 115 to 62
public void setMemoryPreemptCount(Supplier<Long> bufferPreemptNumSupplier) {
this.stats.bufferPreemptCount = bufferPreemptNumSupplier;
}

public void setUsedWriteBufferSize(Supplier<Long> usedWriteBufferSize) {
this.stats.usedWriteBufferSize = usedWriteBufferSize;
}

public void setTotaldWriteBufferSize(Supplier<Long> totaldWriteBufferSize) {
this.stats.totalWriteBufferSize = totaldWriteBufferSize;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call these methods directly from MemoryPoolFactory and we don't need to introduce the inner Stats class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment on lines 36 to 41
public MetricGroup tableMetricGroup(String groupName, String tableName, String commitUser) {
Map<String, String> variables = new LinkedHashMap<>();
variables.put(KEY_TABLE, tableName);
if (commitUser != null) {
variables.put(COMMIT_USER, commitUser);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why adding commitUser as a metric group variable? For Flink jobs using job id is enough.

Copy link
Contributor Author

@wg1026688210 wg1026688210 Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to associate the Flink job with the commitUser through the tag, making it easier for users to track which Flink job generated the snapshot of the Paimon table. This might be a bit redundant.

@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch 2 times, most recently from 8f24a21 to a517e95 Compare November 14, 2023 10:51
@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch from ba5a338 to d413373 Compare November 15, 2023 07:37
Comment on lines 70 to 66
public void updateBufferFlushCostMS(long bufferFlushCost) {
bufferFlushCostMillis.update(bufferFlushCost);
}

public void updatePrepareCommitCostMS(long cost) {
this.prepareCommitCostMillis.update(cost);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should also change method name to Millis, not MS.

@@ -77,7 +78,10 @@ public abstract class AbstractFileStoreWrite<T>
private boolean closeCompactExecutorWhenLeaving = true;
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;
private MetricRegistry metricRegistry = null;
private MetricRegistry metricRegistry;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove useless changes.

Comment on lines 376 to 381
@Nullable
public WriterMetrics getWriterMetrics() {
if (metricRegistry != null && writerMetrics == null) {
writerMetrics = new WriterMetrics(metricRegistry, tableName);
}
return writerMetrics;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for lazy allocation. Just create a new writeMetrics in withMetricRegistry.

Comment on lines 31 to 44
private static final String GROUP_NAME = "writer";

private static final int WINDOW_SAMPLE_SIZE = 100;
private static final String WRITE_RECORD_NUM = "writeRecordCount";

private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";

private static final String USED_WRITE_BUFFER_SIZE = "usedWriteBufferSizeByte";

private static final String TOTAL_WRITE_BUFFER_SIZE = "totalWriteBufferSizeByte";

private static final String FLUSH_COST_MILLIS = "flushCostMillis";

public static final String PREPARE_COMMIT_COST_MILLIS = "prepareCommitCostMillis";

private final Counter writeRecordNumCounter;

private final Histogram bufferFlushCostMillis;

private final Histogram prepareCommitCostMillis;

private MetricGroup metricGroup;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that buffer flushing and prepare commit is related to a certain writer (a certain bucket), while buffer preempt and buffer counts are related to all writers in this subtask. Why do you mix these metrics into one class?

@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch 2 times, most recently from c88c5b5 to ca0b6be Compare November 21, 2023 01:40
private final String tableName;
private final FileStorePathFactory pathFactory;

protected WriterBufferMetric writerBufferMetric;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this class member ever used?

Comment on lines 368 to 372
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier = writeBufferPoolSupplier();
if (metricRegistry != null) {
writerBufferMetric =
new WriterBufferMetric(memoryPoolFactorySupplier, metricRegistry, tableName);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for a Supplier. Just override this method in MemoryFileStoreWrite.

Comment on lines 49 to 53
// cost
bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS, WINDOW_SAMPLE_SIZE);

// prepareCommittime
prepareCommitCostMillis =
metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS, WINDOW_SAMPLE_SIZE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't quite understand what these comments mean. You should either delete them or add more details.

Comment on lines 44 to 46
public WriterMetrics(MetricRegistry registry, String tableName, String parition, int bucket) {
MetricGroup metricGroup =
registry.bucketMetricGroup(GROUP_NAME, tableName, parition, bucket);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bucket metric groups should be unregistered after the corresponding writers are closed, because a long-running streaming job may modify many many buckets. Add a close method to this class to avoid OOM.

@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch 3 times, most recently from 042b16a to 4a18b17 Compare November 21, 2023 11:53
private MetricGroup metricGroup;

public WriterBufferMetric(
Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need a Supplier? Why not use MemoryPoolFactory directly?

@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch 2 times, most recently from a013b93 to 332784f Compare November 22, 2023 08:25
@wg1026688210 wg1026688210 force-pushed the metric/add_write_metric_v2 branch from 332784f to 5de9022 Compare November 22, 2023 10:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants