feat: FlinkSQL vector search tutorial #102

Draft · wants to merge 1 commit into base: master

20 changes: 11 additions & 9 deletions gen-ai-vector-embedding/flinksql/README.md
@@ -1,18 +1,18 @@
<!-- title: How to generate vector embeddings for RAG (GenAI) with Flink SQL in Confluent Cloud -->
<!-- description: In this tutorial, learn how to generate vector embeddings for RAG (GenAI) with Flink SQL in Confluent Cloud, with step-by-step instructions and supporting code. -->
<!-- title: How to generate vector embeddings for RAG with Flink SQL in Confluent Cloud -->
<!-- description: In this tutorial, learn how to generate vector embeddings for RAG with Flink SQL in Confluent Cloud, with step-by-step instructions and supporting code. -->

# How to generate vector embeddings for RAG (GenAI) with Flink SQL in Confluent Cloud
# GenAI Part 1 of 2: How to generate vector embeddings for RAG with Flink SQL in Confluent Cloud

In this tutorial, you will generate vector embeddings on retail product catalog data. A source connector ingests and writes unstructured source data to a topic. Flink SQL then converts this data into vector embeddings and inserts into a new topic.
In Part 1 of this tutorial series, you will generate vector embeddings on retail product catalog data. A source connector ingests and writes unstructured source data to a topic. Flink SQL then converts this data into vector embeddings and inserts them into a new topic.

This tutorial is a building block for real-time GenAI applications including [RAG](https://www.confluent.io/learn/retrieval-augmented-generation-rag/) and is based on the webinar [How to Build RAG Using Confluent with Flink AI Model Inference and MongoDB](https://www.confluent.io/resources/online-talk/rag-tutorial-with-flink-ai-model-inference-mongodb/).

Once vector encoding is complete, you can leverage vector embeddings and vector search to build use cases including:
Once vector encoding is complete, [Part 2](https://developer.confluent.io/confluent-tutorials/gen-ai-vector-search/flinksql/) of this tutorial series leverages vector search over the embeddings to build a RAG-enabled query engine that is robust with respect to uncommon or slang terminology. Other use cases that build on vector embeddings and vector search include:

* RAG-enabled GenAI chatbots, content discovery, and recommendation engines. For example, retrieving user profile data and past questions to match the clothing size and fashion style requested in the user's query. The results are sent to an LLM as context to augment the prompt and mitigate hallucinations, ensuring that the LLM generates specific and accurate product recommendations.
* ML-driven search for real-time fraud detection, anomaly detection, or forecasting

### Prerequisites
## Prerequisites

* A [Confluent Cloud](https://confluent.cloud/signup) account
* The [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html)
@@ -196,11 +196,13 @@ vector content
...
```

## Next step: Add a sink connector for your vector store
## Next step: Add a sink connector for your vector store and run vector search

In this tutorial, you learned how to generate vector embeddings from string data in Kafka messages using Flink SQL.

As a next step, you can further build out this streaming data pipeline by adding a sink connector in Confluent Cloud to store these embeddings in a vector database. To deploy a sink connector on Confluent Cloud, navigate to the `Connectors` page in the Confluent Cloud Console or use the Confluent CLI. This setup enables you to continuously stream real-time vector embeddings from Flink SQL into your vector database.
As a next step, continue to Part 2 of this tutorial series: [How to implement vector search-based RAG with Flink SQL in Confluent Cloud](https://developer.confluent.io/confluent-tutorials/gen-ai-vector-search/flinksql/).

The next tutorial in this series stores these embeddings in MongoDB Atlas. If you would like to write the embeddings to a different vector store, you can deploy a sink connector on Confluent Cloud. Navigate to the `Connectors` page in the Confluent Cloud Console or use the Confluent CLI. This setup enables you to continuously stream real-time vector embeddings from Flink SQL into your vector database.
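For example, once you have a connector configuration file for your vector store, provisioning it from the CLI is a one-liner. This is a sketch: the file name `my-vector-store-sink-connector.json` is a placeholder, and the required properties depend on the connector you choose (see the resources below).

```shell
# Provision a sink connector from a JSON config file (placeholder name);
# the required properties vary by vector store, see the linked guides below
confluent connect cluster create --config-file ./my-vector-store-sink-connector.json
```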

For guidance on setting up a vector database sink connector, refer to the following resources:

@@ -215,7 +217,7 @@ For guidance on setting up a vector database sink connector, refer to the follow

## Clean up

When you finish experimenting, delete the `confluent-rag_environment` environment in order to clean up the Confluent Cloud infrastructure created for this tutorial. Run the following command in your terminal to get the environment ID of the form `env-123456` corresponding to the environment named `confluent-rag_environment`:
If you are not continuing to [Part 2](https://developer.confluent.io/confluent-tutorials/gen-ai-vector-search/flinksql/) of this tutorial series, delete the `confluent-rag_environment` environment in order to clean up the Confluent Cloud infrastructure created for this tutorial. Run the following command in your terminal to get the environment ID of the form `env-123456` corresponding to the environment named `confluent-rag_environment`:

```shell
confluent environment list
@@ -24,4 +24,4 @@
"transforms.cast-float.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast-float.spec": "price:float64"
}
}
}
162 changes: 162 additions & 0 deletions gen-ai-vector-search/flinksql/README.md
@@ -0,0 +1,162 @@
<!-- title: How to implement vector search-based RAG with Flink SQL in Confluent Cloud -->
<!-- description: In this tutorial, learn how to implement vector search-based RAG with Flink SQL in Confluent Cloud, with step-by-step instructions and supporting code. -->

# GenAI Part 2 of 2: How to implement vector search-based RAG with Flink SQL in Confluent Cloud

In this tutorial, you will implement a [Retrieval-Augmented Generation (RAG)](https://www.confluent.io/learn/retrieval-augmented-generation-rag/) use case. You’ll use vector search over the embeddings generated in [Part 1 of this tutorial series](https://developer.confluent.io/confluent-tutorials/gen-ai-vector-embedding/flinksql/) to augment a user's query so that uncommon or slang terminology still maps to relevant products.

## Prerequisites

* Completion of Part 1 of this tutorial series with all resources left up and running: [How to generate vector embeddings for RAG with Flink SQL in Confluent Cloud](https://developer.confluent.io/confluent-tutorials/gen-ai-vector-embedding/flinksql/)
* A [MongoDB Atlas](https://www.mongodb.com/lp/cloud/atlas/try4-reg) account
* Navigate to the `gen-ai-vector-search/flinksql` directory of your `confluentinc/tutorials` GitHub repository clone
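If you haven't cloned the repository yet, a minimal way to get there (assuming `git` is installed and you clone `confluentinc/tutorials` over HTTPS):

```shell
# Clone the tutorials repository and switch to the Part 2 directory
git clone https://github.com/confluentinc/tutorials.git
cd tutorials/gen-ai-vector-search/flinksql
```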

## Provision MongoDB Atlas cluster

1. In the [Atlas UI](https://cloud.mongodb.com/), select `Clusters` in the left-hand navigation, and then click `Build a Cluster`.
1. Select the `Free` M0 cluster type.
1. Choose the cloud provider and region that match your Confluent Cloud environment.
1. Click `Create Deployment`.
1. Create a user `admin` and copy the password that you generate for it.
1. Click `Finish and Close`.

To create your database and collection:

1. From the cluster homepage, click the `Browse Collections` button, and then `Add My Own Data`.
1. Enter the following and then click `Create`:
* Database name: `store`
* Collection name: `products`

## Provision MongoDB Atlas Sink Connector

You'll now land the embeddings generated in Part 1 of this tutorial series in your Atlas cluster.

1. Copy your `kafka.api.key` and `kafka.api.secret` from Part 1 (found in `../../gen-ai-vector-embedding/flinksql/datagen-product-updates-connector.json`) into `./mongodb-atlas-sink-connector.json`.
1. Replace the `ATLAS_ADMIN_USER_PASSWORD` placeholder with your `admin` user’s password.
1. In the Atlas UI, select `Clusters` in the left-hand navigation, and then click `Connect`. Click `Shell` and then copy your cluster's endpoint of the form `cluster-name.abcdefg.mongodb.net`. Replace the `ATLAS_HOST` placeholder in your connector config with this value.
1. Provision the connector:
```shell
confluent connect cluster create --config-file ./mongodb-atlas-sink-connector.json
```
1. After provisioning, verify that records show up in the `store.products` collection via the Atlas UI. You can also check the connector's status from the CLI, as shown below.
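A minimal sketch of that CLI check, assuming the standard `confluent connect cluster` subcommands; the connector ID passed to `describe` is a placeholder taken from the `list` output:

```shell
# List connectors in the Kafka cluster and check that the MongoDB sink is RUNNING
confluent connect cluster list

# Optionally inspect the connector's configuration and status (placeholder ID)
confluent connect cluster describe lcc-123456
```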

## Configure vector index

To enable vector search in Confluent Cloud, create a vector index in MongoDB Atlas:

1. In the Atlas UI, go to `Atlas Search` in the left-hand navigation, and then click `Create Search Index`.
1. Set `Search Type` to `Vector Search`.
1. Leave the index name as `vector_index`.
1. Select the `store.products` collection and click `Next`.
1. Choose `Dot Product` as the `Similarity Method` and click `Next`.
1. Click `Create Vector Search Index`.
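Equivalently, if you use the JSON editor when creating the index, the definition looks roughly like the following sketch. The `numDimensions` value is an assumption: it must match the dimensionality of the embedding model you used in Part 1 (for example, 1536 for OpenAI's `text-embedding-ada-002`).

```json
{
  "fields": [
    {
      "type": "vector",
      "path": "vector",
      "numDimensions": 1536,
      "similarity": "dotProduct"
    }
  ]
}
```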

It may take a moment for the index to build and be queryable.

## Create Flink connection to MongoDB

Before you can create a MongoDB external table in Confluent Cloud, create a connection to the vector index. Use the same `admin` user password that you used when provisioning the sink connector earlier. Note that the endpoint needs `mongodb+srv://` prepended. If you need your environment ID of the form `env-123456`, run `confluent environment list` on the command line. Finally, if you are running Atlas in a different cloud provider or region, adjust the `--cloud` and `--region` flags accordingly.

```shell
confluent flink connection create mongodb-connection \
  --cloud AWS \
  --region us-east-1 \
  --type mongodb \
  --endpoint mongodb+srv://ATLAS_HOST/ \
  --username admin \
  --password ATLAS_ADMIN_USER_PASSWORD \
  --environment ENVIRONMENT_ID
```
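To confirm the connection exists before referencing it from Flink SQL, you can list connections. This is a sketch that assumes your Confluent CLI version supports the `confluent flink connection list` subcommand with the same cloud, region, and environment flags:

```shell
# List Flink connections in the environment; mongodb-connection should appear
confluent flink connection list --cloud AWS --region us-east-1 --environment ENVIRONMENT_ID
```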

## Create MongoDB External Table in Flink SQL

Create a MongoDB external table in the Flink SQL shell that references the connection created in the previous step. For the sake of clarity, we are only going to return the `articleType` field. Keep in mind, though, that the vector search will run against embeddings generated from the concatenation of all product catalog fields.

```sql
CREATE TABLE mongodb_products (
  articleType STRING,
  vector ARRAY<FLOAT>
) WITH (
  'connector' = 'mongodb',
  'mongodb.connection' = 'mongodb-connection',
  'mongodb.database' = 'store',
  'mongodb.collection' = 'products',
  'mongodb.index' = 'vector_index',
  'mongodb.embedding_column' = 'vector',
  'mongodb.numCandidates' = '5'
);
```
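As a quick sanity check that the external table is registered, you can inspect it with standard Flink SQL metadata statements:

```sql
-- Confirm the table exists and review its columns
SHOW TABLES;
DESCRIBE mongodb_products;
```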

## Populate a table with sample user queries

In the Flink SQL shell, simulate a stream of uncommon slang user queries for footwear (kicks, footies), shorts (cutoffs), and a hat (lid):

```sql
CREATE TABLE queries (
  query STRING
) WITH (
  'value.format' = 'json-registry'
);
```

```sql
INSERT INTO queries VALUES ('kicks'), ('footies'), ('cutoffs'), ('lid');
```

## Generate query vector embeddings

Create a table for query vectors using the same embedding model used for product embeddings in Part 1:

```sql
CREATE TABLE query_vectors (
  query STRING,
  vector ARRAY<FLOAT>
) WITH (
  'value.format' = 'json-registry'
);
```

```sql
INSERT INTO query_vectors
(
  SELECT query, vector
  FROM queries,
  LATERAL TABLE(ML_PREDICT('vector_embedding', query))
);
```
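Before running the search, you can verify that an embedding was generated for each query. This is a simple check; each row should contain a vector whose length matches the embedding model's dimensionality:

```sql
-- Each of the four slang queries should have a populated embedding array
SELECT query, vector FROM query_vectors;
```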

## Run the vector search

Query the vector index to find products for each slang query:

```sql
SELECT query, search_results
FROM query_vectors,
LATERAL TABLE(VECTOR_SEARCH(mongodb_products, 3, DESCRIPTOR(vector), query_vectors.vector));
```

Your results will vary from the following depending on the products that you generated in Part 1 of this tutorial series, but you should see sensible search results that match the slang queries:

```plaintext
query search_results

footies [(shoes, [0.020776896, 0.023529341, ...
cutoffs [(shorts, [0.0352604, -0.0070558237, ...
kicks [(sandals, [0.030727627, -0.009568145, ...
lid [(hat, [0.007294555, 0.022405202, ...
```

## Clean up

When you finish experimenting, delete the `confluent-rag_environment` environment in order to clean up the Confluent Cloud infrastructure created for this tutorial. Run the following command in your terminal to get the environment ID of the form `env-123456` corresponding to the environment named `confluent-rag_environment`:

```shell
confluent environment list
```

Delete the environment:

```shell
confluent environment delete <ENVIRONMENT_ID>
```
54 changes: 54 additions & 0 deletions gen-ai-vector-search/flinksql/mongodb-atlas-sink-connector.json
@@ -0,0 +1,54 @@
{
"config": {
"connector.class": "MongoDbAtlasSink",
"name": "MongoDbAtlasSinkConnector",
"schema.context.name": "default",
"input.data.format": "JSON_SR",
"cdc.handler": "None",
"input.key.format": "STRING",
"delete.on.null.values": "false",
"max.batch.size": "0",
"bulk.write.ordered": "true",
"rate.limiting.timeout": "0",
"rate.limiting.every.n": "0",
"write.strategy": "DefaultWriteModelStrategy",
"csfle.enabled": "false",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "YOUR_API_KEY",
"kafka.api.secret": "YOUR_API_SECRET",
"topics": "product_vector",
"connection.host": "ATLAS_HOST",
"connection.user": "admin",
"connection.password": "ATLAS_ADMIN_USER_PASSWORD",
"database": "store",
"collection": "products",
"doc.id.strategy": "BsonOidStrategy",
"doc.id.strategy.overwrite.existing": "false",
"document.id.strategy.uuid.format": "string",
"key.projection.type": "none",
"value.projection.type": "none",
"namespace.mapper.class": "DefaultNamespaceMapper",
"server.api.deprecation.errors": "false",
"server.api.strict": "false",
"max.num.retries": "3",
"retries.defer.timeout": "5000",
"timeseries.timefield.auto.convert": "false",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd[['T'][ ]][HH:mm:ss[[.][SSSSSS][SSS]][ ]VV[ ]'['VV']'][HH:mm:ss[[.][SSSSSS][SSS]][ ]X][HH:mm:ss[[.][SSSSSS][SSS]]]",
"timeseries.timefield.auto.convert.locale.language.tag": "en",
"timeseries.expire.after.seconds": "0",
"ts.granularity": "None",
"mongo.errors.tolerance": "NONE",
"max.poll.interval.ms": "300000",
"max.poll.records": "500",
"tasks.max": "1",
"value.converter.replace.null.with.default": "true",
"value.converter.reference.subject.name.strategy": "DefaultReferenceSubjectNameStrategy",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"value.converter.ignore.default.for.nullables": "false",
"value.converter.decimal.format": "BASE64",
"value.converter.value.subject.name.strategy": "TopicNameStrategy",
"key.converter.key.subject.name.strategy": "TopicNameStrategy",
"auto.restart.on.user.error": "true"
}
}