Skip to content

Commit 43052af

Browse files
davemarcogibber9809
authored andcommitted
feat(webui): Dump query results from Presto into MongoDB. (y-scope#1150)
1 parent 53c6933 commit 43052af

File tree

2 files changed

+85
-5
lines changed

2 files changed

+85
-5
lines changed

components/webui/server/src/routes/api/presto-search/index.ts

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,28 @@ import {
66
PrestoQueryJobCreationSchema,
77
PrestoQueryJobSchema,
88
} from "../../../schemas/presto-search.js";
9+
import {insertPrestoRowsToMongo} from "./utils.js";
910

1011

1112
/**
1213
* Presto search API routes.
1314
*
1415
* @param fastify
1516
*/
17+
// eslint-disable-next-line max-lines-per-function
1618
const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
17-
const {Presto} = fastify;
19+
const {Presto, mongo} = fastify;
20+
const mongoDb = mongo.db;
1821

1922
if ("undefined" === typeof Presto) {
2023
// If Presto client is not available, skip the plugin registration.
2124
return;
2225
}
2326

27+
if ("undefined" === typeof mongoDb) {
28+
throw new Error("MongoDB database not found");
29+
}
30+
2431
/**
2532
* Submits a search query.
2633
*/
@@ -36,7 +43,7 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
3643
tags: ["Presto Search"],
3744
},
3845
},
39-
46+
// eslint-disable-next-line max-lines-per-function
4047
async (request, reply) => {
4148
const {queryString} = request.body;
4249

@@ -45,13 +52,39 @@ const plugin: FastifyPluginAsyncTypebox = async (fastify) => {
4552
try {
4653
searchJobId = await new Promise<string>((resolve, reject) => {
4754
let isResolved = false;
48-
4955
Presto.client.execute({
5056
// eslint-disable-next-line no-warning-comments
51-
// TODO: Data, error, and success handlers are dummy implementations
57+
// TODO: Error, and success handlers are dummy implementations
5258
// and will be replaced with proper implementations.
5359
data: (_, data, columns) => {
54-
request.log.info({columns, data}, "Presto data");
60+
request.log.info(
61+
`Received ${data.length} rows from Presto query`
62+
);
63+
64+
if (false === isResolved) {
65+
request.log.error(
66+
"Presto data received before searchJobId was resolved; " +
67+
"skipping insert."
68+
);
69+
70+
return;
71+
}
72+
73+
if (0 === data.length) {
74+
return;
75+
}
76+
77+
insertPrestoRowsToMongo(
78+
data,
79+
columns,
80+
searchJobId,
81+
mongoDb
82+
).catch((err: unknown) => {
83+
request.log.error(
84+
err,
85+
"Failed to insert Presto results into MongoDB"
86+
);
87+
});
5588
},
5689
error: (error) => {
5790
request.log.info(error, "Presto search failed");
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import type {
2+
Db,
3+
InsertManyResult,
4+
} from "mongodb";
5+
6+
7+
/**
8+
* Converts a Presto result row (array of values) into an object, using the provided column
9+
* definitions to assign property names.
10+
*
11+
* @param row Array of values representing a single Presto result row.
12+
* @param columns Array of column definitions, each containing a `name` property.
13+
* @return An object mapping each column name to its corresponding value from the row.
14+
*/
15+
const prestoRowToObject = (
16+
row: unknown[],
17+
columns: {name: string}[]
18+
): Record<string, unknown> => {
19+
const obj: Record<string, unknown> = {};
20+
columns.forEach((col, idx) => {
21+
obj[col.name] = row[idx];
22+
});
23+
24+
return obj;
25+
};
26+
27+
/**
28+
* Inserts Presto rows into a MongoDB collection for a given search job.
29+
*
30+
* @param data Array of Presto result rows
31+
* @param columns Array of column definitions
32+
* @param searchJobId
33+
* @param mongoDb
34+
* @return Promise that resolves when the insertion is complete
35+
*/
36+
const insertPrestoRowsToMongo = (
37+
data: unknown[][],
38+
columns: {name: string}[],
39+
searchJobId: string,
40+
mongoDb: Db
41+
): Promise<InsertManyResult<Document>> => {
42+
const collection = mongoDb.collection(searchJobId);
43+
const resultDocs = data.map((row) => prestoRowToObject(row, columns));
44+
return collection.insertMany(resultDocs);
45+
};
46+
47+
export {insertPrestoRowsToMongo};

0 commit comments

Comments
 (0)