Skip to content

Commit

Permalink
azurerm_stream_analytics_job: add support for job_storage_account (
Browse files Browse the repository at this point in the history
  • Loading branch information
jiaweitao001 authored Nov 3, 2022
1 parent c364452 commit 4907c16
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 0 deletions.
85 changes: 85 additions & 0 deletions internal/services/streamanalytics/stream_analytics_job_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/streamanalytics/mgmt/2020-03-01/streamanalytics"
"github.com/hashicorp/go-azure-helpers/resourcemanager/commonschema"
"github.com/hashicorp/go-azure-helpers/resourcemanager/identity"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-provider-azurerm/helpers/azure"
"github.com/hashicorp/terraform-provider-azurerm/helpers/tf"
"github.com/hashicorp/terraform-provider-azurerm/internal/clients"
Expand Down Expand Up @@ -129,6 +130,47 @@ func resourceStreamAnalyticsJob() *pluginsdk.Resource {
ValidateFunc: validate.StreamAnalyticsJobStreamingUnits,
},

"content_storage_policy": {
Type: pluginsdk.TypeString,
Optional: true,
Default: string(streamanalytics.ContentStoragePolicySystemAccount),
ValidateFunc: validation.StringInSlice([]string{
string(streamanalytics.ContentStoragePolicySystemAccount),
string(streamanalytics.ContentStoragePolicyJobStorageAccount),
}, false),
},

"job_storage_account": {
Type: pluginsdk.TypeList,
Optional: true,
Elem: &pluginsdk.Resource{
Schema: map[string]*schema.Schema{
"authentication_mode": {
Type: pluginsdk.TypeString,
Required: true,
ValidateFunc: validation.StringInSlice([]string{
string(streamanalytics.AuthenticationModeConnectionString),
string(streamanalytics.AuthenticationModeMsi),
string(streamanalytics.AuthenticationModeUserToken),
}, false),
},

"account_name": {
Type: pluginsdk.TypeString,
Required: true,
ValidateFunc: validation.StringIsNotEmpty,
},

"account_key": {
Type: pluginsdk.TypeString,
Required: true,
Sensitive: true,
ValidateFunc: validation.StringIsNotEmpty,
},
},
},
},

"transformation_query": {
Type: pluginsdk.TypeString,
Required: true,
Expand Down Expand Up @@ -182,6 +224,7 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte
location := azure.NormalizeLocation(d.Get("location").(string))
outputErrorPolicy := d.Get("output_error_policy").(string)
transformationQuery := d.Get("transformation_query").(string)
contentStoragePolicy := d.Get("content_storage_policy").(string)
t := d.Get("tags").(map[string]interface{})

// needs to be defined inline for a Create but via a separate API for Update
Expand Down Expand Up @@ -216,6 +259,7 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte
Sku: &streamanalytics.Sku{
Name: streamanalytics.SkuNameStandard,
},
ContentStoragePolicy: streamanalytics.ContentStoragePolicy(contentStoragePolicy),
CompatibilityLevel: streamanalytics.CompatibilityLevel(compatibilityLevel),
EventsLateArrivalMaxDelayInSeconds: utils.Int32(int32(eventsLateArrivalMaxDelayInSeconds)),
EventsOutOfOrderMaxDelayInSeconds: utils.Int32(int32(eventsOutOfOrderMaxDelayInSeconds)),
Expand All @@ -227,6 +271,14 @@ func resourceStreamAnalyticsJobCreateUpdate(d *pluginsdk.ResourceData, meta inte
Tags: tags.Expand(t),
}

if contentStoragePolicy == string(streamanalytics.ContentStoragePolicyJobStorageAccount) {
if v, ok := d.GetOk("job_storage_account"); ok {
props.JobStorageAccount = expandJobStorageAccount(v.([]interface{}))
} else {
return fmt.Errorf("`job_storage_account` must be set when `content_storage_policy` is `JobStorageAccount`")
}
}

if jobType == string(streamanalytics.JobTypeEdge) {
if _, ok := d.GetOk("stream_analytics_cluster_id"); ok {
return fmt.Errorf("the job type `Edge` doesn't support `stream_analytics_cluster_id`")
Expand Down Expand Up @@ -327,6 +379,8 @@ func resourceStreamAnalyticsJobRead(d *pluginsdk.ResourceData, meta interface{})
d.Set("events_out_of_order_policy", string(props.EventsOutOfOrderPolicy))
d.Set("output_error_policy", string(props.OutputErrorPolicy))
d.Set("type", string(props.JobType))
d.Set("content_storage_policy", string(props.ContentStoragePolicy))
d.Set("job_storage_account", flattenJobStorageAccount(d, props.JobStorageAccount))

// Computed
d.Set("job_id", props.JobID)
Expand Down Expand Up @@ -412,3 +466,34 @@ func flattenJobIdentity(identity *streamanalytics.Identity) []interface{} {
},
}
}

func expandJobStorageAccount(input []interface{}) *streamanalytics.JobStorageAccount {
if input == nil {
return nil
}

v := input[0].(map[string]interface{})
authenticationMode := v["authentication_mode"].(string)
accountName := v["account_name"].(string)
accountKey := v["account_key"].(string)

return &streamanalytics.JobStorageAccount{
AuthenticationMode: streamanalytics.AuthenticationMode(authenticationMode),
AccountName: utils.String(accountName),
AccountKey: utils.String(accountKey),
}
}

func flattenJobStorageAccount(d *pluginsdk.ResourceData, input *streamanalytics.JobStorageAccount) []interface{} {
if input == nil {
return []interface{}{}
}

return []interface{}{
map[string]interface{}{
"authentication_mode": string(input.AuthenticationMode),
"account_name": *input.AccountName,
"account_key": d.Get("job_storage_account.0.account_key").(string),
},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,21 @@ func TestAccStreamAnalyticsJob_jobTypeEdge(t *testing.T) {
})
}

func TestAccStreamAnalyticsJob_jobStorageAccount(t *testing.T) {
data := acceptance.BuildTestData(t, "azurerm_stream_analytics_job", "test")
r := StreamAnalyticsJobResource{}

data.ResourceTest(t, r, []acceptance.TestStep{
{
Config: r.jobStorageAccount(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("job_storage_account.0.account_key"),
})
}

func (r StreamAnalyticsJobResource) Exists(ctx context.Context, client *clients.Client, state *pluginsdk.InstanceState) (*bool, error) {
id, err := parse.StreamingJobID(state.ID)
if err != nil {
Expand Down Expand Up @@ -361,3 +376,48 @@ QUERY
}
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger)
}

func (r StreamAnalyticsJobResource) jobStorageAccount(data acceptance.TestData) string {
return fmt.Sprintf(`
provider "azurerm" {
features {}
}
resource "azurerm_resource_group" "test" {
name = "acctestRG-%d"
location = "%[3]s"
}
resource "azurerm_storage_account" "test" {
name = "acctestacc%[2]s"
resource_group_name = azurerm_resource_group.test.name
location = azurerm_resource_group.test.location
account_tier = "Standard"
account_replication_type = "LRS"
}
resource "azurerm_stream_analytics_job" "test" {
name = "acctestjob-%[4]d"
resource_group_name = azurerm_resource_group.test.name
location = azurerm_resource_group.test.location
streaming_units = 3
content_storage_policy = "JobStorageAccount"
job_storage_account {
authentication_mode = "ConnectionString"
account_name = azurerm_storage_account.test.name
account_key = azurerm_storage_account.test.primary_access_key
}
tags = {
environment = "Test"
}
transformation_query = <<QUERY
SELECT *
INTO [YourOutputAlias]
FROM [YourInputAlias]
QUERY
}
`, data.RandomInteger, data.RandomString, data.Locations.Primary, data.RandomInteger)
}
14 changes: 14 additions & 0 deletions website/docs/r/stream_analytics_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ The following arguments are supported:

-> **NOTE:** `streaming_units` must be set when `type` is `Cloud`.

* `content_storage_policy` - (Optional) The policy for storing stream analytics content. Possible values are `JobStorageAccount`, `SystemAccount`.

* `job_storage_account` - (Optional) The details of the job storage account. A `job_storage_account` block as defined below.

---

* `authentication_mode` - (Required) The authentication mode of the storage account. Possible values are `ConnectionString`, `Msi` and `UserToken`.

* `account_name` - (Required) The name of the Azure storage account.

* `account_key` - (Required) The account key for the Azure storage account.

---

* `transformation_query` - (Required) Specifies the query that will be run in the streaming job, [written in Stream Analytics Query Language (SAQL)](https://msdn.microsoft.com/library/azure/dn834998).

* `tags` - A mapping of tags assigned to the resource.
Expand Down

0 comments on commit 4907c16

Please sign in to comment.