Skip to content

Commit

Permalink
logs ttl support added in ttl api
Browse files Browse the repository at this point in the history
  • Loading branch information
nityanandagohain committed Aug 4, 2022
1 parent 9dcf913 commit 61ebd3a
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 5 deletions.
96 changes: 95 additions & 1 deletion pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,7 +1888,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
return &GetFilteredSpansAggregatesResponse, nil
}

// SetTTL sets the TTL for traces or metrics tables.
// SetTTL sets the TTL for traces or metrics or logs tables.
// This is an async API which creates goroutines to set TTL.
// Status of TTL update is tracked with ttl_status table in sqlite db.
func (r *ClickHouseReader) SetTTL(ctx context.Context,
Expand Down Expand Up @@ -2017,6 +2017,59 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
return
}
}(tableName)
case constants.LogsTTL:
tableName = r.logsDB + "." + r.logsTable
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending {
return nil, &model.ApiError{Typ: model.ErrorConflict, Err: fmt.Errorf("TTL is already running")}
}
go func(tableName string) {
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (transaction_id, created_at, updated_at, table_name, ttl, status, cold_storage_ttl) VALUES (?, ?, ?, ?, ?, ?, ?)", uuid, time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending, coldStorageDuration)
if dbErr != nil {
zap.S().Error(fmt.Errorf("error in inserting to ttl_status table: %s", dbErr.Error()))
return
}
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp / 1000000000) + "+
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp / 1000000000)"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.S().Error(fmt.Errorf("error in setting cold storage: %s", err.Err.Error()))
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err == nil {
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}
return
}
zap.S().Debugf("Executing TTL request: %s\n", req)
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
if err := r.db.Exec(ctx, req); err != nil {
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
return
}
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}(tableName)

default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v",
Expand Down Expand Up @@ -2180,6 +2233,24 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
}
}

getLogsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
var dbResp []model.DBResponseTTL

query := fmt.Sprintf("SELECT engine_full FROM system.tables WHERE name='%v' AND database='%v'", r.logsTable, r.logsDB)

err := r.db.Select(ctx, &dbResp, query)

if err != nil {
zap.S().Error(fmt.Errorf("error while getting ttl. Err=%v", err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. Err=%v", err)}
}
if len(dbResp) == 0 {
return nil, nil
} else {
return &dbResp[0], nil
}
}

switch ttlParams.Type {
case constants.TraceTTL:
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable}
Expand Down Expand Up @@ -2224,6 +2295,29 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa

delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, ExpectedMetricsTime: ttlQuery.TTL, ExpectedMetricsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil

case constants.LogsTTL:
tableNameArray := []string{r.logsDB + "." + r.logsTable}
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
}
dbResp, err := getLogsTTL()
if err != nil {
return nil, err
}
ttlQuery, err := r.checkTTLStatusItem(ctx, tableNameArray[0])
if err != nil {
return nil, err
}
ttlQuery.TTL = ttlQuery.TTL / 3600 // convert to hours
if ttlQuery.ColdStorageTtl != -1 {
ttlQuery.ColdStorageTtl = ttlQuery.ColdStorageTtl / 3600 // convert to hours
}

delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{LogsTime: delTTL, LogsMoveTime: moveTTL, ExpectedLogsTime: ttlQuery.TTL, ExpectedLogsMoveTime: ttlQuery.ColdStorageTtl, Status: status}, nil

default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while getting ttl. ttl type should be metrics|traces, got %v",
ttlParams.Type)}
Expand Down
8 changes: 4 additions & 4 deletions pkg/query-service/app/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,8 @@ func parseCountErrorsRequest(r *http.Request) (*model.CountErrorsParams, error)
}

params := &model.CountErrorsParams{
Start: startTime,
End: endTime,
Start: startTime,
End: endTime,
}

return params, nil
Expand Down Expand Up @@ -590,7 +590,7 @@ func parseTTLParams(r *http.Request) (*model.TTLParams, error) {
}

// Validate the type parameter
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL)
}

Expand Down Expand Up @@ -629,7 +629,7 @@ func parseGetTTL(r *http.Request) (*model.GetTTLParams, error) {
return nil, fmt.Errorf("type param cannot be empty from the query")
} else {
// Validate the type parameter
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL {
if typeTTL != constants.TraceTTL && typeTTL != constants.MetricsTTL && typeTTL != constants.LogsTTL {
return nil, fmt.Errorf("type param should be metrics|traces, got %v", typeTTL)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/query-service/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func IsTelemetryEnabled() bool {

const TraceTTL = "traces"
const MetricsTTL = "metrics"
const LogsTTL = "logs"

func GetAlertManagerApiPrefix() string {
if os.Getenv("ALERTMANAGER_API_PREFIX") != "" {
Expand Down
4 changes: 4 additions & 0 deletions pkg/query-service/model/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,14 @@ type GetTTLResponseItem struct {
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
LogsTime int `json:"logs_ttl_duration_hrs,omitempty"`
LogsMoveTime int `json:"logs_move_ttl_duration_hrs,omitempty"`
ExpectedMetricsTime int `json:"expected_metrics_ttl_duration_hrs,omitempty"`
ExpectedMetricsMoveTime int `json:"expected_metrics_move_ttl_duration_hrs,omitempty"`
ExpectedTracesTime int `json:"expected_traces_ttl_duration_hrs,omitempty"`
ExpectedTracesMoveTime int `json:"expected_traces_move_ttl_duration_hrs,omitempty"`
ExpectedLogsTime int `json:"expected_logs_ttl_duration_hrs,omitempty"`
ExpectedLogsMoveTime int `json:"expected_logs_move_ttl_duration_hrs,omitempty"`
Status string `json:"status"`
}

Expand Down

0 comments on commit 61ebd3a

Please sign in to comment.