Skip to content

Commit

Permalink
Bicep IaC templates with Streaming Analytics Job Sample
Browse files Browse the repository at this point in the history
* README.md  Describes how to run the project in Azure and perform functional, unit, and e2e test using an online simulator.
  Added section on "Key Concepts"
* .gitignore Added
* containers.bicep Creates blob storaage container as data sink
* iothubs.bicep Creates IoT Hub
* streamingjobs.bicep Creates ASA job
* main.bicep Creates all Azure resources necessary for sample
* streamanalytics-tech-sample.asaql Sample query for filtering temperature > 27
* test/temperature_equal_to_27_degrees.json Test case data
* temperature_greater_than_27_degrees_expected.json Test case data
* temperature_greater_than_27_degrees.json Test case data
* temperature_less_than_27_degrees.json Test case data
* testConfig.json Added
  • Loading branch information
jmostella committed Apr 14, 2021
1 parent e1f946f commit 51c89c2
Show file tree
Hide file tree
Showing 21 changed files with 2,567 additions and 2 deletions.
3 changes: 3 additions & 0 deletions single_tech_samples/streamanalytics/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
output
testResultSummary.json
e2e/node_modules
133 changes: 131 additions & 2 deletions single_tech_samples/streamanalytics/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,140 @@
# Azure Stream Analytics

WIP
![](./docs/images/ASA-job.PNG)

[Azure Stream Analytics](https://azure.microsoft.com/en-us/services/stream-analytics/) is a serverless real-time analytics service. The goal of this sample is to demonstarte how to develop a streaming pipeline, with IaC and testability in mind.

## Prerequisites

## Setup

1. __Azure Cli__ Will be necessary for various tasks. Please follow instructions found [here](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli).

1. __Bicep__ This project uses `Bicep` templates to setup `Azure` infrastructure. Please follow the steps under [Install and manage via Azure CLI (easiest)](https://github.com/Azure/bicep/blob/main/docs/installing.md#install-and-manage-via-azure-cli-easiest) to install the `Azure Cli` extension.

A `Bicep` tutorial can be found [here](https://github.com/Azure/bicep/blob/main/docs/tutorial/01-simple-template.md).

1. __Raspberry Pi Azure IoT Online Simulator__ To run through the [Functional Test](#Functional%20Test) section, you can use [this](https://azure-samples.github.io/raspberry-pi-web-simulator/) simulator. Makes sure to follow the step required to edit the js code.

1. __azure-streamanalytics-cicd__ Unit testing is possible using the `npm` package [azure-streamanalytics-cicd](https://www.npmjs.com/package/azure-streamanalytics-cicd). The tool provides additional features for working with `Azure Stream Analytics` locally. Check [here](https://docs.microsoft.com/en-us/azure/stream-analytics/cicd-tools?tabs=visual-studio-code) for more info.

## Build

```bash
# Run Unit Tests
azure-streamanalytics-cicd test -project ./asaproj.json -outputPath ./output/

# Create Resource Group
az group create -n my-rg -l japaneast

# Validate Generated Template
az deployment group validate -f main.bicep -g my-rg --parameters query='@./streamanalytics-tech-sample.asaql'

# Show plan
az deployment group what-if -f main.bicep -g my-rg --parameters query='@./streamanalytics-tech-sample.asaql'
```

## Running the sample

## Key concepts
```bash
# Create Azure Resources. This will also start the job

az deployment group create -f main.bicep -g my-rg --parameters query='@./streamanalytics-tech-sample.asaql'
```

## Functional Test

```bash
# Add device

az iot hub device-identity create --hub-name iot-tech-sample-test --device-id iot-tech-sample-test --edge-enabled

# Use connection information with "Raspberry Pi Azure IoT Online Simulator": https://azure-samples.github.io/raspberry-pi-web-simulator/

az iot hub device-identity connection-string show --hub-name iot-tech-sample-test --device-id iot-tech-sample-test --output tsv
```

__Check the blob storage container to ensure that a json file exists with expected temperature sensor data (exceeding 27 degrees) is present__

## Automated End-to-End Test

```bash
export DEVICE_CONNECTION_STRING=$(az iot hub device-identity connection-string show --hub-name iot-tech-sample-test --device-id iot-tech-sample-test --output tsv)
export AZURE_STORAGE_CONNECTION_STRING=$(az storage account show-connection-string -n sttechsampletest --query connectionString -o tsv)

cd e2e
npm install
npm test
```

## Key concepts

### Infrastructure as Code

This sample's infrastructre is created using [Bicep](https://github.com/Azure/bicep#what-is-bicep) templates. `Bicep` is a DSL that is transpiled into `Azure`'s native `ARM Templates`. It's currently in preview but already has been integrated into the `Azure Cli`. Also [ARM template reference documentation](https://docs.microsoft.com/en-us/azure/templates/microsoft.devices/iothubs?tabs=bicep) lists `Bicep` along side `ARM Templates`:


![](docs/images/Bicep-doc.PNG)


Alternatively, `Terraform` can also be used.

> Key take aways for `Bicep`
>
> * Day 0 resource provider support. Any Azure resource — whether in private or public preview or GA — can be provisioned using Bicep.
> * No state or state files to manage. All state is stored in Azure, so makes it easy to collaborate and make changes to resources confidently.
> * VSCode extension for Bicep makes it extremely easy to author and get started with advanced type validation based on all Azure resource type API definitions.

Resource naming conventions attempt to follow examples from [Define your naming convention
](https://docs.microsoft.com/en-us/azure/cloud-adoption-framework/ready/azure-best-practices/resource-naming#example-names-for-common-azure-resource-types) where appropriate.

The structure of the __IaC__ scripts allow you to setup as many versions or the infrastructure as you need easily. For example an individual developer may create a version to use within their __Inner Dev Loop__:

```bash
DEVELOPER=kaa
az group create -n ${DEVELOPER}-rg -l japaneast
az deployment group create -f main.bicep -g ${DEVELOPER}-rg --parameters query='@./streamanalytics-tech-sample.asaql' name=$DEVELOPER env=inner
```

### Testing

_Unit_

`Azure Stream Analytics` can be unit tested locally via the [azure-streamanalytics-cicd](https://www.npmjs.com/package/azure-streamanalytics-cicd) npm package.

Also this package can be used as part of an Azure DevOps CI\CD pipeline. More info can be found [here](https://docs.microsoft.com/en-us/azure/stream-analytics/set-up-cicd-pipeline). This should make integrating this into your existing CI\CD fairly straight forward.

> Key take aways for `azure-streamanalytics-cicd`
>
> * Test configuration is found under __/test__ directory.
> * This is the default location for `azure-streamanalytics-cicd`.
> * __test/testConfig.json__ defines the test cases.
> * __test/temperature ... .json__ defines pipeline inputs and outputs (as `JSON`)
_Functional_

![](docs/images/InkedIoT-Simulator_LI.jpg)

Functional testing can be done using the online tool [Raspberry Pi Web Client Simulator](https://azure-samples.github.io/raspberry-pi-web-simulator/). Source code can be found [here](https://github.com/Azure-Samples/raspberry-pi-web-simulator). This is an easy way to interacte with your `Azure Streaming Analytics` pipeline.

You can manually check the blob storage to see that events are coming through correctly.

![](docs/images/BLOB-OUT.PNG)

> One aspect to consider, is that events will be batched on the scale of seconds due to the `pathPattern` defined in [streamingjobs.bicep](./streamingjobs.bicep) for `bloboutput`. This is done to aid automated testing. You can adjust during functional testing, and for production as required:
>
> _example_
> ```
> pathPattern: '{date}'
> ```
> Then you should be able to see many events per file.
_Automated End-to-End Test_
This sample combines [Azure IoT device SDK](https://www.npmjs.com/package/azure-iot-device) and [Microsoft Azure Storage SDK for JavaScript](https://www.npmjs.com/package/@azure/storage-blob) to create a Node.js (TypeScript) based End-to-End test solution. As mentioned under __Functional__ by default blob output partitioning is done on the seconds resolution to prevent the automated test from waiting an impractical amount of time. This can be made configurable according to your requirements on deploy by altering the `Bicep` template.
![](docs/images/e2e-test.PNG)
Within the test file [e2e/e2e.ts](e2e/e2e.ts) there is the `EXPECTED_E2E_LATENCY_MS` defined to be 1s. So this would also need to be adjusted for a real implementation.
5 changes: 5 additions & 0 deletions single_tech_samples/streamanalytics/asaproj.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"name": "streamanalytics",
"startFile": "streamanalytics-tech-sample.asaql",
"configurations": []
}
20 changes: 20 additions & 0 deletions single_tech_samples/streamanalytics/containers.bicep
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
param name string
param env string

resource storageAccount 'Microsoft.Storage/storageAccounts@2020-08-01-preview' = {
name: replace('st${name}${env}', '-', '')
location: resourceGroup().location
sku: {
name: 'Standard_LRS'
}
kind: 'StorageV2'
}

resource blobContainer 'Microsoft.Storage/storageAccounts/blobServices/containers@2020-08-01-preview' = {
name: '${storageAccount.name}/default/bloboutput'
}

output account object = {
accountName: storageAccount.name
accountKey: listKeys(storageAccount.id, storageAccount.apiVersion).keys[0].value
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
141 changes: 141 additions & 0 deletions single_tech_samples/streamanalytics/e2e/e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import * as chai from 'chai';
import { Mqtt } from 'azure-iot-device-mqtt';
import { Client, Message } from 'azure-iot-device';
import { BlobItem, ContainerClient } from '@azure/storage-blob';
chai.use(require('chai-subset'));

const EVENT_SINK_CONTAINER = 'bloboutput';
const EXPECTED_E2E_LATENCY_MS = 1000;
const DEVICE_ID = 'modern-data-warehouse-dataops/single_tech_samples/streamanalytics/e2e'

describe('Send to IoT Hub', () => {
let iotClient: Client;
let containerClient: ContainerClient;

before(() => {
chai.expect(process.env.DEVICE_CONNECTION_STRING).to.be.not.empty;
chai.expect(process.env.AZURE_STORAGE_CONNECTION_STRING).to.be.not.empty;
});

before(async () => {
iotClient = Client.fromConnectionString(process.env.DEVICE_CONNECTION_STRING, Mqtt);
await iotClient.open();
});

before(() => {
containerClient = new ContainerClient(process.env.AZURE_STORAGE_CONNECTION_STRING, EVENT_SINK_CONTAINER);
});

before(async () => {
await deleteAllBlobs();
});

after(async () => {
await iotClient.close();
});

afterEach(async () => {
await deleteAllBlobs();
});

async function send(message: Message) {
await iotClient.sendEvent(message);
};

async function deleteAllBlobs() {
for await (const blob of containerClient.listBlobsFlat()) {
await containerClient.deleteBlob(blob.name);
}
}

async function getFirstBlob(): Promise<BlobItem> {
for await (const blob of containerClient.listBlobsFlat()) {
return blob;
}
}

async function getBlobData(blob: BlobItem): Promise<string> {
const client = containerClient.getBlockBlobClient(blob.name);
const response = await client.download();
return (await streamToBuffer(response.readableStreamBody!)).toString();
}

function convertBlobData(blobData: string): any[] {
return blobData.split('\n').map(entry => JSON.parse(entry));
}

// A helper method used to read a Node.js readable stream into a Buffer
async function streamToBuffer(readableStream: NodeJS.ReadableStream): Promise<Buffer> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
readableStream.on("data", (data: Buffer | string) => {
chunks.push(data instanceof Buffer ? data : Buffer.from(data));
});
readableStream.on("end", () => {
resolve(Buffer.concat(chunks));
});
readableStream.on("error", reject);
});
}

async function delay(factor = 1) {
await new Promise(resolve => setTimeout(resolve, EXPECTED_E2E_LATENCY_MS * factor));
}

describe('payload with temperature', () => {
describe('greater than 27 degrees', () => {
it('should contain entry in blob', async () => {
const data = {
deviceId: DEVICE_ID,
temperature: 27.1
};
const message = new Message(JSON.stringify(data));

await send(message);

await delay();

const blob = await getFirstBlob();
chai.expect(blob).to.not.be.undefined;
const blobData = await getBlobData(blob);
const entries = convertBlobData(blobData);
chai.expect(entries).to.have.length(1);
chai.expect(entries).to.containSubset([data]);
});
});

describe('equal to 27 degrees', () => {
it('should not contain entry in blob', async () => {
const data = {
deviceId: DEVICE_ID,
temperature: 27
};
const message = new Message(JSON.stringify(data));

await send(message);

await delay(1.5);

const blob = await getFirstBlob();
chai.expect(blob).to.be.undefined;
});
});

describe('less than 27 degrees', () => {
it('should not contain entry in blob', async () => {
const data = {
deviceId: DEVICE_ID,
temperature: 26.9
};
const message = new Message(JSON.stringify(data));

await send(message);

await delay(1.5);

const blob = await getFirstBlob();
chai.expect(blob).to.be.undefined;
});
});
});
});
Loading

0 comments on commit 51c89c2

Please sign in to comment.