Skip to content

Commit fd26e08

Browse files
committed
[FLINK-28804][formats] Use proper stand-ins for missing metric groups
1 parent 4e79064 commit fd26e08

File tree

3 files changed

+6
-29
lines changed

3 files changed

+6
-29
lines changed

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/DeserializationSchemaAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.core.fs.FileInputSplit;
3030
import org.apache.flink.core.fs.Path;
3131
import org.apache.flink.metrics.MetricGroup;
32+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
3233
import org.apache.flink.table.data.RowData;
3334
import org.apache.flink.util.Collector;
3435
import org.apache.flink.util.InstantiationUtil;
@@ -64,8 +65,7 @@ private DeserializationSchema<RowData> createDeserialization() throws IOExceptio
6465
new DeserializationSchema.InitializationContext() {
6566
@Override
6667
public MetricGroup getMetricGroup() {
67-
throw new UnsupportedOperationException(
68-
"MetricGroup is unsupported in BulkFormat.");
68+
return new UnregisteredMetricsGroup();
6969
}
7070

7171
@Override

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/SerializationSchemaAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.api.common.serialization.Encoder;
2323
import org.apache.flink.api.common.serialization.SerializationSchema;
2424
import org.apache.flink.metrics.MetricGroup;
25+
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
2526
import org.apache.flink.table.data.RowData;
2627
import org.apache.flink.util.UserCodeClassLoader;
2728

@@ -59,8 +60,7 @@ private void checkOpened() throws IOException {
5960
new SerializationSchema.InitializationContext() {
6061
@Override
6162
public MetricGroup getMetricGroup() {
62-
throw new UnsupportedOperationException(
63-
"MetricGroup is unsupported in BulkFormat.");
63+
return new UnregisteredMetricsGroup();
6464
}
6565

6666
@Override

flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919

2020
import org.apache.flink.api.common.serialization.SerializationSchema;
2121
import org.apache.flink.api.common.serialization.SimpleStringSchema;
22-
import org.apache.flink.metrics.MetricGroup;
22+
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
2323
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
2424
import org.apache.flink.util.TestLogger;
25-
import org.apache.flink.util.UserCodeClassLoader;
2625

2726
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
2827
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
@@ -351,28 +350,6 @@ public int partition(
351350
}
352351

353352
private void open(KafkaRecordSerializationSchema<String> schema) throws Exception {
354-
schema.open(
355-
new SerializationSchema.InitializationContext() {
356-
@Override
357-
public MetricGroup getMetricGroup() {
358-
return null;
359-
}
360-
361-
@Override
362-
public UserCodeClassLoader getUserCodeClassLoader() {
363-
return new UserCodeClassLoader() {
364-
@Override
365-
public ClassLoader asClassLoader() {
366-
return KafkaRecordSerializationSchemaBuilderTest.class
367-
.getClassLoader();
368-
}
369-
370-
@Override
371-
public void registerReleaseHookIfAbsent(
372-
String releaseHookName, Runnable releaseHook) {}
373-
};
374-
}
375-
},
376-
null);
353+
schema.open(new DummyInitializationContext(), null);
377354
}
378355
}

0 commit comments

Comments
 (0)