From f07956e90efb646085d9ea07820590959e141641 Mon Sep 17 00:00:00 2001 From: Terence Yim <885032+chtyim@users.noreply.github.com> Date: Wed, 3 Apr 2024 11:02:35 -0700 Subject: [PATCH] Added BQML pipeline library for remote inference (#2) * Added BQML pipeline library for remote inference --------- Co-authored-by: jortiz16 --- LICENSE | 201 ++++++++++++++++++++++++ README.md | 272 +++++++++++++++++++++++++++++++++ index.js | 4 + modules/object_table_ml.js | 163 ++++++++++++++++++++ modules/structured_table_ml.js | 166 ++++++++++++++++++++ modules/utils.js | 22 +++ package-lock.json | 25 +++ package.json | 6 + 8 files changed, 859 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 index.js create mode 100644 modules/object_table_ml.js create mode 100644 modules/structured_table_ml.js create mode 100644 modules/utils.js create mode 100644 package-lock.json create mode 100644 package.json diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. 
+ + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..c9054d2 --- /dev/null +++ b/README.md @@ -0,0 +1,272 @@ +# BigQuery Remote Inference Pipeline +BigQuery supports remote models, such as Vertex AI LLMs, to perform remote inference operations +on both structured and unstructured data. When using remote inference, the user needs to pay +attention to [quotas and limits](https://cloud.google.com/bigquery/quotas#cloud_ai_service_functions), +which can result in retryable errors in a subset of rows that then require reprocessing. + +The BQML Dataform library helps users create BQML pipelines that are resilient to +transient failures by automatically reprocessing failed rows and incrementally updating the output table. + +## Quick Start Guide +### Installation +Add the bqml package to your package.json file in your Dataform project. +You can find the most up-to-date package version on the releases page. + +### Usage +The following example shows how to generate text from images using the +Vertex AI multimodal model. 
+ +```javascript +// Import the module +const bqml = require("bqml"); + +// Name of the multimodel that has `gemini-pro-vision` as the endpoint +let model = "multi-llm"; +// Name of the object table that points to a set of images +let source_table = "product_image"; +// Name of the table for storing the result +let output_table = "product_image_description"; + +// Execute the pipeline +bqml.vision_generate_text( + source_table, output_table, model, + "Describe the image in 20 words", { + flatten_json_output: true + } +); +``` + +## Function Reference +### Function generate_text +#### Signature +```javascript +function generate_text( + source_table, output_table, unique_keys, + ml_model, source_query, ml_configs, options) +``` +#### Description +Performs the ML.GENERATE_TEXT function on the given source table. + +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-text](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-text) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source table | +| output_table | String | the name of the table to store the final result | +| unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | +| ml_model | Resolvable | the remote model to use for the ML operation that uses one of the Vertex AI LLM endpoints | +| source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | +| ml_configs | Object | configurations for the ML operation | +| options | Object | the configuration object for the [table_ml](#table_ml) function | + +--- +### Function vision_generate_text +#### Signature +```javascript +function vision_generate_text( + source_table, output_table, model, prompt, llm_config, options) +``` +#### 
Description +Performs the ML.GENERATE_TEXT function on visual content in the given source table. + +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-text#gemini-pro-vision](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-text#gemini-pro-vision) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source object table | +| output_table | String | name of the output table | +| model | Resolvable | the remote model with the `gemini-pro-vision` endpoint | +| prompt | String | the prompt text for the LLM | +| llm_config | Object | extra configurations for the LLM | +| options | Object | the configuration object for the [obj_table_ml](#obj_table_ml) function | + +--- +### Function generate_embedding +#### Signature +```javascript +function generate_embedding( + source_table, output_table, unique_keys, + ml_model, source_query, ml_configs, options = {}) +``` +#### Description +Performs the ML.GENERATE_EMBEDDING function on the given source table. 
+ +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-embedding](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-embedding) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source table | +| output_table | String | the name of the table to store the final result | +| unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | +| ml_model | Resolvable | the remote model to use for the ML operation that uses one of the `textembedding-gecko*` Vertex AI LLMs as endpoint | +| source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | +| ml_configs | Object | configurations for the ML operation | +| options | Object | the configuration object for the [table_ml](#table_ml) function | + +--- +### Function understand_text +#### Signature +```javascript +function understand_text( + source_table, output_table, unique_keys, + ml_model, source_query, ml_configs, options) +``` +#### Description +Performs the ML.UNDERSTAND_TEXT function on the given source table. 
+ +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-understand-text](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-understand-text) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source table | +| output_table | String | the name of the table to store the final result | +| unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | +| ml_model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_NATURAL_LANGUAGE_V1 | +| source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | +| ml_configs | Object | configurations for the ML operation | +| options | Object | the configuration object for the [table_ml](#table_ml) function | + +--- +### Function translate +#### Signature +```javascript +function translate( + source_table, output_table, unique_keys, + ml_model, source_query, ml_configs, options) +``` +#### Description +Performs the ML.TRANSLATE function on the given source table. 
+ +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-translate](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-translate) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source table | +| output_table | String | the name of the table to store the final result | +| unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | +| ml_model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_TRANSLATE_V3 | +| source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | +| ml_configs | Object | configurations for the ML operation | +| options | Object | the configuration object for the [table_ml](#table_ml) function | + +--- +### Function annotate_image +#### Signature +```javascript +function annotate_image( + source_table, output_table, model, features, options) +``` +#### Description +Performs the ML.ANNOTATE_IMAGE function on the given source table. 
+ +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-annotate-image](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-annotate-image) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source object table | +| output_table | String | name of the output table | +| model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_VISION_V1 | +| features | Array | specifies one or more feature names of supported Vision API features | +| options | Object | the configuration object for the [obj_table_ml](#obj_table_ml) function | + +--- +### Function transcribe +#### Signature +```javascript +function transcribe( + source_table, output_table, model, recognition_config, options) +``` +#### Description +Performs the ML.TRANSCRIBE function on the given source table. + +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-transcribe](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-transcribe) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source object table | +| output_table | String | name of the output table | +| model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_SPEECH_TO_TEXT_V2 | +| recognition_config | Object | the recognition configuration to override the default configuration of the specified recognizer | +| options | Object | the configuration object for the [obj_table_ml](#obj_table_ml) function | + +--- +### Function process_document +#### Signature +```javascript +function process_document( + source_table, output_table, model, options) +``` +#### Description +Performs the ML.PROCESS_DOCUMENT function on the given source table. 
+ +**See**: [https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-process-document](https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-process-document) + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source object table | +| output_table | String | name of the output table | +| model | Resolvable | the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_DOCUMENT_V1 | +| options | Object | the configuration object for the [obj_table_ml](#obj_table_ml) function | + +--- + +### Function table_ml +#### Signature +```javascript +function table_ml( + output_table, unique_keys, ml_function, ml_model, + source_query, accept_filter, ml_configs = {}, { + batch_size = 10000, + batch_duration_secs = 22 * 60 * 60 +} = {}) +``` +#### Description +A generic structured table ML pipeline. +It incrementally performs an ML operation on rows from the source table +and merges to the output table until all rows are processed or runs longer +than the specific duration. + +| Param | Type | Description | +| --- | --- | --- | +| output_table | String | name of the output table | +| unique_keys | String \| Array | column name(s) for identifying an unique row in the source table | +| ml_function | String | the name of the BQML function to call | +| ml_model | Resolvable | the remote model to use for the ML operation | +| source_query | String \| function | either a query string or a Contextable function to produce the query on the source data for the ML operation and it must have the unique key columns selected in addition to other fields | +| accept_filter | String | a SQL boolean expression for accepting a row to the output table after the ML operation | +| ml_configs | Object | configurations for the ML operation | +| batch_size | Number | number of rows to process in each SQL job. Rows in the object table will be processed in batches according to the batch size. 
Default batch size is 10000 | +| batch_duration_secs | Number | the number of seconds to pass before breaking the batching loop if it hasn't been finished before within this duration. Default value is 22 hours | + +--- + +### Function obj_table_ml +#### Signature +```javascript +function obj_table_ml( + source_table, source, output_table, accept_filter, { + batch_size = 500, + unique_key = "uri", + updated_column = "updated", + batch_duration_secs = 22 * 60 * 60, +} = {}) +``` +#### Description +A generic object table ML pipeline. +It incrementally performs an ML operation on new rows from the source table +and merges to the output table until no new row is detected or runs longer +than the specific duration. +A row from the source table is considered as new if the `unique_key` (default to "uri") +of a row is absent in the output table, or if the `updated_column` (default to "updated") +column is newer than the largest value in the output table. + +| Param | Type | Description | +| --- | --- | --- | +| source_table | Resolvable | represents the source object table | +| source | String \| function | either a query string or a Contextable function to produce the query on the source data | +| output_table | String | the name of the table to store the final result | +| accept_filter | String | a SQL expression for finding rows that contains retryable error | +| batch_size | Number | number of rows to process in each SQL job. Rows in the object table will be processed in batches according to the batch size. Default batch size is 500 | +| unique_key | String | the primary key in the output table for incremental update. Default value is "uri". | +| updated_column | String | the column that carries the last updated timestamp of an object in the object table. Default value is "updated" | +| batch_duration_secs | Number | the number of seconds to pass before breaking the batching loop if it hasn't been finished before within this duration. 
Default value is 22 hours | diff --git a/index.js b/index.js new file mode 100644 index 0000000..1dd1e87 --- /dev/null +++ b/index.js @@ -0,0 +1,4 @@ +const structured_table_ml = require("./modules/structured_table_ml"); +const object_table_ml = require("./modules/object_table_ml"); + +module.exports = {...structured_table_ml, ...object_table_ml}; \ No newline at end of file diff --git a/modules/object_table_ml.js b/modules/object_table_ml.js new file mode 100644 index 0000000..49bcb33 --- /dev/null +++ b/modules/object_table_ml.js @@ -0,0 +1,163 @@ +const common = require("./utils.js"); + +/** + * A generic object table ML pipeline. + * It incrementally performs an ML operation on new rows from the source table + * and merges to the output table until no new row is detected or runs longer + * than the specific duration. + * A row from the source table is considered as new if the `unique_key` (default to "uri") + * of a row is absent in the output table, or if the `updated_column` (default to "updated") + * column is newer than the largest value in the output table. + * + * @param {Resolvable} source_table represents the source object table + * @param {String | Function} source either a query string or a Contextable function to produce the query on the source data + * @param {String} output_table the name of the table to store the final result + * @param {String} accept_filter a SQL expression for finding rows that contains retryable error + * @param {Number} batch_size number of rows to process in each SQL job. Rows in the object table will be + * processed in batches according to the batch size. Default batch size is 500 + * @param {String} unique_key the primary key in the output table for incremental update. Default value is "uri". + * @param {String} updated_column the column that carries the last updated timestamp of an object in the object + * table. 
/**
 * A generic object table ML pipeline.
 *
 * Registers two Dataform actions:
 *  - an `init_<output_table>` operation that creates the output table (if absent)
 *    from a 10-row sample of the ML query, and
 *  - an incremental table `<output_table>` whose pre/post operations wrap the
 *    incremental query in a BigQuery `REPEAT ... UNTIL ... END REPEAT` loop.
 *
 * On every loop iteration up to `batch_size` candidate keys are collected from the
 * source table. A row is a candidate when its `unique_key` is absent from the output
 * table, or when its `updated_column` is greater than the largest `updated_column`
 * already in the output table. The loop stops when an iteration affects no rows or
 * when the script has been running for at least `batch_duration_secs` seconds.
 *
 * @param {Resolvable} source_table represents the source object table
 * @param {String | Function} source either a query string or a Contextable function to produce the query on the source data
 * @param {String} output_table the name of the table to store the final result
 * @param {String} accept_filter a SQL boolean expression appended as a `WHERE` condition to the ML query;
 *                 only rows satisfying it are written to the output table
 * @param {Object} [options] optional pipeline settings
 * @param {Number} [options.batch_size=500] number of rows to process in each loop iteration
 * @param {String} [options.unique_key="uri"] the key column used for the incremental merge.
 *                 NOTE(review): candidates are collected into an ARRAY<STRING>, so the column is
 *                 assumed to be of type STRING - confirm for non-default keys.
 * @param {String} [options.updated_column="updated"] the column that carries the last updated timestamp of an
 *                 object in the object table
 * @param {Number} [options.batch_duration_secs=79200] number of seconds after which the batching loop is
 *                 abandoned even if candidates remain (default is 22 hours)
 */
function obj_table_ml(source_table, source, output_table, accept_filter, {
    batch_size = 500,
    unique_key = "uri",
    updated_column = "updated",
    batch_duration_secs = 22 * 60 * 60,
} = {}) {
    const source_func = (typeof source === "function") ? source : () => source;
    const limit_clause = `LIMIT ${batch_size}`;
    const init_action = `init_${output_table}`;

    // Initialize by creating the output table with a small limit to avoid timeout
    operate(init_action)
        .queries((ctx) =>
            `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table)} AS ${source_func(ctx)} WHERE ${accept_filter} LIMIT 10`);

    // Incrementally update the output table.
    const table = publish(output_table, {
        type: "incremental",
        dependencies: [init_action],
        uniqueKey: [unique_key]
    });

    // Repeatedly find a new set of candidate keys, perform the ML operation on them,
    // and merge the result into the output table.
    // The array element type is mandatory in a BigQuery DECLARE statement.
    table.preOps((ctx) => `${ctx.when(ctx.incremental(), `
    DECLARE candidates ARRAY<STRING>;
    REPEAT
        SET candidates = ARRAY(
            SELECT ${unique_key} FROM ${ctx.resolve(source_table)} AS S
            WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table)} AS T WHERE S.${unique_key} = T.${unique_key})
                OR ${updated_column} > (SELECT max(${updated_column}) FROM ${ctx.resolve(output_table)}) ${limit_clause})`,
        ``)}`);
    table.query((ctx) => `
    ${source_func(ctx)} WHERE ${ctx.when(ctx.incremental(),
        `${unique_key} IN UNNEST(candidates) AND ${accept_filter}`,
        // The non-incremental branch should not normally run because the table is created by the
        // init operation above. If it does, the accept filter and the batch limit still apply.
        `${accept_filter} ${limit_clause}`)}`);
    table.postOps((ctx) => `${ctx.when(ctx.incremental(), `
    UNTIL (SELECT @@row_count) = 0 OR TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), @@script.creation_time, SECOND) >= ${batch_duration_secs}
    END REPEAT`, ``)}`);
}
/**
 * Performs the ML.ANNOTATE_IMAGE function on the given source table.
 *
 * Declares the source table and the model as resolvable, then delegates to
 * {@link obj_table_ml} with a query that calls ML.ANNOTATE_IMAGE and with the
 * filter built by `common.retryable_error_filter("ml_annotate_image_status")`.
 *
 * @param {Resolvable} source_table represents the source object table
 * @param {String} output_table name of the output table
 * @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_VISION_V1
 * @param {Array | String} features one or more feature names of supported Vision API features;
 *                         a single name may be passed without wrapping it in an array
 * @param {Object} options the configuration object for the {@link obj_table_ml} function
 *
 * @see {@link https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-annotate-image}
 */
function annotate_image(source_table, output_table, model, features, options) {
    const feature_list = Array.isArray(features) ? features : [features];
    // Each name becomes a single-quoted SQL string literal, so backslashes and single
    // quotes must be escaped to keep the generated statement well-formed.
    const feature_names = feature_list
        .map((feature) => `'${String(feature).replace(/\\/g, "\\\\").replace(/'/g, "\\'")}'`)
        .join(", ");
    common.declare_resolvable(source_table);
    common.declare_resolvable(model);
    obj_table_ml(source_table, (ctx) => `SELECT * FROM ML.ANNOTATE_IMAGE(
        MODEL ${ctx.resolve(model)},
        TABLE ${ctx.resolve(source_table)},
        STRUCT([${feature_names}] AS vision_features))`,
        output_table, common.retryable_error_filter("ml_annotate_image_status"), options);
}
/**
 * Performs the ML.TRANSCRIBE function on the given source table.
 *
 * Declares the source table and the model as resolvable, then delegates to
 * {@link obj_table_ml} with a query that calls ML.TRANSCRIBE and with the
 * filter built by `common.retryable_error_filter("ml_transcribe_status")`.
 *
 * @param {Resolvable} source_table represents the source object table
 * @param {String} output_table name of the output table
 * @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_SPEECH_TO_TEXT_V2
 * @param {Object} [recognition_config] the recognition configuration to override the default configuration
 *                 of the specified recognizer; when `null` or `undefined` the `recognition_config`
 *                 argument is left out of the generated ML.TRANSCRIBE call
 * @param {Object} options the configuration object for the {@link obj_table_ml} function
 *
 * @see {@link https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-transcribe}
 */
function transcribe(source_table, output_table, model, recognition_config, options) {
    // JSON.stringify(undefined) is `undefined`, which would render as JSON 'undefined';
    // the argument is optional in ML.TRANSCRIBE, so it is omitted instead.
    let config_argument = "";
    if (recognition_config !== undefined && recognition_config !== null) {
        // The JSON text is embedded in a single-quoted SQL literal: escape backslashes first
        // (so JSON escapes survive SQL unescaping), then single quotes.
        const config = JSON.stringify(recognition_config)
            .replace(/\\/g, "\\\\")
            .replace(/'/g, "\\'");
        config_argument = `,
        recognition_config => ( JSON '${config}')`;
    }
    common.declare_resolvable(source_table);
    common.declare_resolvable(model);
    obj_table_ml(source_table, (ctx) => `SELECT * FROM ML.TRANSCRIBE(
        MODEL ${ctx.resolve(model)},
        TABLE ${ctx.resolve(source_table)}${config_argument})`,
        output_table, common.retryable_error_filter("ml_transcribe_status"), options);
}
/**
 * Performs the ML.PROCESS_DOCUMENT function on the given source table.
 *
 * Both the source table and the model are declared as resolvable before the
 * pipeline is handed to {@link obj_table_ml}, using the filter built by
 * `common.retryable_error_filter("ml_process_document_status")`.
 *
 * @param {Resolvable} source_table represents the source object table
 * @param {String} output_table name of the output table
 * @param {Resolvable} model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_DOCUMENT_V1
 * @param {Object} options the configuration object for the {@link obj_table_ml} function
 *
 * @see {@link https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-process-document}
 */
function process_document(source_table, output_table, model, options) {
    [source_table, model].forEach((resolvable) => common.declare_resolvable(resolvable));

    // Query producer handed to the generic object table pipeline.
    const build_query = (ctx) => `SELECT * FROM ML.PROCESS_DOCUMENT(
        MODEL ${ctx.resolve(model)},
        TABLE ${ctx.resolve(source_table)})`;
    const status_filter = common.retryable_error_filter("ml_process_document_status");

    obj_table_ml(source_table, build_query, output_table, status_filter, options);
}
common.retryable_error_filter("ml_generate_text_status"), options); +} + +module.exports = { + annotate_image: annotate_image, + transcribe: transcribe, + process_document: process_document, + vision_generate_text: vision_generate_text, + obj_table_ml: obj_table_ml, +}; diff --git a/modules/structured_table_ml.js b/modules/structured_table_ml.js new file mode 100644 index 0000000..d356619 --- /dev/null +++ b/modules/structured_table_ml.js @@ -0,0 +1,166 @@ +const common = require("./utils.js"); + +/** + * A generic structured table ML pipeline. + * It incrementally performs an ML operation on rows from the source table + * and merges to the output table until all rows are processed or runs longer + * than the specific duration. + * + * @param {String} output_table name of the output table + * @param {String | Array} unique_keys column name(s) for identifying an unique row in the source table + * @param {String} ml_function the name of the BQML function to call + * @param {Resolvable} ml_model the remote model to use for the ML operation + * @param {String | Function} source_query either a query string or a Contextable function to produce the + * query on the source data for the ML operation and it must have the unique key + * columns selected in addition to other fields + * @param {String} accept_filter a SQL boolean expression for accepting a row to the output table after + * the ML operation + * @param {Object} ml_configs configurations for the ML operation + * @param {Number} batch_size number of rows to process in each SQL job. Rows in the object table will be + * processed in batches according to the batch size. Default batch size is 10000 + * @param {Number} batch_duration_secs the number of seconds to pass before breaking the batching loop if it + * hasn't been finished before within this duration. 
Default value is 22 hours
+ */
+function table_ml(output_table, unique_keys, ml_function, ml_model, source_query, accept_filter, ml_configs = {}, {
+  batch_size = 10000,
+  batch_duration_secs = 22 * 60 * 60
+} = {}) {
+  // Normalize source_query so it can always be invoked with the Dataform context.
+  let source_func = (source_query instanceof Function) ? source_query : () => source_query;
+  let limit_clause = `LIMIT ${batch_size}`;
+  // Render every config entry as `<JSON-encoded value> AS <key>` for the STRUCT argument.
+  let ml_configs_string = Object.entries(ml_configs).map(([k, v]) => `${JSON.stringify(v)} AS ${k}`).join(',');
+
+  // Accept a single column name as well as an array of column names.
+  unique_keys = (unique_keys instanceof Array ? unique_keys : [unique_keys]);
+
+  // Initialize by creating the output table, if it does not exist yet, from the
+  // first batch of rows that pass the accept filter.
+  operate(`init_${output_table}`)
+    .queries((ctx) => `CREATE TABLE IF NOT EXISTS ${ctx.resolve(output_table)} AS
+      SELECT * FROM ${ml_function} (
+        MODEL ${ctx.resolve(ml_model)},
+        (SELECT * FROM (${source_func(ctx)}) ${limit_clause}),
+        STRUCT (${ml_configs_string})
+      ) WHERE ${accept_filter}`);
+
+  // Incrementally update the output table; the init operation is declared as a dependency.
+  let table = publish(output_table, {
+    type: "incremental",
+    dependencies: [`init_${output_table}`],
+    uniqueKey: unique_keys,
+  });
+
+  // Repeatedly finds new rows from the source table, performs the ML operation, and merges the result to the output table.
+  // The pre-operation opens a REPEAT loop and the post-operation closes it; both are emitted for incremental runs only.
+  // NOTE(review): the `--` after REPEAT presumably comments out the statement separator that Dataform
+  // appends to pre-operations, so that the loop header is not terminated early; confirm against Dataform.
+  table.preOps((ctx) => `${ctx.when(ctx.incremental(), `
+    REPEAT --;`)}`);
+  // In incremental runs only source rows without a match on the unique keys in the output table are
+  // selected, capped at one batch.
+  table.query((ctx) => `
+    SELECT * FROM ${ml_function} (
+      MODEL ${ctx.resolve(ml_model)},
+      (SELECT S.* FROM (${source_func(ctx)}) AS S
+        ${ctx.when(ctx.incremental(),
+        `WHERE NOT EXISTS (SELECT * FROM ${ctx.resolve(output_table)} AS T WHERE ${unique_keys.map((k) => `S.${k} = T.${k}`).join(' AND ')})`)} ${limit_clause}),
+      STRUCT (${ml_configs_string})
+    ) WHERE ${accept_filter}`);
+  // The loop stops when @@row_count is 0 or once batch_duration_secs have elapsed since the script was created.
+  table.postOps((ctx) => `${ctx.when(ctx.incremental(), `
+    UNTIL (SELECT @@row_count) = 0 OR TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), @@script.creation_time, SECOND) >= ${batch_duration_secs}
+    END REPEAT`, ``)}`);
+};
+
+/**
+ * Performs the ML.GENERATE_EMBEDDING function on the given source table.
+ *
+ * @param {Resolvable} source_table represents the source table; it is only declared as a data source
+ *                 here, the rows to process come from source_query
+ * @param {String} output_table the name of the table to store the final result
+ * @param {String | Array} unique_keys column name(s) for identifying a unique row in the source table
+ * @param {Resolvable} ml_model the remote model to use for the ML operation that uses one of the
+ *                 `textembedding-gecko*` Vertex AI LLMs as endpoint
+ * @param {String | Function} source_query either a query string or a Contextable function to produce the
+ *                 query on the source data for the ML operation and it must have the unique key
+ *                 columns selected in addition to other fields
+ * @param {Object} ml_configs configurations for the ML operation
+ * @param {Object} options the configuration object for the {@link table_ml} function
+ *
+ * @see {@link https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-embedding}
+ */
+function generate_embedding(source_table, output_table, unique_keys, ml_model, source_query, ml_configs, options) {
+  for (const resolvable of [source_table, ml_model]) {
+    common.declare_resolvable(resolvable);
+  }
+
+  // Rows whose status column reports a retryable error are not accepted into the output table.
+  const accept_filter = common.retryable_error_filter("ml_generate_embedding_status");
+  table_ml(output_table, unique_keys, "ML.GENERATE_EMBEDDING", ml_model, source_query,
+    accept_filter, ml_configs, options);
+};
+
+/**
+ * Performs the ML.GENERATE_TEXT function on the given source table.
+ *
+ * @param {Resolvable} source_table represents the source table; it is only declared as a data source
+ *                 here, the rows to process come from source_query
+ * @param {String} output_table the name of the table to store the final result
+ * @param {String | Array} unique_keys column name(s) for identifying a unique row in the source table
+ * @param {Resolvable} ml_model the remote model to use for the ML operation that uses one
+ *                 of the Vertex AI LLM endpoints
+ * @param {String | Function} source_query either a query string or a Contextable function to produce the
+ *                 query on the source data for the ML operation and it must have the unique key
+ *                 columns selected in addition to other fields
+ * @param {Object} ml_configs configurations for the ML operation
+ * @param {Object} options the configuration object for the {@link table_ml} function
+ *
+ * @see {@link https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-generate-text}
+ */
+function generate_text(source_table, output_table, unique_keys, ml_model, source_query, ml_configs, options) {
+  for (const resolvable of [source_table, ml_model]) {
+    common.declare_resolvable(resolvable);
+  }
+
+  // Rows whose status column reports a retryable error are not accepted into the output table.
+  const accept_filter = common.retryable_error_filter("ml_generate_text_status");
+  table_ml(output_table, unique_keys, "ML.GENERATE_TEXT", ml_model, source_query,
+    accept_filter, ml_configs, options);
+}
+
+/**
+ * Performs the ML.UNDERSTAND_TEXT function on the given source table.
+ *
+ * @param {Resolvable} source_table represents the source table; it is only declared as a data source
+ *                 here, the rows to process come from source_query
+ * @param {String} output_table the name of the table to store the final result
+ * @param {String | Array} unique_keys column name(s) for identifying a unique row in the source table
+ * @param {Resolvable} ml_model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_NATURAL_LANGUAGE_V1
+ * @param {String | Function} source_query either a query string or a Contextable function to produce the
+ *                 query on the source data for the ML operation and it must have the unique key
+ *                 columns selected in addition to other fields
+ * @param {Object} ml_configs configurations for the ML operation
+ * @param {Object} options the configuration object for the {@link table_ml} function
+ *
+ * @see {@link https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-understand-text}
+ */
+function understand_text(source_table, output_table, unique_keys, ml_model, source_query, ml_configs, options) {
+  for (const resolvable of [source_table, ml_model]) {
+    common.declare_resolvable(resolvable);
+  }
+
+  // Rows whose status column reports a retryable error are not accepted into the output table.
+  const accept_filter = common.retryable_error_filter("ml_understand_text_status");
+  table_ml(output_table, unique_keys, "ML.UNDERSTAND_TEXT", ml_model, source_query,
+    accept_filter, ml_configs, options);
+}
+
+/**
+ * Performs the ML.TRANSLATE function on the given source table.
+ *
+ * @param {Resolvable} source_table represents the source table; it is only declared as a data source
+ *                 here, the rows to process come from source_query
+ * @param {String} output_table the name of the table to store the final result
+ * @param {String | Array} unique_keys column name(s) for identifying a unique row in the source table
+ * @param {Resolvable} ml_model the remote model with a REMOTE_SERVICE_TYPE of CLOUD_AI_TRANSLATE_V3
+ * @param {String | Function} source_query either a query string or a Contextable function to produce the
+ *                 query on the source data for the ML operation and it must have the unique key
+ *                 columns selected in addition to other fields
+ * @param {Object} ml_configs configurations for the ML operation
+ * @param {Object} options the configuration object for the {@link table_ml} function
+ *
+ * @see {@link https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-translate}
+ */
+function translate(source_table, output_table, unique_keys, ml_model, source_query, ml_configs, options) {
+  for (const resolvable of [source_table, ml_model]) {
+    common.declare_resolvable(resolvable);
+  }
+
+  // Rows whose status column reports a retryable error are not accepted into the output table.
+  const accept_filter = common.retryable_error_filter("ml_translate_status");
+  table_ml(output_table, unique_keys, "ML.TRANSLATE", ml_model, source_query,
+    accept_filter, ml_configs, options);
+}
+
+module.exports = {
+  table_ml,
+  generate_embedding,
+  generate_text,
+  understand_text,
+  translate,
+}
diff --git a/modules/utils.js b/modules/utils.js
new file mode 100644
index 0000000..d61231d
--- /dev/null
+++ b/modules/utils.js
@@ -0,0 +1,22 @@
+module.exports = {
+  /**
+   * Declares the resolvable as a Dataform data source.
+   * A plain object is handed to `declare` unchanged; any other value is
+   * wrapped as `{ name: source }` first.
+   */
+  declare_resolvable: (source) => {
+    const declaration = (source.constructor === Object) ? source : { name: source };
+    declare(declaration);
+  },
+
+  /**
+   * Forms a SQL filter clause for filtering out retryable
+   * error based on a given status column. 
+   */
+  retryable_error_filter: (status_col) => `${status_col} NOT LIKE 'A retryable error occurred:%'`,
+};
diff --git a/package-lock.json b/package-lock.json
new file mode 100644
index 0000000..76e57aa
--- /dev/null
+++ b/package-lock.json
@@ -0,0 +1,25 @@
+{
+  "name": "bqml",
+  "lockfileVersion": 2,
+  "requires": true,
+  "packages": {
+    "": {
+      "name": "bqml",
+      "dependencies": {
+        "@dataform/core": "3.0.0-beta.4"
+      }
+    },
+    "node_modules/@dataform/core": {
+      "version": "3.0.0-beta.4",
+      "resolved": "https://registry.npmjs.org/@dataform/core/-/core-3.0.0-beta.4.tgz",
+      "integrity": "sha512-1mCimb4hxeO2iSsdkjSquq15rt8xKKNieX63ENqpWtjqtJwnpBmY8IyzufWjsuEiT3iqqWR/1A0aUbUa5kEtsg=="
+    }
+  },
+  "dependencies": {
+    "@dataform/core": {
+      "version": "3.0.0-beta.4",
+      "resolved": "https://registry.npmjs.org/@dataform/core/-/core-3.0.0-beta.4.tgz",
+      "integrity": "sha512-1mCimb4hxeO2iSsdkjSquq15rt8xKKNieX63ENqpWtjqtJwnpBmY8IyzufWjsuEiT3iqqWR/1A0aUbUa5kEtsg=="
+    }
+  }
+}
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..d6c011d
--- /dev/null
+++ b/package.json
@@ -0,0 +1,6 @@
+{
+  "name": "bqml",
+  "dependencies": {
+    "@dataform/core": "3.0.0-beta.4"
+  }
+}