Welcome to the migration guide for the Azure Data Explorer (Kusto) Go SDK. This guide is designed to assist you in migrating your application from using the beta version of the SDK (github.com/Azure/azure-kusto-go
) to the new, stable release, version 1.0.0
.
The release of version 1.0.0
introduces breaking changes, which include changes in dependencies, package arrangement, querying methods, and management commands. Following these steps carefully will ensure a smooth transition to the new version.
To migrate to version 1.x from older versions, you'll need to update your go.mod
file's dependencies.
Old SDK:
go get github.com/Azure/azure-kusto-go
New SDK:
- For data operations (
github.com/Azure/azure-kusto-go/azkustodata
):
go get github.com/Azure/azure-kusto-go/azkustodata
- For ingestion operations (
github.com/Azure/azure-kusto-go/azkustoingest
):
go get github.com/Azure/azure-kusto-go/azkustoingest
Alternatively, manually update your go.mod file by replacing github.com/Azure/azure-kusto-go
with the specific package(s) you need.
The SDK is now split into two separate packages:
- azkustodata: For querying and managing Azure Data Explorer clusters.
- azkustoingest: For ingesting data into Azure Data Explorer clusters.
Depending on your requirements, import one or both of these packages into your Go files.
For Data Operations:
import (
"github.com/Azure/azure-kusto-go/azkustodata"
)
For Ingestion Operations:
import (
"github.com/Azure/azure-kusto-go/azkustoingest"
)
Update your code to use the new packages.
For exmaple:
Old SDK:
kscb := kusto.NewConnectionStringBuilder(endpoint)
client, err = kusto.New(kscb)
New SDK (For Data Operations):
kscb := azkustodata.NewConnectionStringBuilder(endpoint)
client, err = azkustodata.New(kscb)
Same for ingestion operations:
Old SDK Ingestion Client Creation(Queued Example):
in, err := ingest.New(kustoClient, "database", "table")
New SDK Ingestion Client Creation (Queued Example):
in, err := azkustoingest.New(kustoConnectionString)
The new SDK introduces a new way to build queries:
The old SDK used a kusto.NewStmt
method to build queries:
query := kusto.NewStmt("systemNodes | project CollectionTime, NodeId")
For the new SDK, use the azkustodata/kql
package to build queries, which has a type-safe query builder:
dt, _ := time.Parse(time.RFC3339Nano, "2020-03-04T14:05:01.3109965Z")
tableName := "system nodes"
value := 1
query := kql.New("")
.AddTable(tableName)
.AddLiteral(" | where CollectionTime == ").AddDateTime(dt)
.AddLiteral(" and ")
.AddLiteral("NodeId == ").AddInt(value) // outputs ['system nodes'] | where CollectionTime == datetime(2020-03-04T14:05:01.3109965Z) and NodeId == int(1)
The new SDK introduces a new way to query data.
The old SDK used the Query
method to query data:
import github.com/Azure/azure-kusto-go/kusto/data/table
// Query our database table "systemNodes" for the CollectionTimes and the NodeIds.
iter, err := client.Query(ctx, "database", query)
if err != nil {
panic("add error handling")
}
defer iter.Stop()
// .Do() will call the function for every row in the table.
err = iter.DoOnRowOrError(
func(row *table.Row, e *kustoErrors.Error) error {
if e != nil {
return e
}
if row.Replace {
fmt.Println("---") // Replace flag indicates that the query result should be cleared and replaced with this row
}
fmt.Println(row) // As a convenience, printing a *table.Row will output csv
return nil
},
)
if err != nil {
panic("add error handling")
}
In 1.0.0, the "Query" method returns a dataset object, which contains all of the tables returned by the query. The primary results table(s) always come first, therefore in the common case, it's possible to access it like this:
dataset, err := client.Query(ctx, "database", query)
if err != nil {
panic("error handling")
}
primaryResult := dataset.Tables()[0]
for _, row := range primaryResult.Rows() {
fmt.Println(row) // Process each row
}
If needed, it's possible to iterate over the dataset and handle each table separately.
import github.com/Azure/azure-kusto-go/azkustodata/query/v2
for _, table := range dataset.Tables() {
switch table.Kind() {
case v2.QueryPropertiesKind:
queryProps, err := v2.AsQueryProperties(table)
if err != nil {
panic(err)
}
fmt.Printf("%v\n", queryProps[0].Value)
case v2.QueryCompletionInformationKind:
queryProps, err := v2.AsQueryCompletionInformation(table)
if err != nil {
panic(err)
}
fmt.Printf("%v\n", queryProps[0].ActivityId)
}
case v2.PrimaryResultKind:
for _, row := range table.Rows() {
fmt.Println(row) // Process each row
}
}
}
Alternatively, use the QueryIterative
method to iterate tables as they arrive:
dataset, err := client.QueryIterative(ctx, "database", query)
if err != nil {
panic("error handling")
}
// Make sure to close the dataset when done.
defer dataset.Close()
for tableResult := range dataset.Tables() {
if tableResult.Err() != nil {
panic("table error handling")
}
// Make sure to consume the rows, or the Tables() channel will block.
for rowResult := range tableResult.Table().Rows() {
if rowResult.Err() != nil {
panic("row error handling")
}
fmt.Println(rowResult.Row()) // Process each row
}
}
Working with rows also got easier, with methods to extract specific types:
row := table.Rows()[0]
row.IntByName("EventId") // Get the value of the column "EventId" as an int
row.StringByIndex(0) // Get the value of the first column as a string
Or get the table as a slice of structs:
import github.com/Azure/azure-kusto-go/azkustodata/query
events, err := query.ToStructs[Event]()
if err != nil {
panic("error handling")
}
for _, event := range events {
fmt.Println(event) // Process each event
}
Management commands are now called using the Mgmt
method on the client, and have an identical api to Query
:
dataset, err := client.Mgmt(ctx, "database", query)
if err != nil {
panic("error handling")
}
primaryResult := dataset.Tables()[0]
for _, row := range primaryResult.Rows() {
fmt.Println(row) // Process each row
}
It is recommended to use parameters for queries that contain user input.
Management commands can not use parameters, and therefore should be built using the builder (see next section).
Parameters can be implicitly referenced in a query:
query := kql.New("systemNodes | project CollectionTime, NodeId | where CollectionTime > startTime and NodeId == nodeIdValue")
Here, startTime
and nodeIdValue
are parameters that can be passed to the query.
To Pass the parameters values to the query, create kql.Parameters
:
params := kql.NewParameters().AddDateTime("startTime", dt).AddInt("nodeIdValue", 1)
And then pass it to the Query
method, as an option:
results, err := client.Query(ctx, database, query, QueryParameters(params))
if err != nil {
panic("add error handling")
}
// You can see the generated parameters using the ToDeclarationString() method:
fmt.Println(params.ToDeclarationString()) // declare query_parameters(startTime:datetime, nodeIdValue:int);
// You can then use the same query with different parameters:
params2 := kql.NewParameters().AddDateTime("startTime", dt).AddInt("nodeIdValue", 2)
dataset, err = client.Query(ctx, database, query, QueryParameters(params2))
The Ingestion API stayed the same, only using the new package:
import github.com/Azure/azure-kusto-go/azkustoingest
kcsb := azkustodata.NewConnectionStringBuilder(`endpoint`).WithAadAppKey("clientID", "clientSecret", "tenentID")
ingestor, err := azkustoingest.New(kcsb, azkustoingest.WithDefaultDatabase("database"), azkustoingest.WithDefaultTable("table"))
if err != nil {
// Handle error
}
defer ingestor.Close() // Always close the ingestor when done.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
_, err = ingestor.FromFile(ctx, "/path/to/file", azkustoingest.DeleteSource())