Lab 1 is a prerequisite for this lab.
In this lab you will use Azure Data Factory to download large data files to your data lake and use Azure SQL Data Warehouse to generate a summary dataset and store it. The dataset you will use contains detailed New York City Yellow Taxi rides for 2018. You will generate a daily aggregated summary of all rides and save the result in your data warehouse. You will use Power BI to visualise summarised taxi ride data.
The estimated time to complete this lab is: 45 minutes.
The final workflow should look something like this:
Why are we doing this? It's common to have data files generated by a vendor as CSV or JSON files. It becomes your responsibility as the "consumer" of the data to load that data into your organization's data estate, whether that is a data lake, EDW, standard relational database, etc. In this case the data files are stored in a "vendor's" blob storage account.
Note: This lab copies a lot of data around and can take a long time to complete. It can also get frustrating when it fails midway through the pipeline with a sometimes cryptic message. Let me know and I can help. If you get stuck, each resource also has a completed JSON template (inline below) that you can use to deploy it.
Another helpful hint: Occasionally ADF will throw the same error after you have changed/fixed the code. If that happens, exit your browser, re-open the Data Factory, and try again.
Finally, I'm here to help if you get frustrated or don't understand the use case fully.
IMPORTANT: Some of the Azure services provisioned require a globally unique name, and a “-suffix” has been added to their names to ensure this uniqueness. Please take note of the suffix generated, as you will need it for the following resources:
Name | Type |
---|---|
mdwcosmosdb-suffix | Cosmos DB account |
MDWDataFactory-suffix | Data Factory (V2) |
mdwdatalakesuffix | Storage Account |
MDWEventHubs-suffix | Event Hubs Namespace |
MDWKeyVault-suffix | Key vault |
mdwsqlvirtualserver-suffix | SQL server |
MDWStreamAnalytics-suffix | Stream Analytics job |
In this section you will connect to Azure SQL Data Warehouse to create the database objects used to host and process data.
- Open Azure Data Studio wherever you installed it.
- Connect to MDWSQLVirtualServer
- On the Connection Details panel, enter the following connection details:
- Server: mdwsqlvirtualserver-suffix.database.windows.net
- Authentication Type: SQL Login
- User Name: mdwadmin
- Password: P@ssw0rd123!
- Database: MDWASQLDW
Open a query window and execute the full solution script.
- Why would we use `DISTRIBUTION = ROUND_ROBIN`? Is there a better choice?
- When is it a good choice to use SQLDW vs a standard Azure SQL DB/SQL IaaS/etc?
The first step is to copy the raw data files unaltered (this is a good ELT pattern) to your storage account.
Create a container in your MDWDataLake that will be used as a repository for the NYC Taxi Data files. You will copy 12 files from the MDWResources Storage Account (your vendor's storage account) into your NYCTaxiData container. These files contain data for all Yellow Taxi rides in 2018, one file for each month of the year.
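The twelve source files follow a predictable monthly naming pattern (the same one that appears later in this lab's dataset JSON, e.g. `yellow_tripdata_2018-01.csv`). As a quick illustrative sketch, here are the names you should expect to see land in the container:

```python
# Generate the expected names of the 12 monthly source files,
# following the yellow_tripdata_YYYY-MM.csv convention used in this lab.
def monthly_file_names(year: int) -> list[str]:
    return [f"yellow_tripdata_{year}-{month:02d}.csv" for month in range(1, 13)]

files = monthly_file_names(2018)
print(files[0])    # yellow_tripdata_2018-01.csv
print(len(files))  # 12
```

After the pipeline runs, the container listing should match this list exactly; a missing month usually means a failed copy activity.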
- In the Azure Portal, go to the lab resource group and locate the Azure Storage account mdwdatalakesuffix.
- Click + Container.
- On the New container blade, enter the following details:
  - Name: nyctaxidata
  - Public access level: Private (no anonymous access)
- Click OK to create the new container.
All of the external resources are created. We are ready to copy the data using Azure Data Factory.
- Open the Azure Data Factory portal and click the Author option (pencil icon) on the left-hand side panel.
With ADF you first create Linked Services, then Datasets (source and sink), and then the pipelines that actually copy the data.
For this lab we need to create Linked Services for:
- The source storage account (our vendor's file store of CSVs)
- The destination data lake to hold our unaltered CSVs
- The SQL DW where we will copy some of our data for later analytics
- On the New Linked Service (Azure Blob Storage) blade (under Connections near the bottom of the ADF screen), enter the following details (this is our data vendor's storage account):
  - Name: NYCTaxiDataVendor (note: the JSON templates below reference this linked service as MDWResources; use that name instead if you plan to paste the JSON)
  - Connect via integration runtime: AutoResolveIntegrationRuntime
  - Authentication method: SAS URI
  - SAS URL: https://mdwresources.blob.core.windows.net/?sv=2018-03-28&ss=b&srt=sco&sp=rwl&se=2050-12-30T17:25:52Z&st=2019-04-05T09:25:52Z&spr=https&sig=4qrD8NmhaSmRFu2gKja67ayohfIDEQH3LdVMa2Utykc%3D
- Click Test connection to make sure you entered the correct connection details and then click Finish.
- Create another Linked Service for our data lake. You will need to get the storage account key from the Azure Portal.
- Create another Linked Service for the Azure SQL DW.
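Before pasting a SAS URL into a linked service, it can help to sanity-check its permissions and expiry date. A small illustrative sketch that pulls those fields out of the vendor URL above (this is plain query-string parsing, not an Azure SDK call):

```python
# Inspect a SAS URL's token fields: sp = permissions, se = expiry time.
from urllib.parse import urlparse, parse_qs

sas_url = (
    "https://mdwresources.blob.core.windows.net/?sv=2018-03-28&ss=b&srt=sco"
    "&sp=rwl&se=2050-12-30T17:25:52Z&st=2019-04-05T09:25:52Z&spr=https"
    "&sig=4qrD8NmhaSmRFu2gKja67ayohfIDEQH3LdVMa2Utykc%3D"
)

params = parse_qs(urlparse(sas_url).query)
print(params["sp"][0])  # rwl -> read, write, list permissions
print(params["se"][0])  # 2050-12-30T17:25:52Z -> token expiry
```

If a "test connection" fails with an authorization error, an expired `se` or missing `l` (list) permission is a common cause.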
With the Linked Services created we can now create the datasets. Your data pipeline will use five of them:
Dataset | Description |
---|---|
MDWResources_NYCTaxiData | References MDWResources (our data vendor) shared storage account container that contains source data files. |
MDWResources_NYCTaxiLookup | References MDWResources shared storage account that contains a .csv file with all taxi location codes and names (lookup data). |
MDWASQLDW_StagingNYCTaxiData | References the table Staging.NYCTaxiData in the Azure SQL Data Warehouse database MDWASQLDW. |
MDWASQLDW_StagingNYCLocationLookup | References the table [Staging].[NYCTaxiLocationLookup] in the Azure SQL Data Warehouse database MDWASQLDW and acts as destination of lookup data copied from MDWResources_NYCTaxiLookup. |
MDWDataLake_NYCTaxiData | References your MDWDataLake-suffix storage account. It acts as the destination for the files copied from MDWResources_NYCTaxiData. It also functions as a data source when copying data to MDWASQLDW_StagingNYCTaxiData. |
Each dataset is set up with the same basic process:
- Open the Azure Data Factory portal and click the Author (pencil icon) option on the left-hand side panel. Under the Factory Resources tab, click the ellipsis (…) next to Datasets and then click Add Dataset to create a new dataset.
Ensure you click Publish All after each dataset to save your progress.
On the New Data Set tab, enter the following details:
- New Dataset Type: Azure Blob Storage
- Format: csv
- General > Name: MDWResources_NYCTaxiData
- Connection > Linked Service: MDWResources
- Connection > File Path: nyctaxidata / *.csv
- Leave directory blank
- Connection > Binary Copy: Checked
Or use the following JSON:
{
"name": "MDWResources_NYCTaxiData",
"properties": {
"linkedServiceName": {
"referenceName": "MDWResources",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"container": "nyctaxidata"
},
"columnDelimiter": ",",
"escapeChar": "\\",
"firstRowAsHeader": true,
"quoteChar": "\""
},
"schema": []
},
"type": "Microsoft.DataFactory/factories/datasets"
}
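The `*.csv` file path in this dataset acts as a wildcard over the container. As a rough illustration (the file list below is hypothetical, and ADF's wildcard matching is its own implementation, not Python's), glob-style matching works like this:

```python
# Show which blob names a "*.csv" wildcard would pick up
# from a hypothetical container listing.
from fnmatch import fnmatch

blobs = ["yellow_tripdata_2018-01.csv", "yellow_tripdata_2018-02.csv", "README.txt"]
matched = [b for b in blobs if fnmatch(b, "*.csv")]
print(matched)  # only the .csv files match
```

This is why the Binary Copy setting matters here: the dataset selects all twelve monthly files at once rather than a single named file.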
This references the NYCTaxiData container in your MDWDataLake storage account. You will need to create a new Linked Service to Azure Blob Storage first (or this can be done while setting up the dataset). Call it MDWDataLake.
On the New Data Set tab, enter the following details:
- Format: csv
- General > Name: MDWDataLake_NYCTaxiData
- Connection > Linked Service: MDWDataLake
- Connection > File Path: nyctaxidata
- Connection > Binary Copy: Unchecked
- Connection > Column names in the first row: Checked
Or use the following JSON:
{
"name": "MDWDataLake_NYCTaxiData",
"properties": {
"linkedServiceName": {
"referenceName": "MDWDataLake",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"fileName": "yellow_tripdata_2018-01.csv",
"container": "nyctaxidata"
},
"columnDelimiter": ",",
"escapeChar": "\\",
"firstRowAsHeader": true,
"quoteChar": "\""
},
"schema": [
{
"name": "VendorID",
"type": "String"
},
{
"name": "tpep_pickup_datetime",
"type": "String"
},
{
"name": "tpep_dropoff_datetime",
"type": "String"
},
{
"name": "passenger_count",
"type": "String"
},
{
"name": "trip_distance",
"type": "String"
},
{
"name": "RatecodeID",
"type": "String"
},
{
"name": "store_and_fwd_flag",
"type": "String"
},
{
"name": "PULocationID",
"type": "String"
},
{
"name": "DOLocationID",
"type": "String"
},
{
"name": "payment_type",
"type": "String"
},
{
"name": "fare_amount",
"type": "String"
},
{
"name": "extra",
"type": "String"
},
{
"name": "mta_tax",
"type": "String"
},
{
"name": "tip_amount",
"type": "String"
},
{
"name": "tolls_amount",
"type": "String"
},
{
"name": "improvement_surcharge",
"type": "String"
},
{
"name": "total_amount",
"type": "String"
}
]
},
"type": "Microsoft.DataFactory/factories/datasets"
}
Create another dataset, this time referencing the NYCTaxiLookup container in your MDWResources storage account.
- General > Name: MDWResources_NYCTaxiLookup
- Connection > Linked Service: MDWResources (Data Vendor)
- Connection > File Path: nyctaxilookup / taxi_zone_lookup.csv
- Connection > Column names in the first row: Checked.
Or use the following JSON:
{
"name": "MDWResources_NYCTaxiLookup",
"properties": {
"linkedServiceName": {
"referenceName": "MDWResources",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"fileName": "taxi_zone_lookup.csv",
"container": "nyctaxilookup"
},
"columnDelimiter": ",",
"escapeChar": "\\",
"firstRowAsHeader": true,
"quoteChar": "\""
},
"schema": [
{
"name": "LocationID",
"type": "String"
},
{
"name": "Borough",
"type": "String"
},
{
"name": "Zone",
"type": "String"
},
{
"name": "service_zone",
"type": "String"
}
]
},
"type": "Microsoft.DataFactory/factories/datasets"
}
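To see what this dataset's schema means in practice, here is an illustrative parse of a couple of rows shaped like taxi_zone_lookup.csv (the sample values are made up for the sketch, not read from the real file):

```python
# Parse sample lookup rows with the four columns the dataset schema declares:
# LocationID, Borough, Zone, service_zone.
import csv
import io

sample = """LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
4,Manhattan,Alphabet City,Yellow Zone
"""
rows = list(csv.DictReader(io.StringIO(sample)))
print(rows[0]["Zone"])     # Newark Airport
print(rows[1]["Borough"])  # Manhattan
```

The `firstRowAsHeader: true` setting in the JSON corresponds to `DictReader` treating the first line as column names.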
Create another dataset, this time referencing the Staging.NYCTaxiData table in your Azure SQL Data Warehouse database.
- Type: Azure SQL Data Warehouse
- Name: MDWASQLDW_StagingNYCTaxiData
- Linked Service: MDWSQLVirtualServer_MDWASQLDW
- Table: [Staging].[NYCTaxiData]
Or use the following JSON:
{
"name": "MDWASQLDW_StagingNYCTaxiData",
"properties": {
"linkedServiceName": {
"referenceName": "MDWSQLVirtualServer_MDWASQLDW",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "AzureSqlDWTable",
"schema": [
{
"name": "VendorID",
"type": "int",
"precision": 10
},
{
"name": "tpep_pickup_datetime",
"type": "datetime",
"precision": 23,
"scale": 3
},
{
"name": "tpep_dropoff_datetime",
"type": "datetime",
"precision": 23,
"scale": 3
},
{
"name": "passenger_count",
"type": "smallint",
"precision": 5
},
{
"name": "trip_distance",
"type": "decimal",
"precision": 8,
"scale": 2
},
{
"name": "RatecodeID",
"type": "smallint",
"precision": 5
},
{
"name": "store_and_fwd_flag",
"type": "char"
},
{
"name": "PULocationID",
"type": "int",
"precision": 10
},
{
"name": "DOLocationID",
"type": "int",
"precision": 10
},
{
"name": "payment_type",
"type": "smallint",
"precision": 5
},
{
"name": "fare_amount",
"type": "decimal",
"precision": 10,
"scale": 2
},
{
"name": "extra",
"type": "decimal",
"precision": 10,
"scale": 2
},
{
"name": "mta_tax",
"type": "decimal",
"precision": 10,
"scale": 2
},
{
"name": "tip_amount",
"type": "decimal",
"precision": 10,
"scale": 2
},
{
"name": "tolls_amount",
"type": "decimal",
"precision": 10,
"scale": 2
},
{
"name": "improvement_surcharge",
"type": "decimal",
"precision": 10,
"scale": 2
},
{
"name": "total_amount",
"type": "decimal",
"precision": 10,
"scale": 2
}
],
"typeProperties": {
"schema": "Staging",
"table": "NYCTaxiData"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
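The copy activity later in this lab maps the all-string CSV columns onto the SQL types declared above. A hedged sketch of a few of those conversions in plain Python (the row values are made up):

```python
# Convert sample CSV string values into the staging table's types:
# int for IDs, datetime for timestamps, Decimal for money/distance.
from datetime import datetime
from decimal import Decimal

row = {"VendorID": "1", "tpep_pickup_datetime": "2018-01-01 00:21:05",
       "trip_distance": "2.50", "total_amount": "11.80"}

typed = {
    "VendorID": int(row["VendorID"]),
    "tpep_pickup_datetime": datetime.strptime(
        row["tpep_pickup_datetime"], "%Y-%m-%d %H:%M:%S"),
    "trip_distance": Decimal(row["trip_distance"]),
    "total_amount": Decimal(row["total_amount"]),
}
print(typed["trip_distance"] + typed["total_amount"])  # 14.30
```

Note the use of `Decimal` rather than `float` for monetary amounts, mirroring the `decimal(10,2)` columns in the table.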
Repeat the process to create another dataset, this time referencing the [Staging].[NYCTaxiLocationLookup] table in your Azure SQL Data Warehouse database.
- Name: MDWASQLDW_StagingNYCLocationLookup
- Linked Service: MDWSQLVirtualServer_MDWASQLDW
- Table: [Staging].[NYCTaxiLocationLookup]
Or use the following JSON:
{
"name": "MDWASQLDW_StagingNYCLocationLookup",
"properties": {
"linkedServiceName": {
"referenceName": "MDWSQLVirtualServer_MDWASQLDW",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "AzureSqlDWTable",
"schema": [
{
"name": "LocationID",
"type": "int",
"precision": 10
},
{
"name": "Borough",
"type": "varchar"
},
{
"name": "Zone",
"type": "varchar"
},
{
"name": "service_zone",
"type": "varchar"
}
],
"typeProperties": {
"schema": "Staging",
"table": "NYCTaxiLocationLookup"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
In this section you create a Data Factory pipeline to copy data in the following sequence:
- Copy the NYC Taxi CSV data files from your data vendor's storage account MDWResources to the nyctaxidata container in your MDWDataLake-suffix storage account;
- Copy data from the files sitting in your data lake to a staging table in your SQL Data Warehouse;
- Execute a stored procedure in your SQL Data Warehouse to generate a daily summary and save it in a new table used for Power BI reporting.
- Click the ellipsis (…) next to Pipelines and then click Add Pipeline. Call it Copy NYC Taxi Data.
- Copy/paste the JSON below. You may have to tweak it and fix validation errors.
{
"name": "Copy NYC Taxi Data",
"properties": {
"activities": [
{
"name": "CopyTaxiDataToDW",
"type": "Copy",
"dependsOn": [
{
"activity": "Copy Taxi Data Files",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": true
},
"sink": {
"type": "SqlDWSink",
"allowPolyBase": false,
"writeBatchSize": 10000,
"preCopyScript": "truncate table Staging.NYCTaxiData"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"mappings": [
{
"source": {
"name": "VendorID",
"type": "String"
},
"sink": {
"name": "VendorID",
"type": "Int32"
}
},
{
"source": {
"name": "tpep_pickup_datetime",
"type": "String"
},
"sink": {
"name": "tpep_pickup_datetime",
"type": "DateTime"
}
},
{
"source": {
"name": "tpep_dropoff_datetime",
"type": "String"
},
"sink": {
"name": "tpep_dropoff_datetime",
"type": "DateTime"
}
},
{
"source": {
"name": "passenger_count",
"type": "String"
},
"sink": {
"name": "passenger_count",
"type": "Int16"
}
},
{
"source": {
"name": "trip_distance",
"type": "String"
},
"sink": {
"name": "trip_distance",
"type": "Decimal"
}
},
{
"source": {
"name": "RatecodeID",
"type": "String"
},
"sink": {
"name": "RatecodeID",
"type": "Int16"
}
},
{
"source": {
"name": "store_and_fwd_flag",
"type": "String"
},
"sink": {
"name": "store_and_fwd_flag",
"type": "String"
}
},
{
"source": {
"name": "PULocationID",
"type": "String"
},
"sink": {
"name": "PULocationID",
"type": "Int32"
}
},
{
"source": {
"name": "DOLocationID",
"type": "String"
},
"sink": {
"name": "DOLocationID",
"type": "Int32"
}
},
{
"source": {
"name": "payment_type",
"type": "String"
},
"sink": {
"name": "payment_type",
"type": "Int16"
}
},
{
"source": {
"name": "fare_amount",
"type": "String"
},
"sink": {
"name": "fare_amount",
"type": "Decimal"
}
},
{
"source": {
"name": "extra",
"type": "String"
},
"sink": {
"name": "extra",
"type": "Decimal"
}
},
{
"source": {
"name": "mta_tax",
"type": "String"
},
"sink": {
"name": "mta_tax",
"type": "Decimal"
}
},
{
"source": {
"name": "tip_amount",
"type": "String"
},
"sink": {
"name": "tip_amount",
"type": "Decimal"
}
},
{
"source": {
"name": "tolls_amount",
"type": "String"
},
"sink": {
"name": "tolls_amount",
"type": "Decimal"
}
},
{
"source": {
"name": "improvement_surcharge",
"type": "String"
},
"sink": {
"name": "improvement_surcharge",
"type": "Decimal"
}
},
{
"source": {
"name": "total_amount",
"type": "String"
},
"sink": {
"name": "total_amount",
"type": "Decimal"
}
}
]
}
},
"inputs": [
{
"referenceName": "MDWDataLake_NYCTaxiData",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "MDWASQLDW_StagingNYCTaxiData",
"type": "DatasetReference"
}
]
},
{
"name": "CopyTaxiLookupDataToDW",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": true
},
"sink": {
"type": "SqlDWSink",
"allowPolyBase": false,
"writeBatchSize": 10000,
"preCopyScript": "truncate table Staging.NYCTaxiLocationLookup"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MDWDataLake",
"type": "LinkedServiceReference"
},
"path": "polybase"
}
},
"inputs": [
{
"referenceName": "MDWResources_NYCTaxiLookup",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "MDWASQLDW_StagingNYCLocationLookup",
"type": "DatasetReference"
}
]
},
{
"name": "LoadTaxiDataSummary",
"type": "SqlServerStoredProcedure",
"dependsOn": [
{
"activity": "CopyTaxiDataToDW",
"dependencyConditions": [
"Succeeded"
]
},
{
"activity": "CopyTaxiLookupDataToDW",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"storedProcedureName": "[Staging].[spNYCLoadTaxiDataSummary]"
},
"linkedServiceName": {
"referenceName": "MDWSQLVirtualServer_MDWASQLDW",
"type": "LinkedServiceReference"
}
},
{
"name": "Copy Taxi Data Files",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobStorageReadSettings",
"recursive": true,
"wildcardFileName": "*.csv",
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"sink": {
"type": "BlobSink",
"copyBehavior": "PreserveHierarchy"
},
"enableStaging": false
},
"inputs": [
{
"referenceName": "MDWResources_NYCTaxiData",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "MDWDataLake_NYCTaxiData",
"type": "DatasetReference"
}
]
}
],
"annotations": []
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
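The `dependsOn` entries in the JSON above form a small dependency graph. This sketch recreates that graph and derives a valid execution order, which is what ADF's scheduler effectively does (activities with no unmet dependencies, like the two initial copies, can run in parallel):

```python
# Recreate the pipeline's activity dependency graph and topologically sort it.
# Each key maps an activity to the activities it depends on.
from graphlib import TopologicalSorter

depends_on = {
    "Copy Taxi Data Files": [],
    "CopyTaxiLookupDataToDW": [],
    "CopyTaxiDataToDW": ["Copy Taxi Data Files"],
    "LoadTaxiDataSummary": ["CopyTaxiDataToDW", "CopyTaxiLookupDataToDW"],
}
order = list(TopologicalSorter(depends_on).static_order())
print(order[-1])  # LoadTaxiDataSummary always runs last
```

This explains why a failure in either copy activity blocks the stored procedure: LoadTaxiDataSummary only fires when both of its predecessors report Succeeded.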
- Examine (and hopefully understand) what each step is doing.
- Publish your pipeline changes by clicking the Publish all button.
- To execute the pipeline, click the Add trigger menu and then Trigger Now.
- On the Pipeline Run blade, click Finish.
- To monitor the execution of your pipeline, click the Monitor menu on the left-hand side panel.
- You should be able to see the status of your pipeline execution on the right-hand side panel.
- Click the View Activity Runs button for detailed information about each activity execution in the pipeline. The whole execution should take 7-8 minutes.
In this section you are going to use Power BI to visualise data from Azure SQL Data Warehouse. The Power BI report will use an Import connection to query Azure SQL Data Warehouse and visualise the summarised taxi ride data from the table you loaded in the previous exercise.
- Open the file MDWLab2.pbit with Power BI Desktop, wherever you installed Power BI.
- When prompted to enter the value of the MDWSQLVirtualServer parameter, type the full server name: mdwsqlvirtualserver-suffix.database.windows.net
- Click Load.
- When prompted to enter credentials, select Database from the left-hand side panel and enter the following details:
  - User name: mdwadmin
  - Password: P@ssw0rd123!
- Leave the remaining fields at their default values.
- Click Connect.
- Once the data finishes loading, interact with the report by changing the PickUpDate slicer and by clicking on the other visualisations.
- Save your work and close Power BI Desktop.