Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] add preferred regions logging #22598

Merged
merged 15 commits into from
Mar 19, 2024
2 changes: 2 additions & 0 deletions sdk/data/azcosmos/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extends:
parameters:
ServiceDirectory: 'data/azcosmos'
UsePipelineProxy: false
ExcludeGoNMinus2: true
AdditionalStages:
- stage: Emulator
displayName: 'Cosmos Emulator'
Expand All @@ -38,6 +39,7 @@ extends:
pool:
name: $(WINDOWSPOOL)
image: $(WINDOWSVMIMAGE)
go.version: '1.22.0'
ealsur marked this conversation as resolved.
Show resolved Hide resolved
os: windows

steps:
Expand Down
4 changes: 3 additions & 1 deletion sdk/data/azcosmos/cosmos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
)

const (
Expand Down Expand Up @@ -70,7 +72,6 @@ func NewClient(endpoint string, cred azcore.TokenCredential, o *ClientOptions) (
if err != nil {
return nil, err
}

return &Client{endpoint: endpoint, pipeline: newPipeline(newCosmosBearerTokenPolicy(cred, scope, nil), gem, o), gem: gem}, nil
}

Expand Down Expand Up @@ -471,6 +472,7 @@ func (c *Client) attachContent(content interface{}, req *policy.Request) error {
}

func (c *Client) executeAndEnsureSuccessResponse(request *policy.Request) (*http.Response, error) {
log.Write(azlog.EventResponse, fmt.Sprintf("\n===== Client preferred regions:\n%v\n=====\n", c.gem.preferredLocations))
response, err := c.pipeline.Do(request)
if err != nil {
return nil, err
Expand Down
2 changes: 0 additions & 2 deletions sdk/data/azcosmos/cosmos_client_retry_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package azcosmos
import (
"context"
"encoding/json"
"fmt"
"net"
"net/url"
"testing"
Expand Down Expand Up @@ -366,7 +365,6 @@ func TestReadServiceUnavailable(t *testing.T) {
// Request should retry twice and then succeed (2 preferred regions)
assert.NoError(t, err)
assert.True(t, retryPolicy.retryCount == 2)
fmt.Println("we here 1")

// Setting up responses for retrying and failing
srv.AppendResponse(
Expand Down
45 changes: 30 additions & 15 deletions sdk/data/azcosmos/cosmos_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func TestEnsureErrorIsGeneratedOnResponse(t *testing.T) {
mock.WithStatusCode(404))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -128,7 +129,8 @@ func TestEnsureErrorIsNotGeneratedOnResponse(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -146,7 +148,8 @@ func TestRequestEnricherIsCalled(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -173,7 +176,8 @@ func TestNoOptionsIsCalled(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -190,7 +194,8 @@ func TestAttachContent(t *testing.T) {
defer close()

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -241,7 +246,8 @@ func TestCreateRequest(t *testing.T) {
srv, close := mock.NewTLSServer()
defer close()
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -285,7 +291,8 @@ func TestSendDelete(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -308,7 +315,8 @@ func TestSendGet(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand All @@ -331,7 +339,8 @@ func TestSendPut(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -364,7 +373,8 @@ func TestSendPost(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -397,7 +407,8 @@ func TestSendQuery(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -432,7 +443,8 @@ func TestSendQueryWithParameters(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -474,7 +486,8 @@ func TestSendBatch(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDocument,
resourceAddress: "",
Expand Down Expand Up @@ -518,7 +531,8 @@ func TestSendPatch(t *testing.T) {
mock.WithStatusCode(200))
verifier := pipelineVerifier{}
pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}
operationContext := pipelineRequestOptions{
resourceType: resourceTypeDatabase,
resourceAddress: "",
Expand Down Expand Up @@ -594,7 +608,8 @@ func TestQueryDatabases(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

receivedIds := []string{}
queryPager := client.NewQueryDatabasesPager("select * from c", nil)
Expand Down
27 changes: 18 additions & 9 deletions sdk/data/azcosmos/cosmos_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func TestContainerRead(t *testing.T) {
mock.WithStatusCode(200))

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -123,7 +124,8 @@ func TestContainerDeleteItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -176,7 +178,8 @@ func TestContainerReadItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -233,7 +236,8 @@ func TestContainerReplaceItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -294,7 +298,8 @@ func TestContainerUpsertItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -359,7 +364,8 @@ func TestContainerCreateItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -437,7 +443,8 @@ func TestContainerQueryItems(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -544,7 +551,8 @@ func TestContainerExecuteBatch(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down Expand Up @@ -607,7 +615,8 @@ func TestContainerPatchItem(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)
container, _ := newContainer("containerId", database)
Expand Down
3 changes: 2 additions & 1 deletion sdk/data/azcosmos/cosmos_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func TestDatabaseQueryContainers(t *testing.T) {
verifier := pipelineVerifier{}

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv})
client := &Client{endpoint: srv.URL(), pipeline: pl}
gem := &globalEndpointManager{preferredLocations: []string{}}
client := &Client{endpoint: srv.URL(), pipeline: pl, gem: gem}

database, _ := newDatabase("databaseId", client)

Expand Down
8 changes: 2 additions & 6 deletions sdk/data/azcosmos/cosmos_global_endpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func TestGlobalEndpointManagerMarkEndpointUnavailableForRead(t *testing.T) {

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl}

endpoint, err := url.Parse(client.endpoint)
endpoint, err := url.Parse(srv.URL())
assert.NoError(t, err)

gem, err := newGlobalEndpointManager(srv.URL(), pl, []string{"West US", "Central US"}, 5*time.Minute, true)
Expand All @@ -101,9 +99,7 @@ func TestGlobalEndpointManagerMarkEndpointUnavailableForWrite(t *testing.T) {

pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})

client := &Client{endpoint: srv.URL(), pipeline: pl}

endpoint, err := url.Parse(client.endpoint)
endpoint, err := url.Parse(srv.URL())
assert.NoError(t, err)

gem, err := newGlobalEndpointManager(srv.URL(), pl, []string{"West US", "Central US"}, 5*time.Minute, true)
Expand Down
Loading