Skip to content

Commit

Permalink
### Added
Browse files Browse the repository at this point in the history
- `ServerTimeout` Query Option

### Changed
- `MgmtOption` is deprecated. From now on both `Query` and `Mgmt` accept `QueryOption`, `MgmtOption` will remain as an alias until the next version.
-

### Fixed

### Removed
- `AllowWrite` has been a no-op for a while. It is now finally removed.

### Security
  • Loading branch information
AsafMah committed May 2, 2023
1 parent 778af97 commit 407e100
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 114 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Added
- `ServerTimeout` Query Option

### Changed
- `MgmtOption` is deprecated. From now on both `Query` and `Mgmt` accept `QueryOption`, `MgmtOption` will remain as an alias until the next version.
-

### Fixed

### Removed
- `AllowWrite` has been a no-op for a while. It is now finally removed.

### Security

## [0.12.1] - 2023-05-01
### Fixed
* Fixed parsing of errors in queries
Expand Down
3 changes: 1 addition & 2 deletions kusto/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ type queryMsg struct {

type connOptions struct {
queryOptions *queryOptions
mgmtOptions *mgmtOptions
}

// query makes a query for the purpose of extracting data from Kusto. Context can be used to set
Expand All @@ -92,7 +91,7 @@ func (c *Conn) query(ctx context.Context, db string, query Statement, options *q
}

// mgmt is used to do management queries to Kusto.
func (c *Conn) mgmt(ctx context.Context, db string, query Statement, options *mgmtOptions) (execResp, error) {
func (c *Conn) mgmt(ctx context.Context, db string, query Statement, options *queryOptions) (execResp, error) {
return c.execute(ctx, execMgmt, db, query, *options.requestProperties)
}

Expand Down
76 changes: 15 additions & 61 deletions kusto/kusto.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type queryer interface {
io.Closer
query(ctx context.Context, db string, query Statement, options *queryOptions) (execResp, error)
mgmt(ctx context.Context, db string, query Statement, options *mgmtOptions) (execResp, error)
mgmt(ctx context.Context, db string, query Statement, options *queryOptions) (execResp, error)
queryToJson(ctx context.Context, db string, query Statement, options *queryOptions) (string, error)
}

Expand Down Expand Up @@ -97,8 +97,8 @@ type QueryOption func(q *queryOptions) error

// Note: QueryOption are defined in queryopts.go file

// MgmtOption is an option type for a call to Mgmt().
type MgmtOption func(m *mgmtOptions) error
// Deprecated: MgmtOption will be removed in a future release. Use QueryOption instead.
type MgmtOption = QueryOption

// Note: MgmtOption are defined in queryopts.go file

Expand Down Expand Up @@ -130,7 +130,7 @@ func (c *Client) Query(ctx context.Context, db string, query Statement, options
return nil, err
}

opts, err := setQueryOptions(ctx, errors.OpQuery, query, options...)
opts, err := setQueryOptions(errors.OpQuery, query, options...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (c *Client) QueryToJson(ctx context.Context, db string, query Statement, op
return "", err
}

opts, err := setQueryOptions(ctx, errors.OpQuery, query, options...)
opts, err := setQueryOptions(errors.OpQuery, query, options...)
if err != nil {
return "", err
}
Expand All @@ -214,7 +214,7 @@ func (c *Client) QueryToJson(ctx context.Context, db string, query Statement, op
// Mgmt accepts a Stmt, but that Stmt cannot have any query parameters attached at this time.
// Note that the server has a timeout of 10 minutes for a management call by default unless the context deadline is set.
// There is a maximum of 1 hour.
func (c *Client) Mgmt(ctx context.Context, db string, query Statement, options ...MgmtOption) (*RowIterator, error) {
func (c *Client) Mgmt(ctx context.Context, db string, query Statement, options ...QueryOption) (*RowIterator, error) {
if stmt, ok := query.(Stmt); ok {
if !stmt.params.IsZero() || !stmt.defs.IsZero() {
return nil, errors.ES(errors.OpMgmt, errors.KClientArgs, "a Mgmt() call cannot accept a Stmt object that has Definitions or Parameters attached")
Expand All @@ -226,12 +226,12 @@ func (c *Client) Mgmt(ctx context.Context, db string, query Statement, options .
return nil, err
}

opts, err := setMgmtOptions(ctx, errors.OpMgmt, query, options...)
opts, err := setQueryOptions(errors.OpMgmt, query, options...)
if err != nil {
return nil, err
}

conn, err := c.getConn(mgmtCall, connOptions{mgmtOptions: opts})
conn, err := c.getConn(mgmtCall, connOptions{queryOptions: opts})
if err != nil {
return nil, err
}
Expand All @@ -258,27 +258,18 @@ func (c *Client) Mgmt(ctx context.Context, db string, query Statement, options .
return iter, nil
}

func setQueryOptions(ctx context.Context, op errors.Op, query Statement, options ...QueryOption) (*queryOptions, error) {
// Match our server deadline to our context.Deadline. This should be set from withing kusto.Query() to always have a value.
deadline, ok := ctx.Deadline()
if ok {
options = append(
options,
queryServerTimeout(deadline.Sub(nower())),
)
}

func setQueryOptions(op errors.Op, query Statement, options ...QueryOption) (*queryOptions, error) {
opt := &queryOptions{
requestProperties: &requestProperties{
Options: map[string]interface{}{},
},
}
/*if op == errors.OpQuery {

if op == errors.OpQuery {
// We want progressive frames by default for Query(), but not Mgmt() because it uses v1 framing and ingestion endpoints
// do not support it.
opt.requestProperties.Options["results_progressive_enabled"] = true
}*/
opt.requestProperties.Options["results_progressive_enabled"] = true
}

for _, o := range options {
if err := o(opt); err != nil {
Expand All @@ -299,48 +290,13 @@ func setQueryOptions(ctx context.Context, op errors.Op, query Statement, options
return opt, nil
}

func setMgmtOptions(ctx context.Context, op errors.Op, query Statement, options ...MgmtOption) (*mgmtOptions, error) {
if stmt, ok := query.(Stmt); ok {
if !stmt.params.IsZero() {
return nil, errors.ES(op, errors.KClientArgs, "Parameters aren't compatible with management queries").SetNoRetry()
}
}

// Match our server deadline to our context.Deadline. This should be set from withing kusto.Query() to always have a value.
deadline, ok := ctx.Deadline()
if ok {
options = append(
options,
mgmtServerTimeout(deadline.Sub(nower())),
)
}

opt := &mgmtOptions{
requestProperties: &requestProperties{
Options: map[string]interface{}{},
},
}
if op == errors.OpQuery {
// We want progressive frames by default for Query(), but not Mgmt() because it uses v1 framing and ingestion endpoints
// do not support it.
opt.requestProperties.Options["results_progressive_enabled"] = true
}

for _, o := range options {
if err := o(opt); err != nil {
return nil, errors.ES(op, errors.KClientArgs, "QueryValues in the the Stmt were incorrect: %s", err).SetNoRetry()
}
}
return opt, nil
}

func (c *Client) getConn(callType callType, options connOptions) (queryer, error) {
switch callType {
case queryCall:
return c.conn, nil
case mgmtCall:
delete(options.mgmtOptions.requestProperties.Options, "results_progressive_enabled")
if options.mgmtOptions.queryIngestion {
delete(options.queryOptions.requestProperties.Options, "results_progressive_enabled")
if options.queryOptions.queryIngestion {
c.mgmtConnMu.Lock()
defer c.mgmtConnMu.Unlock()

Expand Down Expand Up @@ -370,12 +326,10 @@ func (c *Client) getConn(callType callType, options connOptions) (queryer, error
}
}

var nower = time.Now

func contextSetup(ctx context.Context, mgmtCall bool) (context.Context, context.CancelFunc, error) {
t, ok := ctx.Deadline()
if ok {
d := t.Sub(nower())
d := t.Sub(time.Now())
if d > 1*time.Hour {
if mgmtCall {
return ctx, nil, errors.ES(errors.OpMgmt, errors.KClientArgs, "cannot set a deadline greater than 1 hour(%s)", d)
Expand Down
42 changes: 0 additions & 42 deletions kusto/mgmtopts.go

This file was deleted.

2 changes: 1 addition & 1 deletion kusto/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (m mockConn) query(_ context.Context, _ string, _ Statement, _ *queryOption
return execResp{}, nil
}

func (m mockConn) mgmt(_ context.Context, _ string, _ Statement, _ *mgmtOptions) (execResp, error) {
func (m mockConn) mgmt(_ context.Context, _ string, _ Statement, _ *queryOptions) (execResp, error) {
framesCh := make(chan frames.Frame, 100)
framesCh <- v1.DataTable{}
close(framesCh)
Expand Down
20 changes: 12 additions & 8 deletions kusto/queryopts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/Azure/azure-kusto-go/kusto/kql"
"time"

"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/data/value"
)

Expand All @@ -25,6 +24,7 @@ type requestProperties struct {

type queryOptions struct {
requestProperties *requestProperties
queryIngestion bool
}

const NoRequestTimeoutValue = "norequesttimeout"
Expand Down Expand Up @@ -135,14 +135,9 @@ func ResultsProgressiveDisable() QueryOption {
}
}

// queryServerTimeout is the amount of time the server will allow a query to take.
// NOTE: I have made the serverTimeout private. For the moment, I'm going to use the context.Context timer
// to set timeouts via this private method.
func queryServerTimeout(d time.Duration) QueryOption {
// ServerTimeout overrides the default request timeout.
func ServerTimeout(d time.Duration) QueryOption {
return func(q *queryOptions) error {
if d > 1*time.Hour {
return errors.ES(errors.OpQuery, errors.KClientArgs, "ServerTimeout option was set to %v, but can't be more than 1 hour", d)
}
q.requestProperties.Options[ServerTimeoutValue] = value.Timespan{Valid: true, Value: d}.Marshal()
return nil
}
Expand Down Expand Up @@ -570,3 +565,12 @@ func ValidatePermissions() QueryOption {
return nil
}
}

// IngestionEndpoint will instruct the Mgmt call to connect to the ingest-[endpoint] instead of [endpoint].
// This is not often used by end users and can only be used with a Mgmt() call.
func IngestionEndpoint() QueryOption {
return func(m *queryOptions) error {
m.queryIngestion = true
return nil
}
}

0 comments on commit 407e100

Please sign in to comment.