Skip to content

Commit

Permalink
[Cosmos] add preferred regions logging (#22598)
Browse files Browse the repository at this point in the history
* add new client option

* add client config logs

* remove new option

* nil fix

* fix

* log client preferred regions on every request

* move logging logic to policy

* change to 1.21

* Revert "move logging logic to policy"

This reverts commit 58be1ca.

* move logging logic, add gem to tests

* Update go.mod

* Update ci.yml

* Update ci.yml

* Update ci.yml
  • Loading branch information
simorenoh authored Mar 19, 2024
1 parent 3a7a4e7 commit b59b15c
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 36 deletions.
5 changes: 3 additions & 2 deletions sdk/data/azcosmos/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extends:
parameters:
ServiceDirectory: 'data/azcosmos'
UsePipelineProxy: false
ExcludeGoNMinus2: true
AdditionalStages:
- stage: Emulator
displayName: 'Cosmos Emulator'
Expand Down Expand Up @@ -59,7 +60,7 @@ extends:

- task: GoTool@0
inputs:
version: '$(go.version)'
version: '1.22.0'
displayName: "Select Go Version"

- template: /eng/pipelines/templates/steps/create-go-workspace.yml@self
Expand All @@ -68,6 +69,6 @@ extends:
parameters:
ServiceDirectory: 'data/azcosmos'
Image: $(vm.image)
GoVersion: $(go.version)
GoVersion: '1.22.0'
EnvVars:
EMULATOR: 'true'
4 changes: 3 additions & 1 deletion sdk/data/azcosmos/cosmos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
)

const (
Expand Down Expand Up @@ -70,7 +72,6 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
if err != nil {
return nil, err
}

return &Client{endpoint: endpoint, pipeline: newPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o), gem: gem}, nil
}

Expand Down Expand Up @@ -471,6 +472,7 @@ func (c *Client) attachContent(content interface{}, req *policy.Request) error {
}

func (c *Client) executeAndEnsureSuccessResponse(request *policy.Request) (*http.Response, error) {
log.Write(azlog.EventResponse, fmt.Sprintf("\n===== Client preferred regions:\n%v\n=====\n", c.gem.preferredLocations))
response, err := c.pipeline.Do(request)
if err != nil {
return nil, err
Expand Down
2 changes: 0 additions & 2 deletions sdk/data/azcosmos/cosmos_client_retry_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package azcosmos
import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
"testing"
Expand Down Expand Up @@ -366,7 +365,6 @@ func TestReadServiceUnavailable(t *testing.T) {
// Request should retry twice and then succeed (2 preferred regions)
assert.NoError(t, err)
assert.True(t, retryPolicy.retryCount == 2)
fmt.Println("we here 1")

// Setting up responses for retrying and failing
srv.AppendResponse(
Expand Down
45 changes: 30 additions & 15 deletions sdk/data/azcosmos/cosmos_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func TestEnsureErrorIsGeneratedOnResponse(t *testing.T) {
mock.WithStatusCode(404))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -128,7 +129,8 @@ func TestEnsureErrorIsNotGeneratedOnResponse(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -146,7 +148,8 @@ func TestRequestEnricherIsCalled(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -173,7 +176,8 @@ func TestNoOptionsIsCalled(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -190,7 +194,8 @@ func TestAttachContent(t *testing.T) {
defer close()

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -241,7 +246,8 @@ func TestCreateRequest(t *testing.T) {
srv, close := mock.NewTLSServer()
defer close()
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -285,7 +291,8 @@ func TestSendDelete(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -308,7 +315,8 @@ func TestSendGet(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -331,7 +339,8 @@ func TestSendPut(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -364,7 +373,8 @@ func TestSendPost(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -397,7 +407,8 @@ func TestSendQuery(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -432,7 +443,8 @@ func TestSendQueryWithParameters(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -474,7 +486,8 @@ func TestSendBatch(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDocument,
resourceAddress: "",
Expand Down Expand Up @@ -518,7 +531,8 @@ func TestSendPatch(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -594,7 +608,8 @@ func TestQueryDatabases(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

receivedIds := []string{}
queryPager := client.NewQueryDatabasesPager("select * from c", nil)
Expand Down
27 changes: 18 additions & 9 deletions sdk/data/azcosmos/cosmos_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func TestContainerRead(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -123,7 +124,8 @@ func TestContainerDeleteItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -176,7 +178,8 @@ func TestContainerReadItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -233,7 +236,8 @@ func TestContainerReplaceItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -294,7 +298,8 @@ func TestContainerUpsertItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -359,7 +364,8 @@ func TestContainerCreateItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -437,7 +443,8 @@ func TestContainerQueryItems(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -544,7 +551,8 @@ func TestContainerExecuteBatch(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -607,7 +615,8 @@ func TestContainerPatchItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down
3 changes: 2 additions & 1 deletion sdk/data/azcosmos/cosmos_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestDatabaseQueryContainers(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)

Expand Down
8 changes: 2 additions & 6 deletions sdk/data/azcosmos/cosmos_global_endpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func TestGlobalEndpointManagerMarkEndpointUnavailableForRead(t *testing.T) {

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl}

endpoint, err := url.Parse(client.endpoint)
endpoint, err := url.Parse(srv.URL())
assert.NoError(t, err)

gem, err := newGlobalEndpointManager(srv.URL(), pl, []string{"West US", "Central US"}, 5*time.Minute, true)
Expand All @@ -101,9 +99,7 @@ func TestGlobalEndpointManagerMarkEndpointUnavailableForWrite(t *testing.T) {

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl}

endpoint, err := url.Parse(client.endpoint)
endpoint, err := url.Parse(srv.URL())
assert.NoError(t, err)

gem, err := newGlobalEndpointManager(srv.URL(), pl, []string{"West US", "Central US"}, 5*time.Minute, true)
Expand Down

0 comments on commit b59b15c

Please sign in to comment.