feat: Add Azure Stream Analytics Single Tech Sample - Testing ASA jobs + Bicep IaC templates (#94) (#321)

* Bicep IaC templates with Streaming Analytics Job Sample (#94)
* README.md Describes how to run the project in Azure and perform functional, unit, and e2e tests using an online simulator.
  Added section on "Key Concepts"
* .gitignore Added
* containers.bicep Creates blob storage container as data sink
* iothubs.bicep Creates IoT Hub
* streamingjobs.bicep Creates ASA job
* main.bicep Creates all Azure resources necessary for sample
* streamanalytics-tech-sample.asaql Sample query for filtering temperature > 27
* test/temperature_equal_to_27_degrees.json Test case data
* temperature_greater_than_27_degrees_expected.json Test case data
* temperature_greater_than_27_degrees.json Test case data
* temperature_less_than_27_degrees.json Test case data
* testConfig.json Added
* Updated commands and params
* main.bicep Removed default values.
* README.md Complete env variable usage.
jmostella authored May 21, 2021
1 parent 630dacd commit abd95bc
Showing 22 changed files with 2,598 additions and 2 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -3,10 +3,13 @@ page_type: sample
languages:
- python
- C#
- TypeScript
- bicep
products:
- Azure
- Azure-Data-factory
- Azure-Databricks
- Azure-Stream-Analytics
- Azure-Data-Lake-Gen2
- Azure-Functions
description: "Code samples showcasing how to apply DevOps concepts to the Modern Data Warehouse Architecture leveraging different Azure Data Technologies."
@@ -33,6 +36,7 @@ The samples are either focused on a single azure service (**Single Tech Samples**)
- [IaC - Basic Azure Databricks deployment](single_tech_samples/databricks/sample1_basic_azure_databricks_environment/)
- [IaC - Enterprise Security and Data Exfiltration Protection Deployment](single_tech_samples/databricks/sample2_enterprise_azure_databricks_environment/)
- [IaC - Cluster Provisioning and Secure Data Access](single_tech_samples/databricks/sample3_cluster_provisioning_and_data_access/)
- [Stream Analytics](single_tech_samples/streamanalytics/)
- [Azure Purview](single_tech_samples/purview/)
- [IaC - Azure Purview](single_tech_samples/purview/)

3 changes: 3 additions & 0 deletions single_tech_samples/streamanalytics/.gitignore
@@ -0,0 +1,3 @@
output
testResultSummary.json
e2e/node_modules
160 changes: 158 additions & 2 deletions single_tech_samples/streamanalytics/README.md
@@ -1,11 +1,167 @@
# Azure Stream Analytics

WIP
![introductory diagram](./docs/images/ASA-job.PNG)

[Azure Stream Analytics](https://azure.microsoft.com/en-us/services/stream-analytics/) is a serverless real-time analytics service. The goal of this sample is to demonstrate how to develop a streaming pipeline, with IaC and testability in mind.

## Prerequisites

## Setup

## Running the sample

1. __Azure CLI__ is necessary for various tasks. Please follow the installation instructions found [here](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli).

1. __Bicep__ This project uses `Bicep` templates to set up `Azure` infrastructure. Please follow the steps under [Install and manage via Azure CLI (easiest)](https://github.com/Azure/bicep/blob/main/docs/installing.md#install-and-manage-via-azure-cli-easiest) to install the `Azure CLI` extension.

A `Bicep` tutorial can be found [here](https://github.com/Azure/bicep/blob/main/docs/tutorial/01-simple-template.md).

1. __Raspberry Pi Azure IoT Online Simulator__ To run through the [Functional Test](#functional-test) section, you can use [this](https://azure-samples.github.io/raspberry-pi-web-simulator/) simulator. Make sure to follow the steps required to edit the JS code.

1. __azure-streamanalytics-cicd__ Unit testing is possible using the `npm` package [azure-streamanalytics-cicd](https://www.npmjs.com/package/azure-streamanalytics-cicd). The tool provides additional features for working with `Azure Stream Analytics` locally; check [here](https://docs.microsoft.com/en-us/azure/stream-analytics/cicd-tools?tabs=visual-studio-code) for more info. An install command is shown after this list.

1. __Configure environment variables.__ We will assume these are customized and configured per your needs:

```bash
APP="tech-sample"
ENVIRONMENT="test"
LOCATION="japaneast"
STORAGE_ACCOUNT="st${APP/-/}${ENVIRONMENT}"
```
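
A typical way to install the unit-testing tool, assuming `npm` is available (a global install is shown; a project-local dev dependency also works):

```bash
# Install the Stream Analytics CI/CD tool via npm
npm install -g azure-streamanalytics-cicd
```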

## Build

```bash
# Run Unit Tests
azure-streamanalytics-cicd test -project ./asaproj.json -outputPath ./output/

# Create Resource Group

az group create -n rg-${APP} -l ${LOCATION}

# Validate Generated Template
az deployment group validate -f main.bicep -g rg-${APP} --parameters query='@./streamanalytics-tech-sample.asaql' name=${APP} env=${ENVIRONMENT}

# Show plan
az deployment group what-if -f main.bicep -g rg-${APP} --parameters query='@./streamanalytics-tech-sample.asaql' name=${APP} env=${ENVIRONMENT}
```

## Deploy

```bash
# Create Azure Resources. This will also start the job

az deployment group create -f main.bicep -g rg-${APP} --parameters query='@./streamanalytics-tech-sample.asaql' name=${APP} env=${ENVIRONMENT}
```
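
As an optional sanity check, you can list what the deployment created:

```bash
# List the resources created in the resource group
az resource list -g rg-${APP} --output table
```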

## Functional Test

```bash
# Add device

az iot hub device-identity create --hub-name iot-${APP}-${ENVIRONMENT} --device-id iot-${APP}-${ENVIRONMENT} --edge-enabled

# Use connection information with "Raspberry Pi Azure IoT Online Simulator": https://azure-samples.github.io/raspberry-pi-web-simulator/

az iot hub device-identity connection-string show --hub-name iot-${APP}-${ENVIRONMENT} --device-id iot-${APP}-${ENVIRONMENT} --output tsv
```

> Check the blob storage container to ensure that a JSON file with the expected temperature sensor data (exceeding 27 degrees) is present.
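
One way to perform that check from the CLI (a sketch; it assumes your signed-in identity or the storage account key grants access to the container):

```bash
# List blobs written by the ASA job to the output container
az storage blob list \
  --account-name ${STORAGE_ACCOUNT} \
  --container-name bloboutput \
  --output table
```
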
## Automated End-to-End Test

```bash
export DEVICE_CONNECTION_STRING=$(az iot hub device-identity connection-string show --hub-name iot-${APP}-${ENVIRONMENT} --device-id iot-${APP}-${ENVIRONMENT} --output tsv)
export AZURE_STORAGE_CONNECTION_STRING=$(az storage account show-connection-string -n ${STORAGE_ACCOUNT} --query connectionString -o tsv)

cd e2e
npm install
npm test
```

## Cleanup

```bash
az group delete -n rg-${APP}
```

## Key concepts

### Infrastructure as Code

This sample's infrastructure is created using [Bicep](https://github.com/Azure/bicep#what-is-bicep) templates. `Bicep` is a DSL that is transpiled into `Azure`'s native `ARM Templates`. It's currently in preview but has already been integrated into the `Azure CLI`. The [ARM template reference documentation](https://docs.microsoft.com/en-us/azure/templates/microsoft.devices/iothubs?tabs=bicep) also lists `Bicep` alongside `ARM Templates`:

![Bicep template screen capture](docs/images/Bicep-doc.PNG)

Alternatively, `Terraform` can also be used.

> Key takeaways for `Bicep`:
>
> * Day 0 resource provider support. Any Azure resource, whether in private or public preview or GA, can be provisioned using Bicep.
> * No state or state files to manage. All state is stored in Azure, which makes it easy to collaborate and make changes to resources confidently.
> * The VSCode extension for Bicep makes it extremely easy to author templates and get started, with advanced type validation based on all Azure resource type API definitions.

Resource naming conventions attempt to follow examples from [Define your naming convention](https://docs.microsoft.com/en-us/azure/cloud-adoption-framework/ready/azure-best-practices/resource-naming#example-names-for-common-azure-resource-types) where appropriate.
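
For example, with the variables from the setup section, these patterns expand as follows (a quick illustration in bash):

```bash
APP="tech-sample"; ENVIRONMENT="test"
echo "rg-${APP}"                  # rg-tech-sample       (resource group)
echo "iot-${APP}-${ENVIRONMENT}"  # iot-tech-sample-test (IoT Hub)
echo "st${APP/-/}${ENVIRONMENT}"  # sttechsampletest     (storage account)
```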

The structure of the __IaC__ scripts allows you to easily set up as many versions of the infrastructure as you need. For example, an individual developer may create a version to use within their __Inner Dev Loop__:

```bash
DEVELOPER="kaa"
ENVIRONMENT="inner"
LOCATION="japaneast"
STORAGE_ACCOUNT="st${DEVELOPER}${ENVIRONMENT}"

az group create -n rg-${DEVELOPER} -l $LOCATION

az deployment group create -f main.bicep -g rg-${DEVELOPER} --parameters query='@./streamanalytics-tech-sample.asaql' name=${DEVELOPER} env=${ENVIRONMENT}

az iot hub device-identity create --hub-name iot-${DEVELOPER}-${ENVIRONMENT} --device-id iot-${DEVELOPER}-${ENVIRONMENT} --edge-enabled

export DEVICE_CONNECTION_STRING=$(az iot hub device-identity connection-string show --hub-name iot-${DEVELOPER}-${ENVIRONMENT} --device-id iot-${DEVELOPER}-${ENVIRONMENT} --output tsv)
export AZURE_STORAGE_CONNECTION_STRING=$(az storage account show-connection-string -n ${STORAGE_ACCOUNT} --query connectionString -o tsv)

# Cleanup
az group delete -n rg-${DEVELOPER}
```

### Testing

#### Unit

`Azure Stream Analytics` can be unit tested locally via the [azure-streamanalytics-cicd](https://www.npmjs.com/package/azure-streamanalytics-cicd) npm package.

This package can also be used as part of an Azure DevOps CI/CD pipeline; more info can be found [here](https://docs.microsoft.com/en-us/azure/stream-analytics/set-up-cicd-pipeline). This should make integrating it into your existing CI/CD fairly straightforward.

> Key takeaways for `azure-streamanalytics-cicd`:
>
> * Test configuration is found under the __/test__ directory.
>   * This is the default location for `azure-streamanalytics-cicd`.
> * __test/testConfig.json__ defines the test cases.
> * __test/temperature ... .json__ files define pipeline inputs and expected outputs (as `JSON`).
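
For orientation, here is a hypothetical sketch of the __test/testConfig.json__ structure, following the format documented for `azure-streamanalytics-cicd` (the aliases and paths are illustrative assumptions, not copied from this commit):

```bash
# Print a sketch of the testConfig.json structure (illustrative only)
cat <<'EOF'
{
  "Script": "../streamanalytics-tech-sample.asaql",
  "TestCases": [
    {
      "Name": "temperature_greater_than_27_degrees",
      "Inputs": [
        {
          "InputAlias": "input",
          "Type": "Data Stream",
          "Format": "Json",
          "FilePath": "temperature_greater_than_27_degrees.json"
        }
      ],
      "ExpectedOutputs": [
        {
          "OutputAlias": "output",
          "FilePath": "temperature_greater_than_27_degrees_expected.json",
          "Required": true
        }
      ]
    }
  ]
}
EOF
```
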
#### Functional

![simulator site image](docs/images/InkedIoT-Simulator_LI.jpg)

Functional testing can be done using the online tool [Raspberry Pi Web Client Simulator](https://azure-samples.github.io/raspberry-pi-web-simulator/). Source code can be found [here](https://github.com/Azure-Samples/raspberry-pi-web-simulator). This is an easy way to interact with your `Azure Stream Analytics` pipeline.

You can manually check the blob storage to see that events are coming through correctly.

![blob container screen cap](docs/images/BLOB-OUT.PNG)

> One aspect to consider is that events will be batched on the scale of seconds due to the `pathPattern` defined in [streamingjobs.bicep](./streamingjobs.bicep) for `bloboutput`. This is done to aid automated testing. You can adjust this during functional testing, and for production, as required:
>
> _example_
> <!-- markdownlint-disable MD037 -->
> ```bicep
> pathPattern: '{date}'
> ```
> <!-- markdownlint-enable MD037 -->
> Then you should be able to see many events per file.

#### Automated End-to-End

This sample combines the [Azure IoT device SDK](https://www.npmjs.com/package/azure-iot-device) and the [Microsoft Azure Storage SDK for JavaScript](https://www.npmjs.com/package/@azure/storage-blob) to create a Node.js (TypeScript) based end-to-end test solution. As mentioned under __Functional__, blob output partitioning is done at the seconds resolution by default to prevent the automated test from waiting an impractical amount of time. This can be made configurable on deploy, according to your requirements, by altering the `Bicep` template.

![test result output screen capture](docs/images/e2e-test.PNG)

Within the test file [e2e/e2e.ts](e2e/e2e.ts), `EXPECTED_E2E_LATENCY_MS` is defined as 1000 ms (1 s), so this would also need to be adjusted for a real implementation.
5 changes: 5 additions & 0 deletions single_tech_samples/streamanalytics/asaproj.json
@@ -0,0 +1,5 @@
{
"name": "streamanalytics",
"startFile": "streamanalytics-tech-sample.asaql",
"configurations": []
}
20 changes: 20 additions & 0 deletions single_tech_samples/streamanalytics/containers.bicep
@@ -0,0 +1,20 @@
param name string
param env string

resource storageAccount 'Microsoft.Storage/storageAccounts@2020-08-01-preview' = {
name: replace('st${name}${env}', '-', '')
location: resourceGroup().location
sku: {
name: 'Standard_LRS'
}
kind: 'StorageV2'
}

resource blobContainer 'Microsoft.Storage/storageAccounts/blobServices/containers@2020-08-01-preview' = {
name: '${storageAccount.name}/default/bloboutput'
}

output account object = {
accountName: storageAccount.name
accountKey: listKeys(storageAccount.id, storageAccount.apiVersion).keys[0].value
}
_Five binary image files (the screenshots under docs/images referenced in the README above) are not displayed._
141 changes: 141 additions & 0 deletions single_tech_samples/streamanalytics/e2e/e2e.ts
@@ -0,0 +1,141 @@
import * as chai from 'chai';
import { Mqtt } from 'azure-iot-device-mqtt';
import { Client, Message } from 'azure-iot-device';
import { BlobItem, ContainerClient } from '@azure/storage-blob';
chai.use(require('chai-subset'));

const EVENT_SINK_CONTAINER = 'bloboutput';
const EXPECTED_E2E_LATENCY_MS = 1000;
const DEVICE_ID = 'modern-data-warehouse-dataops/single_tech_samples/streamanalytics/e2e'

describe('Send to IoT Hub', () => {
let iotClient: Client;
let containerClient: ContainerClient;

before(() => {
chai.expect(process.env.DEVICE_CONNECTION_STRING).to.be.not.empty;
chai.expect(process.env.AZURE_STORAGE_CONNECTION_STRING).to.be.not.empty;
});

before(async () => {
iotClient = Client.fromConnectionString(process.env.DEVICE_CONNECTION_STRING!, Mqtt);
await iotClient.open();
});

before(() => {
containerClient = new ContainerClient(process.env.AZURE_STORAGE_CONNECTION_STRING!, EVENT_SINK_CONTAINER);
});

before(async () => {
await deleteAllBlobs();
});

after(async () => {
await iotClient.close();
});

afterEach(async () => {
await deleteAllBlobs();
});

async function send(message: Message) {
await iotClient.sendEvent(message);
};

async function deleteAllBlobs() {
for await (const blob of containerClient.listBlobsFlat()) {
await containerClient.deleteBlob(blob.name);
}
}

// Returns the first blob in the container, or undefined if the container is empty
async function getFirstBlob(): Promise<BlobItem | undefined> {
for await (const blob of containerClient.listBlobsFlat()) {
return blob;
}
return undefined;
}

async function getBlobData(blob: BlobItem): Promise<string> {
const client = containerClient.getBlockBlobClient(blob.name);
const response = await client.download();
return (await streamToBuffer(response.readableStreamBody!)).toString();
}

function convertBlobData(blobData: string): any[] {
return blobData.split('\n').map(entry => JSON.parse(entry));
}

// A helper method used to read a Node.js readable stream into a Buffer
async function streamToBuffer(readableStream: NodeJS.ReadableStream): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
readableStream.on("data", (data: Buffer | string) => {
chunks.push(data instanceof Buffer ? data : Buffer.from(data));
});
readableStream.on("end", () => {
resolve(Buffer.concat(chunks));
});
readableStream.on("error", reject);
});
}

async function delay(factor = 1) {
await new Promise(resolve => setTimeout(resolve, EXPECTED_E2E_LATENCY_MS * factor));
}

describe('payload with temperature', () => {
describe('greater than 27 degrees', () => {
it('should contain entry in blob', async () => {
const data = {
deviceId: DEVICE_ID,
temperature: 27.1
};
const message = new Message(JSON.stringify(data));

await send(message);

await delay();

const blob = await getFirstBlob();
chai.expect(blob).to.not.be.undefined;
const blobData = await getBlobData(blob);
const entries = convertBlobData(blobData);
chai.expect(entries).to.have.length(1);
chai.expect(entries).to.containSubset([data]);
});
});

describe('equal to 27 degrees', () => {
it('should not contain entry in blob', async () => {
const data = {
deviceId: DEVICE_ID,
temperature: 27
};
const message = new Message(JSON.stringify(data));

await send(message);

await delay(1.5);

const blob = await getFirstBlob();
chai.expect(blob).to.be.undefined;
});
});

describe('less than 27 degrees', () => {
it('should not contain entry in blob', async () => {
const data = {
deviceId: DEVICE_ID,
temperature: 26.9
};
const message = new Message(JSON.stringify(data));

await send(message);

await delay(1.5);

const blob = await getFirstBlob();
chai.expect(blob).to.be.undefined;
});
});
});
});