This project showcases a scalable, modern architecture for detecting and filtering review-bombing activity in real time, leveraging the power of Confluent, AWS, and MongoDB. The setup processes Amazon review data to identify fake reviews and integrates valid reviews with user account information. Furthermore, we utilize Amazon Bedrock to categorize the reviews and generate review summaries.
- Launch Confluent Cloud and set up your cluster
- Prepare Flink, Kafka topics and API keys
- Set up AWS resources for review data using Terraform
- Configure MongoDB Atlas for user data
- Establish a source connector to MongoDB
- Use Flink for real-time data stream processing
- Clean up and decommission resources post-analysis
To ensure a smooth and successful experience with this demo, please make sure you have the following tools and accounts set up:
- Confluent Cloud Account: You'll need a Confluent Cloud account. If you don't have one, you can sign up for a free trial here.
  - After verifying your email address, access Confluent Cloud sign-in by navigating here.
  - When prompted for your username and password, enter your credentials.
- Terraform: If you don't already have Terraform installed, you can find installation instructions here.
- AWS CLI: You'll need the AWS CLI installed and configured on your system. You can download it from the official website here. Set up your AWS CLI using the `aws configure` command.
- Python 3.11: Ensure you have Python 3.11 installed.
- MongoDB Atlas Account: Create a MongoDB Atlas account and set up a free cluster. You can follow the Atlas UI instructions given below.
- Clone the repository: `git clone https://github.com/confluentinc/genai-product-review-pipeline.git`
With these prerequisites in place, you'll be ready to explore and run the demo seamlessly.
- Log in to Confluent Cloud and enter your email and password.
- If you are logging in for the first time, you will see a self-guided wizard that walks you through spinning up a cluster. Please minimize it, as you will walk through those steps in this guide.
- Click + Add Environment. Specify an Environment Name, choose the Essentials Stream Governance package, and click Create.
- You now have an environment enabled with the Essentials Stream Governance package, which includes Schema Registry. Click Create Cluster.
  Note: To learn more about the different types of clusters and their associated features and limits, refer to this documentation. We will use a Basic cluster for this demo.
- Choose the Basic cluster type.
- Click Begin Configuration.
- Choose AWS Cloud Provider, Region, and Availability Zone.
- Select the same region where you plan to deploy your resources in the upcoming steps.
- Specify a Cluster Name; any name will work here.
- View the associated Configuration & Cost, Usage Limits, and Uptime SLA information before launching.
- Click Launch Cluster.
Confluent Cloud for Apache Flink®️ offers powerful AI model inference capabilities through the built-in ML_PREDICT and ML_EVALUATE functions, allowing remote AI/ML models to be invoked directly within Flink SQL queries. This streamlines the development and deployment of AI applications, enabling seamless integration of data processing and AI/ML tasks on a single platform. Additionally, Confluent Cloud supports read-only external tables, allowing federated query execution across external vector databases like MongoDB, Pinecone, and Elasticsearch, further enriching data pipelines.
With Flink SQL's AI model inference, data can be dynamically enriched with real-time machine learning predictions, enabling smarter, more responsive applications. For example, imagine a user leaving a negative product review — instead of a basic response, the app could analyze the review, identify the root cause, and offer real-time solutions, enhancing the user experience.
To learn more, visit:
- Create a Flink workspace by navigating to the cluster view of the environment and selecting the Flink section. Follow the steps as demonstrated below.
- You can also access the Flink workspace from the leftmost pane by clicking the `Stream Processing` option.
- Execute the following queries in the editor to create the tables.
Note: Run each query in a new editor window. To create a new editor, click the + button near the left side of the editor.
CREATE TABLE `amazon-reviews` ( user_id STRING, rating STRING, title STRING, text STRING, images ARRAY<STRING>, asin STRING, parent_asin STRING, `timestamp` TIMESTAMP(3), helpful_vote INT, verified_purchase BOOLEAN, WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND );
CREATE TABLE filtered_invalid_reviews ( user_id STRING, rating STRING, title STRING, text STRING, images ARRAY<STRING>, asin STRING, parent_asin STRING, `timestamp` TIMESTAMP(3), helpful_vote INT, verified_purchase BOOLEAN, window_start TIMESTAMP(3), window_end TIMESTAMP(3), review_count BIGINT, WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND );
CREATE TABLE filtered_valid_reviews ( user_id STRING, rating STRING, title STRING, text STRING, images ARRAY<STRING>, asin STRING, parent_asin STRING, `timestamp` TIMESTAMP(3), helpful_vote INT, verified_purchase BOOLEAN, window_start TIMESTAMP(3), window_end TIMESTAMP(3), review_count BIGINT, WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND );
CREATE TABLE valid_reviews_with_user_info ( user_id STRING, rating STRING, title STRING, text STRING, images ARRAY<STRING>, asin STRING, parent_asin STRING, `timestamp` TIMESTAMP(3), helpful_vote INT, verified_purchase BOOLEAN, window_start TIMESTAMP(3), window_end TIMESTAMP(3), review_count BIGINT, address STRING, city STRING, country STRING, email STRING, first_name STRING, gender STRING, last_name STRING, payment_method STRING, phone_number STRING, state STRING, zip_code STRING, WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND );
- Check the Topics section of your cluster to see the topics created. These topics are generated based on the table names specified in the queries above.
- Create a Kafka cluster API key and store it for later use.
- Create a Schema Registry API key by navigating to the right pane in the cluster's view of the environment. Save the Schema Registry URL and generate the API key by clicking the "Add Key" button. Save the endpoint and credentials for the next step.
- Get the bootstrap endpoint from the Cluster Settings and save it for the next step.
- This demo uses Terraform to spin up the AWS resources that are needed.
- Update the `variables.auto.tfvars` file with the proper values generated during the Confluent Cloud setup phase. Ensure you use the same region as selected during the Confluent Cloud cluster creation step.
- Initialize Terraform: `terraform init`
- Preview the actions Terraform will take to modify your infrastructure and check for any errors in the configuration: `terraform plan`
- Apply the plan to create the infrastructure: `terraform apply`
- Verify the resources created by Terraform in AWS.
- Execute the `upload-datasets-to-s3.py` script located in the `scripts` folder using the following command: `python3 upload-datasets-to-s3.py`. Be sure the bucket name is updated in this script before running it.
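  In case it helps to know what the script is doing, here is a rough, illustrative sketch of the idea; the bucket name and data folder below are placeholders, and the actual file list and bucket name live in the script itself:

  ```python
  # Illustrative sketch of upload-datasets-to-s3.py: push the local dataset files
  # to the S3 bucket created by Terraform. Names below are placeholders.
  import os
  import boto3

  BUCKET_NAME = "your-terraform-created-bucket"  # update to match your bucket
  DATASET_DIR = "data"                           # local folder holding the dataset files

  s3 = boto3.client("s3")

  for filename in os.listdir(DATASET_DIR):
      local_path = os.path.join(DATASET_DIR, filename)
      if os.path.isfile(local_path):
          s3.upload_file(local_path, BUCKET_NAME, filename)
          print(f"Uploaded {filename} to s3://{BUCKET_NAME}/{filename}")
  ```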
- [Optional] Check the datasets uploaded to S3 by the script.
- Open the AWS Step Functions that were created and start the execution:
  - The Step Function `confluent-mongo-aws-state-function-1` will trigger the valid reviews Lambda function, running every 5 seconds to generate valid reviews.
  - The Step Function `confluent-mongo-aws-state-function-2` will trigger the review bombing Lambda function, running every 3 minutes to generate 1,000 fake reviews.
  - The Step Function `confluent-mongo-aws-state-function-3` will trigger the static review bombing Lambda function, running every 25 seconds to generate static fake reviews.
- Observe the data being populated in the `amazon-reviews` topic that we set up in Confluent Cloud.
- Create a database and collection in the cluster from the prerequisites. Click the "Insert Document" option, switch to code view, and paste the entire content from the `data/amazon-user-mockdata.json` file. This will populate the collection with user accounts.
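  If you prefer to load the file from the command line instead of pasting it into the Atlas UI, a minimal pymongo sketch could look like the following. It assumes the file is a JSON array of documents and uses the database and collection names referenced later in this demo (`confluent-aws-mongo-demo-db` / `amazon-userids`), plus your own Atlas connection string:

  ```python
  # Alternative to the Atlas UI: load the mock user accounts with pymongo.
  # Connection string, database, and collection names must match your setup.
  import json
  from pymongo import MongoClient

  ATLAS_URI = "mongodb+srv://<user>:<password>@<cluster-host>/"  # your Atlas connection string

  client = MongoClient(ATLAS_URI)
  collection = client["confluent-aws-mongo-demo-db"]["amazon-userids"]

  with open("data/amazon-user-mockdata.json") as f:
      docs = json.load(f)  # assumes the file is an array of user documents

  result = collection.insert_many(docs)
  print(f"Inserted {len(result.inserted_ids)} user documents")
  ```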
- Go to the `Connectors` section in the left pane, search for the `MongoDB Atlas Source` connector, and select it.
- Provide a prefix for the topic that will be created by the MongoDB source connector.
- Enter the API key for the existing cluster created in the previous steps or generate a new Kafka cluster API key.
- Provide your Atlas credentials in the authentication step.
- The verification step will be successful if Confluent can connect to MongoDB Atlas using the provided credentials.
- Configure your connector as demonstrated, and leave the remaining settings unchanged.
- Review and launch the connector. Once it is successfully running, you will see a new topic created in the Topics section and user account data populated within the topic.
- Execute the following queries to perform a review filter operation, separating reviews into valid and invalid categories. The filtering is based on a hopping window, where any sudden spike of negative reviews from a single user ID within a short time frame is classified as invalid.
Note: Run each query in a new editor window. To create a new editor, click the + button near the left side of the editor.
INSERT INTO filtered_invalid_reviews ( user_id, rating, title, text, images, asin, parent_asin, `timestamp`, helpful_vote, verified_purchase, window_start, window_end, review_count ) SELECT ar.user_id, ar.rating, ar.title, ar.text, ar.images, ar.asin, ar.parent_asin, ar.`timestamp`, ar.helpful_vote, ar.verified_purchase, rc.window_start, rc.window_end, rc.review_count FROM ( SELECT user_id, window_start, window_end, COUNT(*) AS review_count FROM TABLE( HOP(TABLE `amazon-reviews`, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTES, INTERVAL '4' MINUTES)) GROUP BY window_start, window_end, user_id ) rc JOIN `amazon-reviews` ar ON ar.user_id = rc.user_id AND ar.`timestamp` >= rc.window_start AND ar.`timestamp` < rc.window_end WHERE rc.review_count >= 10 AND ar.verified_purchase = false;
INSERT INTO filtered_valid_reviews ( user_id, rating, title, text, images, asin, parent_asin, `timestamp`, helpful_vote, verified_purchase, window_start, window_end, review_count ) SELECT ar.user_id, ar.rating, ar.title, ar.text, ar.images, ar.asin, ar.parent_asin, ar.`timestamp`, ar.helpful_vote, ar.verified_purchase, rc.window_start, rc.window_end, rc.review_count FROM ( SELECT user_id, window_start, window_end, COUNT(*) AS review_count FROM TABLE( HOP(TABLE `amazon-reviews`, DESCRIPTOR(`timestamp`), INTERVAL '1' MINUTES, INTERVAL '4' MINUTES)) GROUP BY window_start, window_end, user_id ) rc JOIN `amazon-reviews` ar ON ar.user_id = rc.user_id AND ar.`timestamp` >= rc.window_start AND ar.`timestamp` < rc.window_end WHERE rc.review_count < 10
- After the window analysis is complete, the data will be distributed into the respective topics. Verify the data within these topics; all data produced by the review bombing Lambda function will be categorized as invalid reviews.
- Now, let's join the valid reviews with the account information.
INSERT INTO valid_reviews_with_user_info SELECT fvr.user_id, fvr.rating, fvr.title, fvr.text, fvr.images, fvr.asin, fvr.parent_asin, fvr.`timestamp`, fvr.helpful_vote, fvr.verified_purchase, fvr.window_start, fvr.window_end, fvr.review_count, au.address, au.city, au.country, au.email, au.first_name, au.gender, au.last_name, au.payment_method, au.phone_number, au.state, au.zip_code FROM filtered_valid_reviews fvr JOIN `mongodb.confluent-aws-mongo-demo-db.amazon-userids` au ON fvr.user_id = au.user_id;
- You will now see the valid review data with the account information in the `valid_reviews_with_user_info` topic.
- Navigate to Confluent Cloud and create the following new topics:
  - `valid_reviews_with_user_info_json`
  - `enriched_review`
  - `enriched_product`
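If you prefer to create these topics programmatically rather than through the UI, a minimal sketch using the confluent-kafka `AdminClient` could look like this; the bootstrap endpoint and API key/secret are the ones you saved earlier, and the partition/replication settings are illustrative:

```python
# Optional: create the three topics with the confluent-kafka AdminClient
# instead of the Confluent Cloud UI. Credentials below are placeholders.
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({
    "bootstrap.servers": "<BOOTSTRAP_SERVERS>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<CLUSTER_API_KEY>",
    "sasl.password": "<CLUSTER_API_SECRET>",
})

topics = ["valid_reviews_with_user_info_json", "enriched_review", "enriched_product"]
futures = admin.create_topics([NewTopic(t, num_partitions=6, replication_factor=3) for t in topics])

for topic, future in futures.items():
    future.result()  # raises if topic creation failed
    print(f"Created topic {topic}")
```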
Normally you would do the deserialization and then the calls out to Bedrock in a single AWS Lambda function; however, for this demo we broke it up into two AWS Lambdas for readability. After messages are deserialized, they are sent back to Confluent Cloud as plain JSON, which can be used by the next Lambda function when making a call out to Amazon Bedrock.
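For orientation, here is an illustrative sketch of what such a deserializer Lambda might look like. It assumes the confluent-kafka Python client, the Lambda Kafka event-source payload format, and placeholder endpoints and credentials; the actual function in this repository may differ:

```python
# Illustrative sketch: decode Avro payloads delivered by the Kafka trigger and
# republish them to Confluent Cloud as plain JSON. Endpoints/keys are placeholders.
import base64
import json

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

SOURCE_TOPIC = "valid_reviews_with_user_info"
TARGET_TOPIC = "valid_reviews_with_user_info_json"

schema_registry = SchemaRegistryClient({
    "url": "<SCHEMA_REGISTRY_URL>",
    "basic.auth.user.info": "<SR_API_KEY>:<SR_API_SECRET>",
})
deserialize = AvroDeserializer(schema_registry)

producer = Producer({
    "bootstrap.servers": "<BOOTSTRAP_SERVERS>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<CLUSTER_API_KEY>",
    "sasl.password": "<CLUSTER_API_SECRET>",
})

def lambda_handler(event, context):
    # The Kafka event source groups records by topic-partition.
    for records in event["records"].values():
        for record in records:
            payload = base64.b64decode(record["value"])
            review = deserialize(payload, SerializationContext(SOURCE_TOPIC, MessageField.VALUE))
            producer.produce(TARGET_TOPIC, json.dumps(review, default=str))
    producer.flush()
```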
- Navigate to the avro_deserializer Lambda function.
- Click `Add Trigger`.
- Search for `Confluent` and select the `Apache Kafka` option. What we are doing is creating a native integration between Confluent and AWS Lambda, wherein AWS Lambda will spin up a consumer group and listen in on a topic we specify.
- Use the following configurations:
  - Bootstrap Servers: your cluster's bootstrap endpoint (saved earlier)
  - Starting position: Trim Horizon
  - Topic Name: valid_reviews_with_user_info
  - Authentication: BASIC_AUTH
  - Secrets Manager Key: genai_demo_secret
- Confirm data is flowing by looking at the `valid_reviews_with_user_info_json` topic. If you see messages coming in, the AWS Lambda trigger is working and the AWS Lambda is consuming and publishing data to Confluent Cloud.
This Lambda function summarizes the review after consuming off the `valid_reviews_with_user_info_json` topic. The newly generated review summary and associated details are then published to the `enriched_product` topic.
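The core of this function is a single Amazon Bedrock call. The sketch below shows one way that call might look; the model ID, prompt, and response parsing are assumptions, so check the Lambda code in this repository for the real implementation:

```python
# Illustrative sketch of the Bedrock call inside a review-summarizing Lambda.
# The model ID and prompt are assumptions, not necessarily what this repo uses.
import json
import boto3

bedrock = boto3.client("bedrock-runtime")

def summarize_review(review: dict) -> str:
    prompt = (
        "Summarize the following product review in one or two sentences "
        "and name the product category it belongs to:\n\n"
        f"Title: {review['title']}\nReview: {review['text']}"
    )
    response = bedrock.invoke_model(
        modelId="anthropic.claude-3-haiku-20240307-v1:0",  # assumed model
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 300,
            "messages": [{"role": "user", "content": prompt}],
        }),
    )
    return json.loads(response["body"].read())["content"][0]["text"]
```

The rest of the function follows the same pattern as the deserializer sketch above: consume records from the Kafka trigger, call the summarization step per record, and produce the result to the `enriched_product` topic.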
- Navigate to the review_summarizer Lambda function.
- Click `Add Trigger`.
- Search for `Confluent` and select the `Apache Kafka` option. What we are doing is creating a native integration between Confluent and AWS Lambda, wherein AWS Lambda will spin up a consumer group and listen in on a topic we specify.
- Use the following configurations:
  - Bootstrap Servers: your cluster's bootstrap endpoint (saved earlier)
  - Starting position: Trim Horizon
  - Topic Name: valid_reviews_with_user_info_json
  - Authentication: BASIC_AUTH
  - Secrets Manager Key: genai_demo_secret
- Confirm data is flowing by looking at the `enriched_product` topic. If you see messages coming in, the AWS Lambda trigger is working and the AWS Lambda is consuming and publishing data to Confluent Cloud.
This Lambda function finds reviews that are unique. Uniqueness could come in the form of length/comprehensiveness, a varied perspective, etc. It works by finding reviews that have the fewest matches after consuming off the `valid_reviews_with_user_info_json` topic. The resulting review and associated details are then published to the `enriched_review` topic.
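One common way to implement this kind of semantic filtering is to embed each review and keep the ones least similar to what has already been seen. The sketch below illustrates that idea with a Bedrock Titan embedding model and cosine similarity; the model, threshold, and in-memory store are assumptions, not necessarily what this repo's Lambda does:

```python
# Illustrative sketch of semantic filtering: embed each review and keep only the
# ones least similar to previously seen reviews. Model and threshold are assumptions.
import json
import math
import boto3

bedrock = boto3.client("bedrock-runtime")
seen_embeddings = []  # in practice this would live in a persistent store or vector DB

def embed(text: str) -> list:
    response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v1",  # assumed embedding model
        body=json.dumps({"inputText": text}),
    )
    return json.loads(response["body"].read())["embedding"]

def cosine(a, b) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

def is_unique(review_text: str, threshold: float = 0.8) -> bool:
    vector = embed(review_text)
    unique = all(cosine(vector, prev) < threshold for prev in seen_embeddings)
    seen_embeddings.append(vector)
    return unique  # unique reviews are the ones published to the enriched_review topic
```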
- Navigate to the semantic_filter Lambda function.
- Click `Add Trigger`.
- Search for `Confluent` and select the `Apache Kafka` option. What we are doing is creating a native integration between Confluent and AWS Lambda, wherein AWS Lambda will spin up a consumer group and listen in on a topic we specify.
- Use the following configurations:
  - Bootstrap Servers: your cluster's bootstrap endpoint (saved earlier)
  - Starting position: Trim Horizon
  - Topic Name: valid_reviews_with_user_info_json
  - Authentication: BASIC_AUTH
  - Secrets Manager Key: genai_demo_secret
- Confirm data is flowing by looking at the `enriched_review` topic. If you see messages coming in, the AWS Lambda trigger is working and the AWS Lambda is consuming and publishing data to Confluent Cloud.
- If you wish to remove all resources created during the demo to avoid additional charges, run `terraform destroy` to execute a cleanup. This will delete all resources provisioned by Terraform.
- Terminate all running processes such as connectors and Flink statements, then delete the cluster and environment in the Confluent Cloud UI.
- Delete the collection and database, then remove the cluster in MongoDB Atlas.