Skip to content

Commit

Permalink
Added ignore first record support (#184)
Browse files Browse the repository at this point in the history
* Added ignore first record support

* Changelog
  • Loading branch information
ronmonetaMicro authored May 3, 2023
1 parent 9270a07 commit 252b25d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### Added
* Support for IgnoreFirstRecord ingestion option
## Fixed
* AttachPolicyClientOptions method by @JorTurFer

Expand Down
13 changes: 13 additions & 0 deletions kusto/ingest/file_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,19 @@ func FlushImmediately() FileOption {
}
}

// IgnoreFirstRecord tells Kusto to flush on write.
func IgnoreFirstRecord() FileOption {
return option{
run: func(p *properties.All) error {
p.Ingestion.Additional.IgnoreFirstRecord = true
return nil
},
clientScopes: QueuedClient | ManagedClient,
sourceScope: FromFile | FromReader | FromBlob,
name: "IgnoreFirstRecord",
}
}

// DataFormat indicates what type of encoding format was used for source data.
// Not all options can be used in every method.
type DataFormat = properties.DataFormat
Expand Down
5 changes: 3 additions & 2 deletions kusto/ingest/internal/properties/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,9 @@ type Additional struct {
IngestionMappingType DataFormat `json:"ingestionMappingType,omitempty"`
// ValidationPolicy is a JSON encoded string that tells our ingestion action what policies we want on the
// data being ingested and what to do when that is violated.
ValidationPolicy string `json:"validationPolicy,omitempty"`
Format DataFormat `json:"format,omitempty"`
ValidationPolicy string `json:"validationPolicy,omitempty"`
Format DataFormat `json:"format,omitempty"`
IgnoreFirstRecord bool `json:"ignoreFirstRecord"`
// Tags is a list of tags to associated with the ingested data.
Tags []string `json:"tags,omitempty"`
// IngestIfNotExists is a string value that, if specified, prevents ingestion from succeeding if the table already
Expand Down
22 changes: 22 additions & 0 deletions kusto/test/etoe/etoe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,28 @@ func TestFileIngestion(t *testing.T) { //ok
},
want: &[]CountResult{{Count: 500}},
},
{
desc: "Ingest from csv with ignore first record",
ingestor: queuedIngestor,
src: csvFileFromString(t),
options: []ingest.FileOption{ingest.IgnoreFirstRecord()},
stmt: countStatement,
table: queuedTable,
doer: func(row *table.Row, update interface{}) error {
rec := CountResult{}
if err := row.ToStruct(&rec); err != nil {
return err
}
recs := update.(*[]CountResult)
*recs = append(*recs, rec)
return nil
},
gotInit: func() interface{} {
v := []CountResult{}
return &v
},
want: &[]CountResult{{Count: 2}},
},
{
desc: "Ingest from blob with existing mapping managed",
ingestor: managedIngestor,
Expand Down

0 comments on commit 252b25d

Please sign in to comment.