Skip to content

Commit d5ec9f8

Browse files
committed
[FLINK-36525] address comments.
1 parent 3bbcb6d commit d5ec9f8

File tree

4 files changed

+14
-17
lines changed

4 files changed

+14
-17
lines changed

docs/content/docs/core-concept/transform.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ The following built-in models are provided:
415415
| parameter | type | optional/required | meaning |
416416
|--------------------|--------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------|
417417
| openai.model | STRING | required | Name of model to be called, for example: "gpt-4o-mini", Available options are "gpt-4o-mini", "gpt-4o", "gpt-4-32k", "gpt-3.5-turbo". |
418-
| openai.host | STRING | required | Host of the Model server to be connected, for example: "http://langchain4j.dev/demo/openai/v1". |
418+
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
419419
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
420420
| openai.chat.prompt | STRING | optional | Prompt for chatting with OpenAI, for example: "Please summary this ". |
421421

@@ -424,7 +424,7 @@ The following built-in models are provided:
424424
| parameter | type | optional/required | meaning |
425425
|---------------|--------|-------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
426426
| openai.model | STRING | required | Name of model to be called, for example: "text-embedding-3-small", Available options are "text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002". |
427-
| openai.host | STRING | required | Host of the Model server to be connected, for example: "http://langchain4j.dev/demo/openai/v1". |
427+
| openai.host | STRING | required | Host of the Model server to be connected, for example: `http://langchain4j.dev/demo/openai/v1`. |
428428
| openai.apikey | STRING | required | Api Key for verification of the Model server, for example, "demo". |
429429

430430

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@
3838
*/
3939
public class TransformTranslator {
4040

41+
/** Package of built-in model. */
42+
public static final String PREFIX_CLASSPATH_BUILT_IN_MODEL =
43+
"org.apache.flink.cdc.runtime.model.";
44+
4145
public DataStream<Event> translatePreTransform(
4246
DataStream<Event> input,
4347
List<TransformDef> transforms,
@@ -100,7 +104,10 @@ public DataStream<Event> translatePostTransform(
100104
}
101105

102106
private Tuple3<String, String, Map<String, String>> modelToUDFTuple(ModelDef model) {
103-
return Tuple3.of(model.getModelName(), model.getClassName(), model.getParameters());
107+
return Tuple3.of(
108+
model.getModelName(),
109+
PREFIX_CLASSPATH_BUILT_IN_MODEL + model.getClassName(),
110+
model.getParameters());
104111
}
105112

106113
private Tuple3<String, String, Map<String, String>> udfDefToUDFTuple(UdfDef udf) {

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ void testComplicatedFlinkUdf(ValuesDataSink.SinkApi sinkApi, String language) th
829829

830830
@ParameterizedTest
831831
@MethodSource("testParams")
832-
void testTransformWithModel(ValuesDataSink.SinkApi sinkApi, String language) throws Exception {
832+
void testTransformWithModel(ValuesDataSink.SinkApi sinkApi) throws Exception {
833833
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
834834

835835
// Setup value source

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,30 +40,20 @@ public class UserDefinedFunctionDescriptor implements Serializable {
4040
private final String className;
4141
private final DataType returnTypeHint;
4242
private final boolean isCdcPipelineUdf;
43-
4443
private final Map<String, String> parameters;
4544

46-
/** Package of built-in model. */
47-
public static final String PREFIX_CLASSPATH_BUILT_IN_MODEL =
48-
"org.apache.flink.cdc.runtime.model.";
49-
5045
public UserDefinedFunctionDescriptor(String name, String classpath) {
5146
this(name, classpath, new HashMap<>());
5247
}
5348

5449
public UserDefinedFunctionDescriptor(
5550
String name, String classpath, Map<String, String> parameters) {
56-
if (classpath.contains(".")) {
57-
this.className = classpath.substring(classpath.lastIndexOf('.') + 1);
58-
this.classpath = classpath;
59-
} else {
60-
this.className = classpath;
61-
this.classpath = PREFIX_CLASSPATH_BUILT_IN_MODEL + classpath;
62-
}
6351
this.name = name;
6452
this.parameters = parameters;
53+
this.classpath = classpath;
54+
this.className = classpath.substring(classpath.lastIndexOf('.') + 1);
6555
try {
66-
Class<?> clazz = Class.forName(this.classpath);
56+
Class<?> clazz = Class.forName(classpath);
6757
isCdcPipelineUdf = isCdcPipelineUdf(clazz);
6858
if (isCdcPipelineUdf) {
6959
// We use reflection to invoke UDF methods since we may add more methods

0 commit comments

Comments
 (0)