From 45924031df61a6581918b5e7bbe0aba240888b81 Mon Sep 17 00:00:00 2001 From: ben Date: Thu, 11 Jan 2024 07:55:33 -0700 Subject: [PATCH] feat: implement if_seq_no and if_primary_term parameters in bulk indexer (#783) --- esutil/bulk_indexer.go | 11 +++++++++++ esutil/bulk_indexer_internal_test.go | 12 ++++++++++++ 2 files changed, 23 insertions(+) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index c462876562..87b81d9ab9 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -107,6 +107,8 @@ type BulkIndexerItem struct { VersionType string Body io.ReadSeeker RetryOnConflict *int + IfSeqNo *int64 + IfPrimaryTerm *int64 meta bytes.Buffer // Item metadata header payloadLength int // Item payload total length metadata+newline+body length @@ -177,6 +179,15 @@ func (item *BulkIndexerItem) marshallMeta() { aux = aux[:0] } + if item.DocumentID != "" && item.IfSeqNo != nil && item.IfPrimaryTerm != nil { + item.meta.WriteRune(',') + item.meta.WriteString(`"if_seq_no":`) + item.meta.WriteString(strconv.FormatInt(*item.IfSeqNo, 10)) + item.meta.WriteRune(',') + item.meta.WriteString(`"if_primary_term":`) + item.meta.WriteString(strconv.FormatInt(*item.IfPrimaryTerm, 10)) + } + item.meta.WriteRune('}') item.meta.WriteRune('}') item.meta.WriteRune('\n') diff --git a/esutil/bulk_indexer_internal_test.go b/esutil/bulk_indexer_internal_test.go index 0ee9264732..e78621e39d 100644 --- a/esutil/bulk_indexer_internal_test.go +++ b/esutil/bulk_indexer_internal_test.go @@ -598,6 +598,8 @@ func TestBulkIndexer(t *testing.T) { }) t.Run("Worker.writeMeta()", func(t *testing.T) { v := int64(23) + ifSeqNo := int64(45) + ifPrimaryTerm := int64(67) type args struct { item BulkIndexerItem } @@ -706,6 +708,16 @@ func TestBulkIndexer(t *testing.T) { }}, `{"update":{"_id":"1","retry_on_conflict":3}}` + "\n", }, + { + "with if_seq_no and if_primary_term", + args{BulkIndexerItem{ + Action: "index", + DocumentID: "1", + IfSeqNo: &ifSeqNo, + IfPrimaryTerm: &ifPrimaryTerm, + }}, + `{"index":{"_id":"1","if_seq_no":45,"if_primary_term":67}}` + "\n", + }, } for _, tt := range tests { tt := tt