Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add query for client #220

Merged
merged 5 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,14 @@ type ClientInterface interface {
// The nextCursor is the next curosr can be used to read logs at next time.
GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error)
GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, nextCursor string, err error)
// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next cursor can be used to read logs at next time.
// @note if you want to pull logs continuous, set endCursor = ""
PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error)
PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我想这个common的方法可以改个名字:PullLogsV2之类的?不要过于突出 Query(一些用户用不到这个功能,反而会多问)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已改为PullLogsV2,PullLogRequest放到了model.go里面

// GetHistograms query logs with [from, to) time range
GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
// GetLogs query logs with [from, to) time range
Expand Down
32 changes: 30 additions & 2 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ import (
"github.com/go-kit/kit/log/level"
)

type PullLogRequest struct {
Project string
Logstore string
ShardID int
Cursor string
EndCursor string
LogGroupMaxCount int
Query string
PullMode string
}

func convertLogstore(c *Client, project, logstore string) *LogStore {
c.accessKeyLock.RLock()
proj := convertLocked(c, project)
Expand Down Expand Up @@ -186,8 +197,20 @@ func (c *Client) GetPrevCursorTime(project, logstore string, shardID int, cursor
// The nextCursor is the next curosr can be used to read logs at next time.
func (c *Client) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount)
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesWithQuery(plr)
}

func (c *Client) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.GetLogsBytesWithQuery(plr)
}

// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
Expand All @@ -200,6 +223,11 @@ func (c *Client) PullLogs(project, logstore string, shardID int, cursor, endCurs
return ls.PullLogs(shardID, cursor, endCursor, logGroupMaxCount)
}

func (c *Client) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsWithQuery(plr)
}

// GetHistograms query logs with [from, to) time range
func (c *Client) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
Expand Down
2 changes: 2 additions & 0 deletions consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type LogHubConfig struct {
//:param SecurityToken: If you use sts token to consume data, you must make sure consumer will be stopped before this token expired.
//:param Project:
//:param Logstore:
//:param Query:
//:param ConsumerGroupName:
//:param ConsumerName:
//:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed.
Expand Down Expand Up @@ -43,6 +44,7 @@ type LogHubConfig struct {
AccessKeySecret string
Project string
Logstore string
Query string
ConsumerGroupName string
ConsumerName string
CursorPosition string
Expand Down
15 changes: 12 additions & 3 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,19 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err

func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) {
var logBytes []byte
plr := &sls.PullLogRequest{
Project: consumer.option.Project,
Logstore: consumer.option.Logstore,
ShardID: shardId,
Query: consumer.option.Query,
Cursor: cursor,
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
}
if plr.Query != "" {
plr.PullMode = "scan_on_stream"
}
for retry := 0; retry < 3; retry++ {
logBytes, nextCursor, err = consumer.client.GetLogsBytes(consumer.option.Project, consumer.option.Logstore, shardId,
cursor, "",
consumer.option.MaxFetchLogGroupCount)
logBytes, nextCursor, err = consumer.client.GetLogsBytesWithQuery(plr)
if err == nil {
rawSize = len(logBytes)
gl, err = sls.LogsBytesDecode(logBytes)
Expand Down
43 changes: 33 additions & 10 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,24 +437,37 @@ func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error
return cursor, nil
}

func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.GetLogsBytesWithQuery(plr)
}

// GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/x-protobuf",
"Accept-Encoding": "lz4",
}

uri := ""
if endCursor == "" {
uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
s.Name, shardID, cursor, logGroupMaxCount)
} else {
uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&end_cursor=%v&count=%v",
s.Name, shardID, cursor, endCursor, logGroupMaxCount)
uri := fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
s.Name, plr.ShardID, plr.Cursor, plr.LogGroupMaxCount)
if plr.EndCursor != "" {
uri += fmt.Sprintf("&end_cursor=%v", plr.EndCursor)
}
if plr.Query != "" {
uri += fmt.Sprintf("&query=%v", plr.Query)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

裸拼字符串query字段可能有问题,建议整体做一下escape
image

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
if plr.PullMode != "" {
uri += fmt.Sprintf("&pullMode=%v", plr.PullMode)
}

r, err := request(s.project, "GET", uri, h, nil)
Expand Down Expand Up @@ -536,8 +549,18 @@ func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
// @note if you want to pull logs continuous, set endCursor = ""
func (s *LogStore) PullLogs(shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.PullLogsWithQuery(plr)
}

func (s *LogStore) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {

out, nextCursor, err := s.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount)
out, nextCursor, err := s.GetLogsBytesWithQuery(plr)
if err != nil {
return nil, "", err
}
Expand Down
28 changes: 26 additions & 2 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,20 @@ func (c *TokenAutoUpdateClient) GetCursorTime(project, logstore string, shardID

func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesWithQuery(plr)
}

func (c *TokenAutoUpdateClient) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
for i := 0; i < c.maxTryTimes; i++ {
out, nextCursor, err = c.logClient.GetLogsBytes(project, logstore, shardID, cursor, endCursor, logGroupMaxCount)
out, nextCursor, err = c.logClient.GetLogsBytesWithQuery(plr)
if !c.processError(err) {
return
}
Expand All @@ -781,8 +793,20 @@ func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID i

func (c *TokenAutoUpdateClient) PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.PullLogsWithQuery(plr)
}

func (c *TokenAutoUpdateClient) PullLogsWithQuery(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
for i := 0; i < c.maxTryTimes; i++ {
gl, nextCursor, err = c.logClient.PullLogs(project, logstore, shardID, cursor, endCursor, logGroupMaxCount)
gl, nextCursor, err = c.logClient.PullLogsWithQuery(plr)
if !c.processError(err) {
return
}
Expand Down
Loading