Skip to content

Commit

Permalink
feat: add event streams mirroring config support
Browse files Browse the repository at this point in the history
  • Loading branch information
Srikant Sahu authored and Srikant Sahu committed Oct 18, 2024
1 parent 87c26c0 commit 2cee06a
Show file tree
Hide file tree
Showing 8 changed files with 517 additions and 39 deletions.
50 changes: 50 additions & 0 deletions examples/ibm-event-streams/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,53 @@ resource "ibm_resource_tag" "tag_example_on_es" {
tag_type = "access"
resource_id = data.ibm_resource_instance.es_instance_5.id
}

#### Scenario 6: Create a target Event Streams service instance with mirroring enabled and its mirroring config
data "ibm_resource_instance" "es_instance_source" {
name = "terraform-integration-source"
resource_group_id = data.ibm_resource_group.group.id
}
# setup s2s at service level for mirroring to work
resource "ibm_iam_authorization_policy" "service-policy" {
source_service_name = "messagehub"
target_service_name = "messagehub"
roles = ["Reader"]
description = "test mirroring setup via terraform"
}

resource "ibm_resource_instance" "es_instance_target" {
name = "terraform-integration-target"
service = "messagehub"
plan = "enterprise-3nodes-2tb"
location = "us-south"
resource_group_id = data.ibm_resource_group.group.id
parameters_json = jsonencode(
{
mirroring = {
source_crn = data.ibm_resource_instance.es_instance_source.id
source_alias = "source-alias"
target_alias = "target-alias"
}
}
)
timeouts {
create = "3h"
update = "1h"
delete = "15m"
}
}
# Configure a service-to-service binding between both instances to allow both instances to communicate.
resource "ibm_iam_authorization_policy" "instance_policy" {
source_service_name = "messagehub"
source_resource_instance_id = ibm_resource_instance.es_instance_target.guid
target_service_name = "messagehub"
target_resource_instance_id = data.ibm_resource_instance.es_instance_source.guid
roles = ["Reader"]
description = "test mirroring setup via terraform"
}

# Select some topics from the source cluster to mirror.
resource "ibm_event_streams_mirroring_config" "es_mirroring_config" {
resource_instance_id = ibm_resource_instance.es_instance_target.id
mirroring_topic_patterns = ["topicA", "topicB"]
}
2 changes: 2 additions & 0 deletions ibm/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ func Provider() *schema.Provider {
"ibm_event_streams_topic": eventstreams.DataSourceIBMEventStreamsTopic(),
"ibm_event_streams_schema": eventstreams.DataSourceIBMEventStreamsSchema(),
"ibm_event_streams_quota": eventstreams.DataSourceIBMEventStreamsQuota(),
"ibm_event_streams_mirroring_config": eventstreams.DataSourceIBMEventStreamsMirroringConfig(),
"ibm_hpcs": hpcs.DataSourceIBMHPCS(),
"ibm_hpcs_managed_key": hpcs.DataSourceIbmManagedKey(),
"ibm_hpcs_key_template": hpcs.DataSourceIbmKeyTemplate(),
Expand Down Expand Up @@ -1114,6 +1115,7 @@ func Provider() *schema.Provider {
"ibm_event_streams_topic": eventstreams.ResourceIBMEventStreamsTopic(),
"ibm_event_streams_schema": eventstreams.ResourceIBMEventStreamsSchema(),
"ibm_event_streams_quota": eventstreams.ResourceIBMEventStreamsQuota(),
"ibm_event_streams_mirroring_config": eventstreams.ResourceIBMEventStreamsMirroringConfig(),
"ibm_firewall": classicinfrastructure.ResourceIBMFirewall(),
"ibm_firewall_policy": classicinfrastructure.ResourceIBMFirewallPolicy(),
"ibm_hpcs": hpcs.ResourceIBMHPCS(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright IBM Corp. 2024 All Rights Reserved.
// Licensed under the Mozilla Public License v2.0

package eventstreams

import (
"context"
"fmt"
"log"
"strings"

"github.com/IBM-Cloud/terraform-provider-ibm/ibm/conns"
"github.com/IBM-Cloud/terraform-provider-ibm/ibm/flex"
"github.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

const (
mirroringConfigResourceType = "mirroring-config"
)

// The mirroring config for topic selection in an Event Streams service instance.
// The ID is the CRN with the last two components "mirroring-config:".
// The mirroring topic patterns defines the topic selection.
func DataSourceIBMEventStreamsMirroringConfig() *schema.Resource {
return &schema.Resource{
ReadContext: dataSourceIBMEventStreamsMirroringConfigRead,
Schema: map[string]*schema.Schema{
"resource_instance_id": {
Type: schema.TypeString,
Required: true,
Description: "The ID or CRN of the Event Streams service instance",
},
"mirroring_topic_patterns": {
Type: schema.TypeList,
Computed: true,
Description: "The topic pattern to use for mirroring",
Elem: &schema.Schema{Type: schema.TypeString},
},
},
}

}

// read mirroring config using the admin-rest API
func dataSourceIBMEventStreamsMirroringConfigRead(context context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
adminrestClient, err := meta.(conns.ClientSession).ESadminRestSession()
if err != nil {
tfErr := flex.TerraformErrorf(err, "Error getting Event Streams admin rest session", "ibm_event_streams_mirroring_config", "read")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}

adminURL, instanceCRN, err := getMirroringConfigInstanceURL(d, meta)
if err != nil {
tfErr := flex.TerraformErrorf(err, "Error getting Event Streams mirroring config URL", "ibm_event_streams_mirroring_config", "read")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}
adminrestClient.SetServiceURL(adminURL)

getMirroringConfigOptions := &adminrestv1.GetMirroringTopicSelectionOptions{}
mirroringConfig, _, err := adminrestClient.GetMirroringTopicSelectionWithContext(context, getMirroringConfigOptions)
if err != nil {
tfErr := flex.TerraformErrorf(err, "GetMirroringTopicSelection returned error", "ibm_event_streams_mirroring_config", "read")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}
if mirroringConfig == nil {
tfErr := flex.TerraformErrorf(err, "Unexpected nil config when getting mirroring config", "ibm_event_streams_mirroring_config", "read")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}
d.SetId(getMirroringConfigID(instanceCRN))
d.Set("resource_instance_id", instanceCRN)
d.Set("mirroring_topic_patterns", mirroringConfig.Includes)
return nil
}

func getMirroringConfigInstanceURL(d *schema.ResourceData, meta interface{}) (string, string, error) {
instanceCRN := d.Get("resource_instance_id").(string)
if instanceCRN == "" { // importing
id := d.Id()
crnSegments := strings.Split(id, ":")
if len(crnSegments) != 10 || crnSegments[8] != mirroringConfigResourceType {
return "", "", fmt.Errorf("ID '%s' is not a mirroring config resource", id)
}
crnSegments[8] = ""
crnSegments[9] = ""
instanceCRN = strings.Join(crnSegments, ":")
d.Set("resource_instance_id", instanceCRN)
}

instance, err := getInstanceDetails(instanceCRN, meta)
if err != nil {
return "", "", err
}
adminURL := instance.Extensions["kafka_http_url"].(string)
planID := *instance.ResourcePlanID
valid := strings.Contains(planID, "enterprise")
if !valid {
return "", "", fmt.Errorf("mirroring config is not supported by the Event Streams %s plan, enterprise plan is expected",
planID)
}
return adminURL, instanceCRN, nil
}

func getMirroringConfigID(instanceCRN string) string {
crnSegments := strings.Split(instanceCRN, ":")
crnSegments[8] = mirroringConfigResourceType
crnSegments[9] = ""
return strings.Join(crnSegments, ":")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright IBM Corp. 2024 All Rights Reserved.
// Licensed under the Mozilla Public License v2.0

package eventstreams_test

import (
"fmt"
"strings"
"testing"

acc "github.com/IBM-Cloud/terraform-provider-ibm/ibm/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
)

func TestAccIBMEventStreamsMirroringConfigDataSource(t *testing.T) {
instanceName := "ES Integration Pipeline MZR"
resource.Test(t, resource.TestCase{
PreCheck: func() { acc.TestAccPreCheck(t) },
Providers: acc.TestAccProviders,
Steps: []resource.TestStep{
{
Config: testAccCheckIBMEventStreamsMirroringConfigDataSource(instanceName),
Check: resource.ComposeTestCheckFunc(
testAccCheckIBMEventStreamsMirroringConfigProperties("data.ibm_event_streams_mirroring_config.es_mirroring_config"),
resource.TestCheckResourceAttr("data.ibm_event_streams_mirroring_config.es_mirroring_config", "mirroring_topic_patterns.#", "1"),
resource.TestCheckResourceAttr("data.ibm_event_streams_mirroring_config.es_mirroring_config", "mirroring_topic_patterns.0", ".*"),
),
},
},
})
}

// check properties of the mirroring config data source or resource object
func testAccCheckIBMEventStreamsMirroringConfigProperties(name string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[name]
if !ok {
return fmt.Errorf("Not found: %s", name)
}
mcID := rs.Primary.ID
if mcID == "" {
return fmt.Errorf("[ERROR] Mirroring config ID is not set")
}
if !strings.HasSuffix(mcID, ":mirroring-config:") {
return fmt.Errorf("[ERROR] Mirroring config ID %s not expected CRN", mcID)
}
return nil
}
}

func testAccCheckIBMEventStreamsMirroringConfigDataSource(instanceName string) string {
return fmt.Sprintf(`
data "ibm_resource_group" "group" {
is_default = true
}
data "ibm_resource_instance" "es_instance" {
resource_group_id = data.ibm_resource_group.group.id
name = "%s"
}
data "ibm_event_streams_mirroring_config" "es_mirroring_config" {
resource_instance_id = data.ibm_resource_instance.es_instance.id
}`, instanceName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright IBM Corp. 2024 All Rights Reserved.
// Licensed under the Mozilla Public License v2.0

package eventstreams

import (
"context"
"log"

"github.com/IBM-Cloud/terraform-provider-ibm/ibm/conns"
"github.com/IBM-Cloud/terraform-provider-ibm/ibm/flex"
"github.com/IBM/eventstreams-go-sdk/pkg/adminrestv1"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// The mirroring config for topic selection in an Event Streams service instance.
// The ID is the CRN with the last two components "mirroring-config:".
// The mirroring topic patterns defines the topic selection.
func ResourceIBMEventStreamsMirroringConfig() *schema.Resource {
return &schema.Resource{
CreateContext: resourceIBMEventStreamsMirroringConfigUpdate,
ReadContext: resourceIBMEventStreamsMirroringConfigRead,
UpdateContext: resourceIBMEventStreamsMirroringConfigUpdate,
DeleteContext: resourceIBMEventStreamsMirroringConfigDelete,
Importer: &schema.ResourceImporter{},
Schema: map[string]*schema.Schema{
"resource_instance_id": {
Type: schema.TypeString,
Required: true,
Description: "The ID or CRN of the Event Streams service instance",
},
"mirroring_topic_patterns": {
Type: schema.TypeList,
Required: true,
Description: "The topic pattern to use for mirroring",
Elem: &schema.Schema{Type: schema.TypeString},
},
},
}
}

func resourceIBMEventStreamsMirroringConfigRead(context context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
return dataSourceIBMEventStreamsMirroringConfigRead(context, d, meta)
}

// The mirroring topic selection for a mirroring enabled instance is always replaced,
// so create and update have the same behavior
func resourceIBMEventStreamsMirroringConfigUpdate(context context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
adminrestClient, err := meta.(conns.ClientSession).ESadminRestSession()
if err != nil {
tfErr := flex.TerraformErrorf(err, "Error getting Event Streams admin rest session", "ibm_event_streams_mirroring_config", "update")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}

adminURL, _, err := getMirroringConfigInstanceURL(d, meta)
if err != nil {
tfErr := flex.TerraformErrorf(err, "Error getting Event Streams mirroring config URL", "ibm_event_streams_mirroring_config", "update")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}
adminrestClient.SetServiceURL(adminURL)
mirroringOptions := &adminrestv1.ReplaceMirroringTopicSelectionOptions{}
mirroringOptions.SetIncludes(flex.ExpandStringList(d.Get("mirroring_topic_patterns").([]interface{})))

_, _, err = adminrestClient.ReplaceMirroringTopicSelectionWithContext(context, mirroringOptions)
if err != nil {
tfErr := flex.TerraformErrorf(err, "ReplaceMirroringTopicSelection returned error", "ibm_event_streams_mirroring_config", "update")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}
return resourceIBMEventStreamsMirroringConfigRead(context, d, meta)
}

// The mirroring config can't be deleted, but we reset with an empty list.
func resourceIBMEventStreamsMirroringConfigDelete(context context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
adminrestClient, err := meta.(conns.ClientSession).ESadminRestSession()
if err != nil {
tfErr := flex.TerraformErrorf(err, "Error getting Event Streams admin rest session", "ibm_event_streams_mirroring_config", "delete")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}

adminURL, _, err := getMirroringConfigInstanceURL(d, meta)
if err != nil {
tfErr := flex.TerraformErrorf(err, "Error getting Event Streams mirroring config URL", "ibm_event_streams_mirroring_config", "delete")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}
adminrestClient.SetServiceURL(adminURL)
mirroringOptions := &adminrestv1.ReplaceMirroringTopicSelectionOptions{}
mirroringOptions.SetIncludes([]string{})
_, _, err = adminrestClient.ReplaceMirroringTopicSelectionWithContext(context, mirroringOptions)
if err != nil {
tfErr := flex.TerraformErrorf(err, "ReplaceMirroringTopicSelection returned error", "ibm_event_streams_mirroring_config", "delete")
log.Printf("[DEBUG]\n%s", tfErr.GetDebugMessage())
return tfErr.GetDiag()
}
d.SetId("")
return nil
}
Loading

0 comments on commit 2cee06a

Please sign in to comment.