diff --git a/client_interface.go b/client_interface.go index 1ba9b85f..62e7a081 100644 --- a/client_interface.go +++ b/client_interface.go @@ -220,12 +220,14 @@ type ClientInterface interface { // The nextCursor is the next curosr can be used to read logs at next time. GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (out []byte, nextCursor string, err error) + GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) // PullLogs gets logs from shard specified by shardId according cursor and endCursor. // The logGroupMaxCount is the max number of logGroup could be returned. // The nextCursor is the next cursor can be used to read logs at next time. // @note if you want to pull logs continuous, set endCursor = "" PullLogs(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) + PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) // GetHistograms query logs with [from, to) time range GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) // GetLogs query logs with [from, to) time range diff --git a/client_store.go b/client_store.go index e1ea948e..bd1b169c 100644 --- a/client_store.go +++ b/client_store.go @@ -186,8 +186,20 @@ func (c *Client) GetPrevCursorTime(project, logstore string, shardID int, cursor // The nextCursor is the next curosr can be used to read logs at next time. func (c *Client) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (out []byte, nextCursor string, err error) { - ls := convertLogstore(c, project, logstore) - return ls.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount) + plr := &PullLogRequest{ + Project: project, + Logstore: logstore, + ShardID: shardID, + Cursor: cursor, + EndCursor: endCursor, + LogGroupMaxCount: logGroupMaxCount, + } + return c.GetLogsBytesV2(plr) +} + +func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) { + ls := convertLogstore(c, plr.Project, plr.Logstore) + return ls.GetLogsBytesV2(plr) } // PullLogs gets logs from shard specified by shardId according cursor and endCursor. @@ -200,6 +212,11 @@ func (c *Client) PullLogs(project, logstore string, shardID int, cursor, endCurs return ls.PullLogs(shardID, cursor, endCursor, logGroupMaxCount) } +func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) { + ls := convertLogstore(c, plr.Project, plr.Logstore) + return ls.PullLogsV2(plr) +} + // GetHistograms query logs with [from, to) time range func (c *Client) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) { ls := convertLogstore(c, project, logstore) diff --git a/consumer/config.go b/consumer/config.go index 66284961..97584b80 100644 --- a/consumer/config.go +++ b/consumer/config.go @@ -9,6 +9,7 @@ type LogHubConfig struct { //:param SecurityToken: If you use sts token to consume data, you must make sure consumer will be stopped before this token expired. //:param Project: //:param Logstore: + //:param Query: //:param ConsumerGroupName: //:param ConsumerName: //:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed. @@ -43,6 +44,7 @@ type LogHubConfig struct { AccessKeySecret string Project string Logstore string + Query string ConsumerGroupName string ConsumerName string CursorPosition string diff --git a/consumer/consumer_client.go b/consumer/consumer_client.go index 4e788a37..f9ffea7f 100644 --- a/consumer/consumer_client.go +++ b/consumer/consumer_client.go @@ -127,10 +127,19 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) { var logBytes []byte + plr := &sls.PullLogRequest{ + Project: consumer.option.Project, + Logstore: consumer.option.Logstore, + ShardID: shardId, + Query: consumer.option.Query, + Cursor: cursor, + LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount, + } + if plr.Query != "" { + plr.PullMode = "scan_on_stream" + } for retry := 0; retry < 3; retry++ { - logBytes, nextCursor, err = consumer.client.GetLogsBytes(consumer.option.Project, consumer.option.Logstore, shardId, - cursor, "", - consumer.option.MaxFetchLogGroupCount) + logBytes, nextCursor, err = consumer.client.GetLogsBytesV2(plr) if err == nil { rawSize = len(logBytes) gl, err = sls.LogsBytesDecode(logBytes) diff --git a/log_store.go b/log_store.go index eb98d75a..151914d9 100644 --- a/log_store.go +++ b/log_store.go @@ -437,25 +437,29 @@ func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error return cursor, nil } +func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string, + logGroupMaxCount int) (out []byte, nextCursor string, err error) { + plr := &PullLogRequest{ + ShardID: shardID, + Cursor: cursor, + EndCursor: endCursor, + LogGroupMaxCount: logGroupMaxCount, + } + return s.GetLogsBytesV2(plr) +} + // GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor. // The logGroupMaxCount is the max number of logGroup could be returned. // The nextCursor is the next curosr can be used to read logs at next time. -func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string, - logGroupMaxCount int) (out []byte, nextCursor string, err error) { +func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Accept": "application/x-protobuf", "Accept-Encoding": "lz4", } - uri := "" - if endCursor == "" { - uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v", - s.Name, shardID, cursor, logGroupMaxCount) - } else { - uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&end_cursor=%v&count=%v", - s.Name, shardID, cursor, endCursor, logGroupMaxCount) - } + urlVal := plr.ToURLParams() + uri := fmt.Sprintf("/logstores/%v/shards/%v?%s", s.Name, plr.ShardID, urlVal.Encode()) r, err := request(s.project, "GET", uri, h, nil) if err != nil { @@ -536,8 +540,18 @@ func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) { // @note if you want to pull logs continuous, set endCursor = "" func (s *LogStore) PullLogs(shardID int, cursor, endCursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) { + plr := &PullLogRequest{ + ShardID: shardID, + Cursor: cursor, + EndCursor: endCursor, + LogGroupMaxCount: logGroupMaxCount, + } + return s.PullLogsV2(plr) +} + +func (s *LogStore) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) { - out, nextCursor, err := s.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount) + out, nextCursor, err := s.GetLogsBytesV2(plr) if err != nil { return nil, "", err } diff --git a/model.go b/model.go index f59714dd..ea2bd895 100644 --- a/model.go +++ b/model.go @@ -38,6 +38,35 @@ func (glr *GetLogRequest) ToURLParams() url.Values { return urlVal } +type PullLogRequest struct { + Project string + Logstore string + ShardID int + Cursor string + EndCursor string + LogGroupMaxCount int + Query string + PullMode string +} + +func (plr *PullLogRequest) ToURLParams() url.Values { + urlVal := url.Values{} + urlVal.Add("type", "logs") + urlVal.Add("cursor", plr.Cursor) + urlVal.Add("count", strconv.Itoa(plr.LogGroupMaxCount)) + if plr.EndCursor != "" { + urlVal.Add("end_cursor", plr.EndCursor) + } + if plr.Query != "" { + urlVal.Add("query", plr.Query) + } + if plr.PullMode != "" { + urlVal.Add("pullMode", plr.PullMode) + } + + return urlVal +} + // GetHistogramsResponse defines response from GetHistograms call type SingleHistogram struct { Progress string `json:"progress"` diff --git a/token_auto_update_client.go b/token_auto_update_client.go index 25d6504c..2e637ae0 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -770,8 +770,20 @@ func (c *TokenAutoUpdateClient) GetCursorTime(project, logstore string, shardID func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (out []byte, nextCursor string, err error) { + plr := &PullLogRequest{ + Project: project, + Logstore: logstore, + ShardID: shardID, + Cursor: cursor, + EndCursor: endCursor, + LogGroupMaxCount: logGroupMaxCount, + } + return c.GetLogsBytesV2(plr) +} + +func (c *TokenAutoUpdateClient) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) { for i := 0; i < c.maxTryTimes; i++ { - out, nextCursor, err = c.logClient.GetLogsBytes(project, logstore, shardID, cursor, endCursor, logGroupMaxCount) + out, nextCursor, err = c.logClient.GetLogsBytesV2(plr) if !c.processError(err) { return } @@ -781,8 +793,20 @@ func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID i func (c *TokenAutoUpdateClient) PullLogs(project, logstore string, shardID int, cursor, endCursor string, logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) { + plr := &PullLogRequest{ + Project: project, + Logstore: logstore, + ShardID: shardID, + Cursor: cursor, + EndCursor: endCursor, + LogGroupMaxCount: logGroupMaxCount, + } + return c.PullLogsV2(plr) +} + +func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) { for i := 0; i < c.maxTryTimes; i++ { - gl, nextCursor, err = c.logClient.PullLogs(project, logstore, shardID, cursor, endCursor, logGroupMaxCount) + gl, nextCursor, err = c.logClient.PullLogsV2(plr) if !c.processError(err) { return }