From e672da69aabc623f21df6a40a2f92a8b41473065 Mon Sep 17 00:00:00 2001 From: Evan <31562192+EvanLjp@users.noreply.github.com> Date: Thu, 27 Jul 2023 15:46:46 +0800 Subject: [PATCH] enhance post bytes add hash key (#215) * enhance post bytes * enhance post bytes * enhance post bytes --- client_interface.go | 2 ++ client_store.go | 9 ++++++ log_store.go | 62 +++++++++++++++++++++++++++++++++++++ token_auto_update_client.go | 11 +++++++ 4 files changed, 84 insertions(+) diff --git a/client_interface.go b/client_interface.go index fdc298f2..1ba9b85f 100644 --- a/client_interface.go +++ b/client_interface.go @@ -201,6 +201,8 @@ type ClientInterface interface { // PostLogStoreLogs put logs into Shard logstore by hashKey. // The callers should transform user logs into LogGroup. PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKey *string) (err error) + // PostRawLogWithCompressType put logs into logstore with specific compress type and hashKey. + PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) // PutLogsWithCompressType put logs into logstore with specific compress type. // The callers should transform user logs into LogGroup. PutLogsWithCompressType(project, logstore string, lg *LogGroup, compressType int) (err error) diff --git a/client_store.go b/client_store.go index 99de9110..e1ea948e 100644 --- a/client_store.go +++ b/client_store.go @@ -103,6 +103,15 @@ func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKe return ls.PostLogStoreLogs(lg, hashKey) } +// PostRawLogWithCompressType put raw log data to log service, no marshal +func (c *Client) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) { + ls := convertLogstore(c, project, logstore) + if err := ls.SetPutLogCompressType(compressType); err != nil { + return err + } + return ls.PostRawLogs(rawLogData, hashKey) +} + // PutLogsWithCompressType put logs into logstore with specific compress type. // The callers should transform user logs into LogGroup. func (c *Client) PutLogsWithCompressType(project, logstore string, lg *LogGroup, compressType int) (err error) { diff --git a/log_store.go b/log_store.go index f9bf3f68..eb98d75a 100644 --- a/log_store.go +++ b/log_store.go @@ -195,6 +195,68 @@ func (s *LogStore) PutRawLog(rawLogData []byte) (err error) { return nil } +func (s *LogStore) PostRawLogs(body []byte, hashKey *string) (err error) { + if len(body) == 0 { + // empty log group or empty hashkey + return nil + } + + if hashKey == nil || *hashKey == "" { + // empty hash call PutLogs + return s.PutRawLog(body) + } + + var out []byte + var h map[string]string + var outLen int + switch s.putLogCompressType { + case Compress_LZ4: + // Compresse body with lz4 + out = make([]byte, lz4.CompressBlockBound(len(body))) + var hashTable [1 << 16]int + n, err := lz4.CompressBlock(body, out, hashTable[:]) + if err != nil { + return NewClientError(err) + } + // copy incompressible data as lz4 format + if n == 0 { + n, _ = copyIncompressible(body, out) + } + + h = map[string]string{ + "x-log-compresstype": "lz4", + "x-log-bodyrawsize": strconv.Itoa(len(body)), + "Content-Type": "application/x-protobuf", + } + outLen = n + break + case Compress_None: + // no compress + out = body + h = map[string]string{ + "x-log-bodyrawsize": strconv.Itoa(len(body)), + "Content-Type": "application/x-protobuf", + } + outLen = len(out) + } + + uri := fmt.Sprintf("/logstores/%v/shards/route?key=%v", s.Name, *hashKey) + r, err := request(s.project, "POST", uri, h, out[:outLen]) + if err != nil { + return NewClientError(err) + } + defer r.Body.Close() + body, _ = ioutil.ReadAll(r.Body) + if r.StatusCode != http.StatusOK { + err := new(Error) + if jErr := json.Unmarshal(body, err); jErr != nil { + return NewBadResponseError(string(body), r.Header, r.StatusCode) + } + return err + } + return nil +} + // PutLogs put logs into logstore. // The callers should transform user logs into LogGroup. func (s *LogStore) PutLogs(lg *LogGroup) (err error) { diff --git a/token_auto_update_client.go b/token_auto_update_client.go index 28b22621..25d6504c 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -716,6 +716,17 @@ func (c *TokenAutoUpdateClient) PostLogStoreLogs(project, logstore string, lg *L return } +// PostRawLogWithCompressType put raw log data to log service, no marshal +func (c *TokenAutoUpdateClient) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.PostRawLogWithCompressType(project, logstore, rawLogData, compressType, hashKey) + if !c.processError(err) { + return + } + } + return +} + // PutRawLogWithCompressType put raw log data to log service, no marshal func (c *TokenAutoUpdateClient) PutRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int) (err error) { for i := 0; i < c.maxTryTimes; i++ {