Skip to content

Commit

Permalink
Merge pull request #220 from panawala/feature/add_query_for_client
Browse files Browse the repository at this point in the history
add query for client
  • Loading branch information
shabicheng authored Aug 15, 2023
2 parents 0ac9c47 + de70b94 commit 8c55f03
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 18 deletions.
2 changes: 2 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,14 @@ type ClientInterface interface {
// The nextCursor is the next curosr can be used to read logs at next time.
GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error)
GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error)
// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next cursor can be used to read logs at next time.
// @note if you want to pull logs continuous, set endCursor = ""
PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error)
PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error)
// GetHistograms query logs with [from, to) time range
GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
// GetLogs query logs with [from, to) time range
Expand Down
21 changes: 19 additions & 2 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,20 @@ func (c *Client) GetPrevCursorTime(project, logstore string, shardID int, cursor
// The nextCursor is the next curosr can be used to read logs at next time.
func (c *Client) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount)
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesV2(plr)
}

func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.GetLogsBytesV2(plr)
}

// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
Expand All @@ -200,6 +212,11 @@ func (c *Client) PullLogs(project, logstore string, shardID int, cursor, endCurs
return ls.PullLogs(shardID, cursor, endCursor, logGroupMaxCount)
}

func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsV2(plr)
}

// GetHistograms query logs with [from, to) time range
func (c *Client) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
Expand Down
2 changes: 2 additions & 0 deletions consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type LogHubConfig struct {
//:param SecurityToken: If you use sts token to consume data, you must make sure consumer will be stopped before this token expired.
//:param Project:
//:param Logstore:
//:param Query:
//:param ConsumerGroupName:
//:param ConsumerName:
//:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed.
Expand Down Expand Up @@ -43,6 +44,7 @@ type LogHubConfig struct {
AccessKeySecret string
Project string
Logstore string
Query string
ConsumerGroupName string
ConsumerName string
CursorPosition string
Expand Down
15 changes: 12 additions & 3 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,19 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err

func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) {
var logBytes []byte
plr := &sls.PullLogRequest{
Project: consumer.option.Project,
Logstore: consumer.option.Logstore,
ShardID: shardId,
Query: consumer.option.Query,
Cursor: cursor,
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
}
if plr.Query != "" {
plr.PullMode = "scan_on_stream"
}
for retry := 0; retry < 3; retry++ {
logBytes, nextCursor, err = consumer.client.GetLogsBytes(consumer.option.Project, consumer.option.Logstore, shardId,
cursor, "",
consumer.option.MaxFetchLogGroupCount)
logBytes, nextCursor, err = consumer.client.GetLogsBytesV2(plr)
if err == nil {
rawSize = len(logBytes)
gl, err = sls.LogsBytesDecode(logBytes)
Expand Down
36 changes: 25 additions & 11 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,25 +437,29 @@ func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error
return cursor, nil
}

func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.GetLogsBytesV2(plr)
}

// GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/x-protobuf",
"Accept-Encoding": "lz4",
}

uri := ""
if endCursor == "" {
uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
s.Name, shardID, cursor, logGroupMaxCount)
} else {
uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&end_cursor=%v&count=%v",
s.Name, shardID, cursor, endCursor, logGroupMaxCount)
}
urlVal := plr.ToURLParams()
uri := fmt.Sprintf("/logstores/%v/shards/%v?%s", s.Name, plr.ShardID, urlVal.Encode())

r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
Expand Down Expand Up @@ -536,8 +540,18 @@ func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
// @note if you want to pull logs continuous, set endCursor = ""
func (s *LogStore) PullLogs(shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.PullLogsV2(plr)
}

func (s *LogStore) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {

out, nextCursor, err := s.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount)
out, nextCursor, err := s.GetLogsBytesV2(plr)
if err != nil {
return nil, "", err
}
Expand Down
29 changes: 29 additions & 0 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,35 @@ func (glr *GetLogRequest) ToURLParams() url.Values {
return urlVal
}

type PullLogRequest struct {
Project string
Logstore string
ShardID int
Cursor string
EndCursor string
LogGroupMaxCount int
Query string
PullMode string
}

func (plr *PullLogRequest) ToURLParams() url.Values {
urlVal := url.Values{}
urlVal.Add("type", "logs")
urlVal.Add("cursor", plr.Cursor)
urlVal.Add("count", strconv.Itoa(plr.LogGroupMaxCount))
if plr.EndCursor != "" {
urlVal.Add("end_cursor", plr.EndCursor)
}
if plr.Query != "" {
urlVal.Add("query", plr.Query)
}
if plr.PullMode != "" {
urlVal.Add("pullMode", plr.PullMode)
}

return urlVal
}

// GetHistogramsResponse defines response from GetHistograms call
type SingleHistogram struct {
Progress string `json:"progress"`
Expand Down
28 changes: 26 additions & 2 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,20 @@ func (c *TokenAutoUpdateClient) GetCursorTime(project, logstore string, shardID

func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesV2(plr)
}

func (c *TokenAutoUpdateClient) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
for i := 0; i < c.maxTryTimes; i++ {
out, nextCursor, err = c.logClient.GetLogsBytes(project, logstore, shardID, cursor, endCursor, logGroupMaxCount)
out, nextCursor, err = c.logClient.GetLogsBytesV2(plr)
if !c.processError(err) {
return
}
Expand All @@ -781,8 +793,20 @@ func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID i

func (c *TokenAutoUpdateClient) PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.PullLogsV2(plr)
}

func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
for i := 0; i < c.maxTryTimes; i++ {
gl, nextCursor, err = c.logClient.PullLogs(project, logstore, shardID, cursor, endCursor, logGroupMaxCount)
gl, nextCursor, err = c.logClient.PullLogsV2(plr)
if !c.processError(err) {
return
}
Expand Down

0 comments on commit 8c55f03

Please sign in to comment.