|
1 | 1 | /** CCLoudResourceLoader Flink statement utils for AI Models */ |
| 2 | +import { Logger } from "../../logging"; |
2 | 3 | import { FlinkAIModel } from "../../models/flinkAiModel"; |
3 | 4 | import type { CCloudFlinkDbKafkaCluster } from "../../models/kafkaCluster"; |
4 | 5 |
|
| 6 | +const logger = new Logger("flinkAiModelsQuery"); |
| 7 | + |
5 | 8 | /** |
6 | 9 | * Generate the query to list all available Flink AI models for a given Flink catalog+database. |
7 | | - * @see https://docs.confluent.io/cloud/current/flink/reference/statements/show.html#flink-sql-show-models |
| 10 | + * Uses INFORMATION_SCHEMA to get detailed model information including default version, version count, and comment. |
| 11 | + * @see https://docs.confluent.io/cloud/current/flink/reference/flink-sql-information-schema.html#models |
8 | 12 | */ |
9 | 13 | export function getFlinkAIModelsQuery(database: CCloudFlinkDbKafkaCluster): string { |
10 | | - return `SHOW MODELS FROM \`${database.environmentId}\`.\`${database.id}\``; |
| 14 | + return ` |
| 15 | + -- Model toplevel definitions |
| 16 | + select |
| 17 | + 'model' as \`rowType\`, |
| 18 | + \`MODEL_NAME\` as \`modelName\`, |
| 19 | + \`DEFAULT_VERSION\` as \`defaultVersion\`, |
| 20 | + \`VERSION_COUNT\` as \`versionCount\`, |
| 21 | + \`COMMENT\` as \`comment\`, |
| 22 | + CAST(NULL AS STRING) as \`optionKey\`, |
| 23 | + CAST(NULL AS STRING) as \`optionValue\`, |
| 24 | + CAST(NULL AS STRING) as \`version\` |
| 25 | + from \`INFORMATION_SCHEMA\`.\`MODELS\` |
| 26 | + where \`MODEL_SCHEMA_ID\` = '${database.id}' |
| 27 | +
|
| 28 | + union all |
| 29 | +
|
| 30 | + -- Model options (WITH clause configuration) |
| 31 | + select |
| 32 | + 'modelOption' as \`rowType\`, |
| 33 | + \`MODEL_NAME\` as \`modelName\`, |
| 34 | + CAST(NULL AS STRING) as \`defaultVersion\`, |
| 35 | + CAST(NULL AS INT) as \`versionCount\`, |
| 36 | + CAST(NULL AS STRING) as \`comment\`, |
| 37 | + \`OPTION_KEY\` as \`optionKey\`, |
| 38 | + \`OPTION_VALUE\` as \`optionValue\`, |
| 39 | + \`VERSION\` as \`version\` |
| 40 | + from \`INFORMATION_SCHEMA\`.\`MODEL_OPTIONS\` |
| 41 | + where \`MODEL_SCHEMA_ID\` = '${database.id}' |
| 42 | + `; |
| 43 | +} |
| 44 | + |
| 45 | +type StringBoolean = "YES" | "NO"; |
| 46 | + |
| 47 | +/** Describes rows from the models query describing the model as a whole */ |
| 48 | +export interface RawModelRow { |
| 49 | + rowType: "model"; |
| 50 | + modelName: string; |
| 51 | + defaultVersion: string; |
| 52 | + versionCount: number; |
| 53 | + comment: string | null; |
| 54 | + optionKey: null; |
| 55 | + optionValue: null; |
| 56 | + version: null; |
11 | 57 | } |
12 | 58 |
|
13 | | -/** Raw results type corresponding to `SHOW MODELS` query */ |
14 | | -export type RawFlinkAIModelRow = { |
15 | | - "Model Name": string; |
16 | | -}; |
| 59 | +/** Describes rows from the models query describing a single option for a model */ |
| 60 | +export interface RawModelOptionRow { |
| 61 | + rowType: "modelOption"; |
| 62 | + modelName: string; |
| 63 | + defaultVersion: null; |
| 64 | + versionCount: null; |
| 65 | + comment: null; |
| 66 | + optionKey: string; |
| 67 | + optionValue: string; |
| 68 | + version: string; |
| 69 | +} |
| 70 | + |
| 71 | +/** Raw results type corresponding to the models INFORMATION_SCHEMA query */ |
| 72 | +export type RawFlinkAIModelRow = RawModelRow | RawModelOptionRow; |
17 | 73 |
|
18 | 74 | /** |
19 | | - * Transform raw model rows from the `SHOW MODELS` query into basic {@link FlinkAIModel} objects. |
| 75 | + * Transform raw model rows from the INFORMATION_SCHEMA models query into {@link FlinkAIModel} objects. |
| 76 | + * Processes mixed model definition rows and model option rows to build complete model objects with their options. |
20 | 77 | * |
21 | 78 | * @param database What cluster these models belong to |
22 | | - * @param rawResults The raw rows from the `SHOW MODELS` query |
| 79 | + * @param rawResults The raw rows from the INFORMATION_SCHEMA models query (both model and modelOption rows) |
23 | 80 | * @returns Array of {@link FlinkAIModel} objects, sorted by name. |
24 | 81 | */ |
25 | 82 | export function transformRawFlinkAIModelRows( |
26 | 83 | database: CCloudFlinkDbKafkaCluster, |
27 | 84 | rawResults: RawFlinkAIModelRow[], |
28 | 85 | ): FlinkAIModel[] { |
29 | | - const models: FlinkAIModel[] = rawResults.map((row) => { |
30 | | - return new FlinkAIModel({ |
31 | | - environmentId: database.environmentId, |
32 | | - provider: database.provider, |
33 | | - region: database.region, |
34 | | - databaseId: database.id, |
35 | | - name: row["Model Name"], |
36 | | - }); |
37 | | - }); |
| 86 | + logger.debug( |
| 87 | + `Transforming ${rawResults.length} raw model rows for cluster ${database.name} (${database.id})`, |
| 88 | + ); |
| 89 | + |
| 90 | + // Sort rows to ensure model definition comes before its options |
| 91 | + sortRawModelRows(rawResults); |
| 92 | + |
| 93 | + const models: FlinkAIModel[] = []; |
| 94 | + let currentModel: FlinkAIModel | null = null; |
| 95 | + const seenModelNames = new Set<string>(); |
| 96 | + |
| 97 | + for (const row of rawResults) { |
| 98 | + if (row.rowType === "model") { |
| 99 | + // Create new model |
| 100 | + if (seenModelNames.has(row.modelName)) { |
| 101 | + throw new Error(`Duplicate model name ${row.modelName} in INFORMATION_SCHEMA results`); |
| 102 | + } |
| 103 | + seenModelNames.add(row.modelName); |
38 | 104 |
|
| 105 | + currentModel = new FlinkAIModel({ |
| 106 | + environmentId: database.environmentId, |
| 107 | + provider: database.provider, |
| 108 | + region: database.region, |
| 109 | + databaseId: database.id, |
| 110 | + name: row.modelName, |
| 111 | + defaultVersion: row.defaultVersion, |
| 112 | + versionCount: row.versionCount, |
| 113 | + comment: row.comment, |
| 114 | + options: new Map(), |
| 115 | + }); |
| 116 | + |
| 117 | + models.push(currentModel); |
| 118 | + } else { |
| 119 | + // Model option row |
| 120 | + if (currentModel === null || currentModel.name !== row.modelName) { |
| 121 | + throw new Error( |
| 122 | + `Unexpected model option row for model ${row.modelName} when current model is ${currentModel?.name}`, |
| 123 | + ); |
| 124 | + } |
| 125 | + |
| 126 | + // Add option to current model's options map, keyed by version |
| 127 | + const versionKey = row.version || "default"; |
| 128 | + if (!currentModel.options.has(versionKey)) { |
| 129 | + currentModel.options.set(versionKey, new Map()); |
| 130 | + } |
| 131 | + const optionsMap = currentModel.options.get(versionKey)!; |
| 132 | + optionsMap.set(row.optionKey, row.optionValue); |
| 133 | + } |
| 134 | + } |
| 135 | + |
| 136 | + logger.debug(`Transformed to ${models.length} FlinkAIModel objects`); |
| 137 | + |
| 138 | + // Sort models by name |
39 | 139 | models.sort((a, b) => a.name.localeCompare(b.name)); |
40 | 140 | return models; |
41 | 141 | } |
| 142 | + |
| 143 | +/** |
| 144 | + * Sorts RawFlinkAIModelRow[] by modelName, then by rowType (model rows first). |
| 145 | + * This ensures that each model's definition row comes before its option rows. |
| 146 | + */ |
| 147 | +function sortRawModelRows(rows: RawFlinkAIModelRow[]): void { |
| 148 | + rows.sort((a, b) => { |
| 149 | + // First sort by model name |
| 150 | + if (a.modelName !== b.modelName) { |
| 151 | + return a.modelName.localeCompare(b.modelName); |
| 152 | + } |
| 153 | + |
| 154 | + // Then sort by row type (model rows first) |
| 155 | + return rowRank(a) - rowRank(b); |
| 156 | + }); |
| 157 | +} |
| 158 | + |
| 159 | +/** Assist in sorting the row types: model rows come before option rows */ |
| 160 | +function rowRank(row: RawFlinkAIModelRow): number { |
| 161 | + switch (row.rowType) { |
| 162 | + case "model": |
| 163 | + return 0; |
| 164 | + case "modelOption": |
| 165 | + return 1; |
| 166 | + } |
| 167 | +} |
0 commit comments