Skip to content

Commit

Permalink
enhance post bytes add hash key (#215)
Browse files Browse the repository at this point in the history
* enhance post bytes

* enhance post bytes

* enhance post bytes
  • Loading branch information
EvanLjp authored Jul 27, 2023
1 parent 8d1518c commit e672da6
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 0 deletions.
2 changes: 2 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ type ClientInterface interface {
// PostLogStoreLogs put logs into Shard logstore by hashKey.
// The callers should transform user logs into LogGroup.
PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKey *string) (err error)
// PostRawLogWithCompressType put logs into logstore with specific compress type and hashKey.
PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error)
// PutLogsWithCompressType put logs into logstore with specific compress type.
// The callers should transform user logs into LogGroup.
PutLogsWithCompressType(project, logstore string, lg *LogGroup, compressType int) (err error)
Expand Down
9 changes: 9 additions & 0 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKe
return ls.PostLogStoreLogs(lg, hashKey)
}

// PostRawLogWithCompressType put raw log data to log service, no marshal
func (c *Client) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) {
ls := convertLogstore(c, project, logstore)
if err := ls.SetPutLogCompressType(compressType); err != nil {
return err
}
return ls.PostRawLogs(rawLogData, hashKey)
}

// PutLogsWithCompressType put logs into logstore with specific compress type.
// The callers should transform user logs into LogGroup.
func (c *Client) PutLogsWithCompressType(project, logstore string, lg *LogGroup, compressType int) (err error) {
Expand Down
62 changes: 62 additions & 0 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,68 @@ func (s *LogStore) PutRawLog(rawLogData []byte) (err error) {
return nil
}

func (s *LogStore) PostRawLogs(body []byte, hashKey *string) (err error) {
if len(body) == 0 {
// empty log group or empty hashkey
return nil
}

if hashKey == nil || *hashKey == "" {
// empty hash call PutLogs
return s.PutRawLog(body)
}

var out []byte
var h map[string]string
var outLen int
switch s.putLogCompressType {
case Compress_LZ4:
// Compresse body with lz4
out = make([]byte, lz4.CompressBlockBound(len(body)))
var hashTable [1 << 16]int
n, err := lz4.CompressBlock(body, out, hashTable[:])
if err != nil {
return NewClientError(err)
}
// copy incompressible data as lz4 format
if n == 0 {
n, _ = copyIncompressible(body, out)
}

h = map[string]string{
"x-log-compresstype": "lz4",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = n
break
case Compress_None:
// no compress
out = body
h = map[string]string{
"x-log-bodyrawsize": strconv.Itoa(len(body)),
"Content-Type": "application/x-protobuf",
}
outLen = len(out)
}

uri := fmt.Sprintf("/logstores/%v/shards/route?key=%v", s.Name, *hashKey)
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
}
defer r.Body.Close()
body, _ = ioutil.ReadAll(r.Body)
if r.StatusCode != http.StatusOK {
err := new(Error)
if jErr := json.Unmarshal(body, err); jErr != nil {
return NewBadResponseError(string(body), r.Header, r.StatusCode)
}
return err
}
return nil
}

// PutLogs put logs into logstore.
// The callers should transform user logs into LogGroup.
func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
Expand Down
11 changes: 11 additions & 0 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,17 @@ func (c *TokenAutoUpdateClient) PostLogStoreLogs(project, logstore string, lg *L
return
}

// PostRawLogWithCompressType put raw log data to log service, no marshal
func (c *TokenAutoUpdateClient) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
err = c.logClient.PostRawLogWithCompressType(project, logstore, rawLogData, compressType, hashKey)
if !c.processError(err) {
return
}
}
return
}

// PutRawLogWithCompressType put raw log data to log service, no marshal
func (c *TokenAutoUpdateClient) PutRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int) (err error) {
for i := 0; i < c.maxTryTimes; i++ {
Expand Down

0 comments on commit e672da6

Please sign in to comment.