Skip to content

Commit 7d0af35

Browse files
authored
Merge pull request #75 from quickwit-oss/fmassot/fetch-with-index-pattern
Add support for index pattern
2 parents 4d491b9 + 09030cc commit 7d0af35

File tree

4 files changed

+122
-112
lines changed

4 files changed

+122
-112
lines changed

pkg/quickwit/quickwit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
9191
}
9292

9393
if !toOk {
94-
timeField, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
94+
timeField, err = GetTimestampField(index, settings.URL, httpCli)
9595
if nil != err {
9696
return nil, err
9797
}

pkg/quickwit/timestamp_infos.go

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import (
66
"fmt"
77
"io"
88
"net/http"
9+
"strings"
910
)
1011

11-
type QuickwitMapping struct {
12+
type QuickwitIndexMetadata struct {
1213
IndexConfig struct {
1314
DocMapping struct {
14-
TimestampField string `json:"timestamp_field"`
15-
FieldMappings []FieldMappings `json:"field_mappings"`
15+
TimestampField string `json:"timestamp_field"`
1616
} `json:"doc_mapping"`
1717
} `json:"index_config"`
1818
}
@@ -34,25 +34,48 @@ func NewErrorCreationPayload(statusCode int, message string) error {
3434
return errors.New(string(json))
3535
}
3636

37-
func DecodeTimestampFieldInfos(statusCode int, body []byte) (string, error) {
38-
var payload QuickwitMapping
39-
err := json.Unmarshal(body, &payload)
37+
// TODO: refactor either by using a timestamp alias suppprted by quickwit
38+
// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint
39+
// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1
40+
func GetTimestampField(index string, qwickwitUrl string, cli *http.Client) (string, error) {
41+
if strings.Contains(index, "*") || strings.Contains(index, ",") {
42+
return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli)
43+
}
44+
return GetTimestampFieldFromIndex(index, qwickwitUrl, cli)
45+
}
4046

47+
func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, error) {
48+
mappingEndpointUrl := qwickwitUrl + "/indexes/" + index
49+
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
50+
r, err := cli.Get(mappingEndpointUrl)
4151
if err != nil {
42-
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
52+
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
53+
qwlog.Error(errMsg)
54+
return "", err
55+
}
56+
57+
statusCode := r.StatusCode
58+
59+
if statusCode < 200 || statusCode >= 400 {
60+
errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl)
4361
qwlog.Error(errMsg)
4462
return "", NewErrorCreationPayload(statusCode, errMsg)
4563
}
4664

47-
timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
65+
defer r.Body.Close()
66+
body, err := io.ReadAll(r.Body)
67+
if err != nil {
68+
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
69+
qwlog.Error(errMsg)
70+
return "", NewErrorCreationPayload(statusCode, errMsg)
71+
}
4872

49-
qwlog.Info(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
50-
return timestampFieldName, nil
73+
return DecodeTimestampFieldFromIndexConfig(body)
5174
}
5275

53-
func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (string, error) {
54-
mappingEndpointUrl := qwUrl + "/indexes/" + index
55-
qwlog.Info("Calling quickwit endpoint: " + mappingEndpointUrl)
76+
func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, error) {
77+
mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_pattern=" + indexPattern
78+
qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl)
5679
r, err := cli.Get(mappingEndpointUrl)
5780
if err != nil {
5881
errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error())
@@ -76,5 +99,45 @@ func GetTimestampFieldInfos(index string, qwUrl string, cli *http.Client) (strin
7699
return "", NewErrorCreationPayload(statusCode, errMsg)
77100
}
78101

79-
return DecodeTimestampFieldInfos(statusCode, body)
102+
return DecodeTimestampFieldFromIndexConfigs(body)
103+
}
104+
105+
func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, error) {
106+
var payload []QuickwitIndexMetadata
107+
err := json.Unmarshal(body, &payload)
108+
if err != nil {
109+
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
110+
qwlog.Error(errMsg)
111+
return "", NewErrorCreationPayload(500, errMsg)
112+
}
113+
114+
var timestampFieldName string = ""
115+
for _, indexMetadata := range payload {
116+
if timestampFieldName == "" {
117+
timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField
118+
continue
119+
}
120+
121+
if timestampFieldName != indexMetadata.IndexConfig.DocMapping.TimestampField {
122+
errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s and %s", timestampFieldName, indexMetadata.IndexConfig.DocMapping.TimestampField)
123+
qwlog.Error(errMsg)
124+
return "", NewErrorCreationPayload(400, errMsg)
125+
}
126+
}
127+
128+
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
129+
return timestampFieldName, nil
130+
}
131+
132+
func DecodeTimestampFieldFromIndexConfig(body []byte) (string, error) {
133+
var payload QuickwitIndexMetadata
134+
err := json.Unmarshal(body, &payload)
135+
if err != nil {
136+
errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body))
137+
qwlog.Error(errMsg)
138+
return "", NewErrorCreationPayload(500, errMsg)
139+
}
140+
timestampFieldName := payload.IndexConfig.DocMapping.TimestampField
141+
qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName))
142+
return timestampFieldName, nil
80143
}

pkg/quickwit/timestamp_infos_test.go

Lines changed: 43 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -13,142 +13,89 @@ func TestDecodeTimestampFieldInfos(t *testing.T) {
1313
query := []byte(`
1414
{
1515
"version": "0.6",
16-
"index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W",
1716
"index_config": {
1817
"version": "0.6",
19-
"index_id": "myindex",
20-
"index_uri": "s3://quickwit-indexes/myindex",
2118
"doc_mapping": {
22-
"field_mappings": [
23-
{
24-
"name": "foo",
25-
"type": "text",
26-
"fast": false,
27-
"fieldnorms": false,
28-
"indexed": true,
29-
"record": "basic",
30-
"stored": true,
31-
"tokenizer": "default"
32-
},
33-
{
34-
"name": "timestamp",
35-
"type": "datetime",
36-
"fast": true,
37-
"fast_precision": "seconds",
38-
"indexed": true,
39-
"input_formats": [
40-
"rfc3339",
41-
"unix_timestamp"
42-
],
43-
"output_format": "rfc3339",
44-
"stored": true
45-
}
46-
],
47-
"tag_fields": [],
48-
"store_source": true,
49-
"index_field_presence": false,
5019
"timestamp_field": "timestamp",
5120
"mode": "dynamic",
52-
"dynamic_mapping": {},
53-
"partition_key": "foo",
54-
"max_num_partitions": 1,
5521
"tokenizers": []
5622
},
57-
"indexing_settings": {},
58-
"search_settings": {
59-
"default_search_fields": [
60-
"foo"
61-
]
62-
},
6323
"retention": null
6424
},
65-
"checkpoint": {},
66-
"create_timestamp": 1701075471,
6725
"sources": []
6826
}
6927
`)
7028

7129
// When
72-
timestampFieldName, err := DecodeTimestampFieldInfos(200, query)
30+
timestampFieldName, err := DecodeTimestampFieldFromIndexConfig(query)
7331

7432
// Then
7533
require.NoError(t, err)
7634
require.Equal(t, timestampFieldName, "timestamp")
7735
})
7836

79-
t.Run("Test decode nested fields", func(t *testing.T) {
37+
t.Run("Test decode from list of index config", func(t *testing.T) {
8038
// Given
8139
query := []byte(`
40+
[
8241
{
8342
"version": "0.6",
84-
"index_uid": "myindex:01HG7ZZK3ZD7XF6BKQCZJHSJ5W",
8543
"index_config": {
86-
"version": "0.6",
87-
"index_id": "myindex",
88-
"index_uri": "s3://quickwit-indexes/myindex",
8944
"doc_mapping": {
90-
"field_mappings": [
91-
{
92-
"name": "foo",
93-
"type": "text",
94-
"fast": false,
95-
"fieldnorms": false,
96-
"indexed": true,
97-
"record": "basic",
98-
"stored": true,
99-
"tokenizer": "default"
100-
},
101-
{
102-
"name": "sub",
103-
"type": "object",
104-
"field_mappings": [
105-
{
106-
"fast": true,
107-
"fast_precision": "seconds",
108-
"indexed": true,
109-
"input_formats": [
110-
"rfc3339",
111-
"unix_timestamp"
112-
],
113-
"name": "timestamp",
114-
"output_format": "rfc3339",
115-
"stored": true,
116-
"type": "datetime"
117-
}
118-
]
119-
}
120-
],
121-
"tag_fields": [],
122-
"store_source": true,
123-
"index_field_presence": false,
124-
"timestamp_field": "sub.timestamp",
125-
"mode": "dynamic",
126-
"dynamic_mapping": {},
127-
"partition_key": "foo",
128-
"max_num_partitions": 1,
129-
"tokenizers": []
45+
"timestamp_field": "sub.timestamp"
13046
},
13147
"indexing_settings": {},
132-
"search_settings": {
133-
"default_search_fields": [
134-
"foo"
135-
]
136-
},
13748
"retention": null
13849
},
139-
"checkpoint": {},
140-
"create_timestamp": 1701075471,
14150
"sources": []
14251
}
52+
]
14353
`)
14454

14555
// When
146-
timestampFieldName, err := DecodeTimestampFieldInfos(200, query)
56+
timestampFieldName, err := DecodeTimestampFieldFromIndexConfigs(query)
14757

14858
// Then
14959
require.NoError(t, err)
15060
require.Equal(t, timestampFieldName, "sub.timestamp")
15161
})
62+
63+
t.Run("Test decode from list of index config with different timestamp fields return an error", func(t *testing.T) {
64+
// Given
65+
query := []byte(`
66+
[
67+
{
68+
"version": "0.6",
69+
"index_config": {
70+
"doc_mapping": {
71+
"timestamp_field": "sub.timestamp"
72+
},
73+
"indexing_settings": {},
74+
"retention": null
75+
},
76+
"sources": []
77+
},
78+
{
79+
"version": "0.6",
80+
"index_config": {
81+
"doc_mapping": {
82+
"timestamp_field": "sub.timestamp2"
83+
},
84+
"indexing_settings": {},
85+
"retention": null
86+
},
87+
"sources": []
88+
}
89+
]
90+
`)
91+
92+
// When
93+
_, err := DecodeTimestampFieldFromIndexConfigs(query)
94+
95+
// Then
96+
require.Error(t, err)
97+
require.ErrorContains(t, err, "Index matching the pattern should have the same timestamp fields")
98+
})
15299
})
153100
}
154101

src/datasource.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ export function enhanceDataFrameWithDataLinks(dataFrame: DataFrame, dataLinks: D
772772
config: {},
773773
values: displayedMessages,
774774
}
775-
console.log('newField');
775+
console.log(dataFrame);
776776
dataFrame.fields = [newField, ...dataFrame.fields];
777777
}
778778

0 commit comments

Comments
 (0)