In this project I am building and executing an end-to-end ELT pipeline and driving analytics using Amazon Redshift as the data warehouse, with the following AWS services:
Amazon Redshift: Data warehouse and data lake access (Redshift Spectrum)
AWS Step Functions: ELT orchestration
AWS Glue: Data catalog, ELT job, and crawler
AWS Secrets Manager: Stores the Amazon Redshift cluster credentials
Amazon QuickSight: BI tool
As a starting point I'm using the NYC taxi sample data, which is available as gzipped CSV files in an Amazon S3 bucket. This data will be loaded, transformed, and exported to the data lake using Amazon Redshift. The exported data will be crawled by an AWS Glue crawler, and the crawled table will be queried via Amazon Redshift Spectrum. These steps are orchestrated using AWS Step Functions.
After the CloudFormation deployment completes, we check the secrets in Secrets Manager (the Redshift username and password).
We will do a few steps here with Redshift:
a. We will copy the ARN (Amazon Resource Name) of the IAM role attached under the cluster's permissions.
b. From the query editor in the left menu, we will connect to the database using the credentials from Secrets Manager mentioned above. Database: nyctaxi, User: master, Password: (from the Secrets Manager console).
c. In the query editor we will create the schema and the main table containing the trips using SQL along the lines of the sketch below (please change the region and the IAM role ARN to yours).
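The original table-definition SQL is not reproduced here, so the following is a minimal sketch based only on the columns referenced in the later load and unload scripts. The column list and data types are assumptions, and the real NYC green taxi files contain additional columns, so adjust the definition to match your source data.

CREATE SCHEMA IF NOT EXISTS taxischema;

-- Assumed column list and types, inferred from the columns used in the scripts below
CREATE TABLE IF NOT EXISTS taxischema.nyc_greentaxi (
    vendorid              INT,
    lpep_pickup_datetime  TIMESTAMP,
    lpep_dropoff_datetime TIMESTAMP,
    ratecodeid            INT,
    pulocationid          INT,
    dolocationid          INT,
    passenger_count       INT,
    trip_distance         DECIMAL(10,2),
    fare_amount           DECIMAL(10,2),
    tip_amount            DECIMAL(10,2),
    tolls_amount          DECIMAL(10,2),
    total_amount          DECIMAL(10,2),
    payment_type          INT,
    trip_type             INT
);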
In AWS Glue we will check and edit the connection that has already been created by our CloudFormation template.
a. We will edit the connection and add the database name, user name, and password from Secrets Manager, as shown in the following screenshot.
b. From ETL > Jobs we will select the job named "rs-query", then Actions > Edit job (I worked on the legacy editor), and choose the "rs-con" connection.
First, we will load the data using the following script:
SET query_group TO 'ingest';
-- Stage the incoming files in a temporary table with the same structure as the target table
CREATE TEMPORARY TABLE nyc_greentaxi_tmp (LIKE taxischema.nyc_greentaxi);
COPY nyc_greentaxi_tmp FROM '{0}' IAM_ROLE '{1}' CSV IGNOREHEADER 1 REGION '{2}' GZIP;
-- Upsert: remove rows already present in the target, then insert the staged rows
DELETE FROM taxischema.nyc_greentaxi USING nyc_greentaxi_tmp
WHERE taxischema.nyc_greentaxi.vendorid = nyc_greentaxi_tmp.vendorid
  AND taxischema.nyc_greentaxi.lpep_pickup_datetime = nyc_greentaxi_tmp.lpep_pickup_datetime;
INSERT INTO taxischema.nyc_greentaxi SELECT * FROM nyc_greentaxi_tmp;
DROP TABLE nyc_greentaxi_tmp;
Don't forget to replace {0} with the S3 path that holds the data, {1} with the IAM role ARN, and {2} with the region you are in.
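For illustration only, a filled-in COPY could look like the following; the bucket path, IAM role ARN, and region here are hypothetical placeholders, not the project's actual values.

COPY nyc_greentaxi_tmp
FROM 's3://example-nyctaxi-bucket/csv/'                     -- {0}: hypothetical S3 path to the gzipped CSV files
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'     -- {1}: hypothetical IAM role ARN
CSV IGNOREHEADER 1
REGION 'us-east-1'                                          -- {2}: hypothetical region
GZIP;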
Second, we will do the transformation and unload (export to the data lake) using the following script:
SET query_group TO 'ingest';
UNLOAD ('
    SELECT
        CASE WHEN vendorid = 1 THEN ''Creative Mobile Technologies, LLC''
             ELSE ''VeriFone Inc.'' END AS vendor,
        DATE_PART(hr, lpep_pickup_datetime)::int  AS hour_pickup,
        DATE_PART(hr, lpep_dropoff_datetime)::int AS hour_dropoff,
        TRUNC(lpep_pickup_datetime)  AS pickup_date,
        TRUNC(lpep_dropoff_datetime) AS dropoff_date,
        CASE WHEN ratecodeid = 1 THEN ''Standard rate''
             WHEN ratecodeid = 2 THEN ''JFK''
             WHEN ratecodeid = 3 THEN ''Newark''
             WHEN ratecodeid = 4 THEN ''Nassau or Westchester''
             WHEN ratecodeid = 5 THEN ''Negotiated fare''
             ELSE ''Group ride'' END AS ratecode,
        CASE WHEN payment_type = 1 THEN ''Credit card''
             WHEN payment_type = 2 THEN ''Cash''
             WHEN payment_type = 3 THEN ''No charge''
             WHEN payment_type = 4 THEN ''Dispute''
             WHEN payment_type = 5 THEN ''Unknown''
             ELSE ''Voided trip'' END AS paymenttype,
        CASE WHEN trip_type = 1 THEN ''Street-hail'' ELSE ''Dispatch'' END AS trip_type,
        pulocationid,
        dolocationid,
        passenger_count,
        trip_distance,
        tip_amount,
        fare_amount,
        tolls_amount,
        total_amount
    FROM taxischema.nyc_greentaxi
    ORDER BY pickup_date
')
TO '{0}' IAM_ROLE '{1}' PARQUET PARTITION BY (pickup_date) ALLOWOVERWRITE;
AWS Step Functions is used for orchestration; the state machine is already created, so all we need to do is start the execution.
We will use a crawler to crawl the data in the parquet folder of the S3 bucket, which was produced by the unload step; the crawler catalogs it in the default Glue database and creates the table for us.
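To query the crawled table from Amazon Redshift Spectrum, the Glue catalog database can be mapped to an external schema. The external schema name, the crawled table name, and the IAM role ARN below are illustrative assumptions, so replace them with the names the crawler actually produced.

-- Map the Glue catalog database populated by the crawler to an external schema in Redshift
CREATE EXTERNAL SCHEMA IF NOT EXISTS nyc_datalake
FROM DATA CATALOG
DATABASE 'default'                                          -- the Glue database the crawler writes to
IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-role'     -- hypothetical IAM role ARN
CREATE EXTERNAL DATABASE IF NOT EXISTS;

-- Query the crawled Parquet table through Redshift Spectrum (table name depends on the crawler output)
SELECT COUNT(*) FROM nyc_datalake.parquet;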
Using QuickSight we will create a dashboard on top of more than 3,000,000 records.
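If the QuickSight dataset is built with custom SQL on top of the Spectrum table, an aggregation along these lines (reusing the hypothetical schema and table names from the previous sketch) could back one of the dashboard visuals.

-- Trip counts and revenue per vendor and pickup hour, a typical aggregation behind a dashboard visual
SELECT vendor,
       hour_pickup,
       COUNT(*)          AS trips,
       SUM(total_amount) AS total_revenue
FROM nyc_datalake.parquet
GROUP BY vendor, hour_pickup
ORDER BY vendor, hour_pickup;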