Skip to content

Commit

Permalink
Merge branch 'master' into the-big-split
Browse files Browse the repository at this point in the history
# Conflicts:
#	azkustodata/kcsb.go
#	azkustoingest/go.mod
#	azkustoingest/go.sum
  • Loading branch information
AsafMah committed Jul 19, 2023
2 parents 1568225 + 2ff4861 commit baf90d4
Show file tree
Hide file tree
Showing 12 changed files with 109 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ x86/
bld/
[Bb]in/
[Oo]bj/
*.exe

# Visual Studio 2015/2017 cache/options directory
.vs/
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ kustoConnectionString := kustoConnectionStringBuilder.WithUserManagedIdentity(cl
client, err = azkustodata.New(kustoConnectionString)
```

#### Using a k8s workload identity

```go
kustoConnectionString := kustoConnectionStringBuilder.WithKubernetesWorkloadIdentity(appId, tokenFilePath, authorityID)
client, err = kusto.New(kustoConnectionString)
```

#### Using a bearer token

```go
Expand Down Expand Up @@ -258,7 +265,7 @@ that is returned supports this via the `.ToStruct()` method.
```go
// NodeRec represents our Kusto data that will be returned.
type NodeRec struct {
// ID is the table's NodeId. We use the field tag here to to instruct our client to convert NodeId to ID.
// ID is the table's NodeId. We use the field tag here to instruct our client to convert NodeId to ID.
ID int64 `kusto:"NodeId"`
// CollectionTime is Go representation of the Kusto datetime type.
CollectionTime time.Time
Expand Down
2 changes: 1 addition & 1 deletion azkustodata/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ that is returned supports this via the `.ToStruct()` method.
// NodeRec represents our Kusto data that will be returned.
type NodeRec struct {
// ID is the table's NodeId. We use the field tag here to to instruct our client to convert NodeId to ID.
// ID is the table's NodeId. We use the field tag here to instruct our client to convert NodeId to ID.
ID int64 `kusto:"NodeId"`
// CollectionTime is Go representation of the Kusto datetime type.
CollectionTime time.Time
Expand Down
43 changes: 41 additions & 2 deletions azkustodata/kcsb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package azkustodata

import (
"fmt"
"strconv"
"strings"

kustoErrors "github.com/Azure/azure-kusto-go/azkustodata/errors"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"strconv"
"strings"
)

type ConnectionStringBuilder struct {
Expand All @@ -23,6 +24,8 @@ type ConnectionStringBuilder struct {
ApplicationToken string
AzCli bool
MsiAuthentication bool
WorkloadAuthentication bool
FederationTokenFilePath string
ManagedServiceIdentity string
InteractiveLogin bool
RedirectURL string
Expand Down Expand Up @@ -158,6 +161,7 @@ func (kcsb *ConnectionStringBuilder) resetConnectionString() {
kcsb.ApplicationToken = ""
kcsb.AzCli = false
kcsb.MsiAuthentication = false
kcsb.WorkloadAuthentication = false
kcsb.ManagedServiceIdentity = ""
kcsb.InteractiveLogin = false
kcsb.RedirectURL = ""
Expand Down Expand Up @@ -251,6 +255,18 @@ func (kcsb *ConnectionStringBuilder) WithSystemManagedIdentity() *ConnectionStri
return kcsb
}

// WithKubernetesWorkloadIdentity Creates a Kusto Connection string builder that will authenticate with AAD application, using
// an application token obtained from a Microsoft Service Identity endpoint using Kubernetes workload identity.
func (kcsb *ConnectionStringBuilder) WithKubernetesWorkloadIdentity(appId, tokenFilePath, authorityID string) *ConnectionStringBuilder {
requireNonEmpty(dataSource, kcsb.DataSource)
kcsb.resetConnectionString()
kcsb.ApplicationClientId = appId
kcsb.AuthorityId = authorityID
kcsb.FederationTokenFilePath = tokenFilePath
kcsb.WorkloadAuthentication = true
return kcsb
}

// WithInteractiveLogin Creates a Kusto Connection string builder that will authenticate by launching the system default browser
// to interactively authenticate a user, and obtain an access token
func (kcsb *ConnectionStringBuilder) WithInteractiveLogin(authorityID string) *ConnectionStringBuilder {
Expand Down Expand Up @@ -376,6 +392,29 @@ func (kcsb *ConnectionStringBuilder) newTokenProvider() (*TokenProvider, error)
fmt.Errorf("error: Couldn't retrieve client credentials using Managed Identity: %s", err))
}

return cred, nil
}
case kcsb.WorkloadAuthentication:
init = func(ci *CloudInfo, cliOpts *azcore.ClientOptions, appClientId string) (azcore.TokenCredential, error) {
opts := &azidentity.WorkloadIdentityCredentialOptions{ClientOptions: *cliOpts}
if !isEmpty(kcsb.ApplicationClientId) {
opts.ClientID = kcsb.ApplicationClientId
}

if !isEmpty(kcsb.FederationTokenFilePath) {
opts.TokenFilePath = kcsb.FederationTokenFilePath
}

if !isEmpty(kcsb.AuthorityId) {
opts.TenantID = kcsb.AuthorityId
}

cred, err := azidentity.NewWorkloadIdentityCredential(opts)
if err != nil {
return nil, kustoErrors.E(kustoErrors.OpTokenProvider, kustoErrors.KOther,
fmt.Errorf("error: Couldn't retrieve client credentials using Workload Identity: %s", err))
}

return cred, nil
}
case !isEmpty(kcsb.UserToken):
Expand Down
23 changes: 23 additions & 0 deletions azkustodata/kcsb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ func TestWitAadUserToken(t *testing.T) {
assert.EqualValues(t, want, *actual)
}

func TestWithWorkloadIdentity(t *testing.T) {
want := ConnectionStringBuilder{
DataSource: "endpoint",
ApplicationClientId: "clientID",
AuthorityId: "authorityID",
FederationTokenFilePath: "tokenfilepath",
WorkloadAuthentication: true,
}

actual := NewConnectionStringBuilder("endpoint").WithKubernetesWorkloadIdentity("clientID", "tokenfilepath", "authorityID")

assert.EqualValues(t, want, *actual)
}

func TestWitAadUserTokenErr(t *testing.T) {
defer func() {
if res := recover(); res == nil {
Expand Down Expand Up @@ -166,6 +180,15 @@ func TestGetTokenProviderHappy(t *testing.T) {
DataSource: "https://endpoint/test_tokenprovider_managedidauth2",
MsiAuthentication: true,
},
}, {
name: "test_tokenprovider_workloadidentity",
kcsb: ConnectionStringBuilder{
DataSource: "https://endpoint/test_tokenprovider_workloadidentity",
ApplicationClientId: "clientID",
AuthorityId: "tenantID",
FederationTokenFilePath: "tokenfilepath",
WorkloadAuthentication: true,
},
}, {
name: "test_tokenprovider_usertoken",
kcsb: ConnectionStringBuilder{
Expand Down
4 changes: 2 additions & 2 deletions azkustodata/kusto_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Example_simple() {

// NodeRec represents our Kusto data that will be returned.
type NodeRec struct {
// ID is the table's NodeId. We use the field tag here to to instruct our client to convert NodeId to ID.
// ID is the table's NodeId. We use the field tag here to instruct our client to convert NodeId to ID.
ID int64 `kusto:"NodeId"`
// CollectionTime is Go representation of the Kusto datetime type.
CollectionTime time.Time
Expand Down Expand Up @@ -240,7 +240,7 @@ func ExampleClient_Query_struct() {

// NodeRec represents our Kusto data that will be returned.
type NodeRec struct {
// ID is the table's NodeId. We use the field tag here to to instruct our client to convert NodeId to ID.
// ID is the table's NodeId. We use the field tag here to instruct our client to convert NodeId to ID.
ID int64 `kusto:"NodeId"`
// CollectionTime is Go representation of the Kusto datetime type.
CollectionTime time.Time
Expand Down
2 changes: 1 addition & 1 deletion azkustodata/mock_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type querier interface {

// NodeRec represents our Kusto data that will be returned.
type NodeRec struct {
// ID is the table's NodeId. We use the field tag here to to instruct our client to convert NodeId to ID.
// ID is the table's NodeId. We use the field tag here to instruct our client to convert NodeId to ID.
ID int64 `kusto:"NodeId"`
// CollectionTime is Go representation of the Kusto datetime type.
CollectionTime time.Time
Expand Down
2 changes: 1 addition & 1 deletion azkustoingest/file_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ const (
SingleJSON DataFormat = properties.SingleJSON
)

// InferFormatFromFileName looks at the file name and tries to discern what the file format is
// InferFormatFromFileName looks at the file name and tries to discern what the file format is
func InferFormatFromFileName(fName string) DataFormat {
return properties.DataFormatDiscovery(fName)
}
Expand Down
22 changes: 12 additions & 10 deletions azkustoingest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,40 @@ require (
github.com/Azure/azure-kusto-go/azkustodata v0.0.0-20230611061857-ce20bdc050a1
github.com/Azure/azure-pipeline-go v0.1.8
github.com/Azure/azure-sdk-for-go v67.1.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.3.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.6.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/cenkalti/backoff/v4 v4.2.0
github.com/gofrs/uuid v4.2.0+incompatible
github.com/google/uuid v1.3.0
github.com/kylelemons/godebug v1.1.0
github.com/samber/lo v1.37.0
github.com/shopspring/decimal v1.3.1
github.com/tj/assert v0.0.3
github.com/stretchr/testify v1.8.2
go.uber.org/goleak v1.2.1
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.2.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.1.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.28 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.23 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.7.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.37.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/crypto v0.10.0 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/net v0.6.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/sys v0.9.0 // indirect
golang.org/x/text v0.10.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
11 changes: 11 additions & 0 deletions azkustoingest/internal/properties/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,17 @@ func (i Ingestion) validate() error {
return nil
}

func RemoveQueryParamsFromUrl(url string) string {
result := url
if idx := strings.Index(result, "?"); idx != -1 {
result = result[:idx]
}
if idx := strings.Index(result, ";"); idx != -1 {
result = result[:idx]
}
return result
}

func uuidIsZero(id uuid.UUID) bool {
for _, b := range id {
if b != 0 {
Expand Down
6 changes: 3 additions & 3 deletions azkustoingest/internal/queued/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (i *Ingestion) Local(ctx context.Context, from string, props properties.All
return err
}

// We want to check the queue size here so so we don't upload a file and then find we don't have a Kusto queue to stick
// We want to check the queue size here so we don't upload a file and then find we don't have a Kusto queue to stick
// it in. If we don't have a container, that is handled by containerQueue().
if len(mgrResources.Queues) == 0 {
return errors.ES(errors.OpFileIngest, errors.KBlobstore, "no Kusto queue resources are defined, there is no queue to upload to").SetNoRetry()
Expand Down Expand Up @@ -328,7 +328,7 @@ func createPipeline(http *http.Client) pipeline.Pipeline {

var nower = time.Now

// localToBlob copies from a local to to an Azure Blobstore blob. It returns the URL of the Blob, the local file info and an
// localToBlob copies from a local to an Azure Blobstore blob. It returns the URL of the Blob, the local file info and an
// error if there was one.
func (i *Ingestion) localToBlob(ctx context.Context, from string, client *azblob.Client, container string, props *properties.All) (string, int64, error) {
compression := utils.CompressionDiscovery(from)
Expand Down Expand Up @@ -428,7 +428,7 @@ func IsLocalPath(s string) (bool, error) {
return true, nil
}

func fullUrl(client *azblob.Client, container, blob string) string {
func fullUrl(client *azblob.Client, container string, blob string) string {
parseURL, err := azblob.ParseURL(client.URL())
if err != nil {
return ""
Expand Down
10 changes: 5 additions & 5 deletions azkustoingest/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (i StatusCode) IsSuccess() bool {
}
}

// FailureStatusCode indicates the status of failuted ingestion attempts
// FailureStatusCode indicates the status of failed ingestion attempts
type FailureStatusCode string

const (
Expand All @@ -83,7 +83,7 @@ func (i FailureStatusCode) IsRetryable() bool {
}
}

// statusRecord is a record containing information regarding the status of an ingation command
// statusRecord is a record containing information regarding the status of an ingestion command
type statusRecord struct {
// Status is The ingestion status returned from the service. Status remains 'Pending' during the ingestion process and
// is updated by the service once the ingestion completes. When <see cref="IngestionReportMethod"/> is set to 'Queue', the ingestion status
Expand Down Expand Up @@ -162,7 +162,7 @@ func (r *statusRecord) FromProps(props properties.All) {
r.UpdatedOn = time.Now()

if props.Ingestion.BlobPath != "" && r.IngestionSourcePath == undefinedString {
r.IngestionSourcePath = props.Ingestion.BlobPath
r.IngestionSourcePath = properties.RemoveQueryParamsFromUrl(props.Ingestion.BlobPath)
}
}

Expand All @@ -178,7 +178,7 @@ func (r *statusRecord) FromMap(data map[string]interface{}) {
r.FailureStatus = FailureStatusCode(strStatus)
}

r.IngestionSourcePath = safeGetString(data, "IngestionSourcePath")
r.IngestionSourcePath = properties.RemoveQueryParamsFromUrl(safeGetString(data, "IngestionSourcePath"))
r.Database = safeGetString(data, "Database")
r.Table = safeGetString(data, "Table")
r.ErrorCode = safeGetString(data, "ErrorCode")
Expand Down Expand Up @@ -217,7 +217,7 @@ func (r *statusRecord) ToMap() map[string]interface{} {
// Those will be read from the server if they have data in them
data["Status"] = r.Status
data["IngestionSourceId"] = r.IngestionSourceID
data["IngestionSourcePath"] = r.IngestionSourcePath
data["IngestionSourcePath"] = properties.RemoveQueryParamsFromUrl(r.IngestionSourcePath)
data["Database"] = r.Database
data["Table"] = r.Table
data["UpdatedOn"] = r.UpdatedOn.Format(time.RFC3339Nano)
Expand Down

0 comments on commit baf90d4

Please sign in to comment.