Skip to content

Commit

Permalink
Merge branch 'develop' into pranshuchittora/feat/getting-started-page
Browse files Browse the repository at this point in the history
  • Loading branch information
Pranshu Chittora authored Sep 12, 2022
2 parents d1baa96 + ac86d84 commit 0b2da2a
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 37 deletions.
51 changes: 49 additions & 2 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type ClickHouseReader struct {
logsResourceKeys string
queryEngine *promql.Engine
remoteStorage *remote.Storage
fanoutStorage *storage.Storage

promConfigFile string
promConfig *config.Config
Expand Down Expand Up @@ -143,7 +144,7 @@ func NewReader(localDB *sqlx.DB, configFile string) *ClickHouseReader {
}
}

func (r *ClickHouseReader) Start() {
func (r *ClickHouseReader) Start(readerReady chan bool) {
logLevel := promlog.AllowedLevel{}
logLevel.Set("debug")
// allowedFormat := promlog.AllowedFormat{}
Expand Down Expand Up @@ -311,6 +312,8 @@ func (r *ClickHouseReader) Start() {
}
r.queryEngine = queryEngine
r.remoteStorage = remoteStorage
r.fanoutStorage = &fanoutStorage
readerReady <- true

if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
Expand All @@ -319,6 +322,14 @@ func (r *ClickHouseReader) Start() {

}

func (r *ClickHouseReader) GetQueryEngine() *promql.Engine {
return r.queryEngine
}

func (r *ClickHouseReader) GetFanoutStorage() *storage.Storage {
return r.fanoutStorage
}

func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (promConfig *config.Config, err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

Expand Down Expand Up @@ -2813,7 +2824,7 @@ func (r *ClickHouseReader) GetMetricResult(ctx context.Context, query string) ([

if err != nil {
zap.S().Debug("Error in processing query: ", err)
return nil, fmt.Errorf("error in processing query")
return nil, err
}

var (
Expand Down Expand Up @@ -3239,3 +3250,39 @@ func (r *ClickHouseReader) AggregateLogs(ctx context.Context, params *model.Logs

return &aggregateResponse, nil
}

func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string) (*model.DashboardVar, error) {
var result model.DashboardVar
rows, err := r.db.Query(ctx, query)

zap.S().Info(query)

if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, err
}

var (
columnTypes = rows.ColumnTypes()
vars = make([]interface{}, len(columnTypes))
)
for i := range columnTypes {
vars[i] = reflect.New(columnTypes[i].ScanType()).Interface()
}

defer rows.Close()
for rows.Next() {
if err := rows.Scan(vars...); err != nil {
return nil, err
}
for _, v := range vars {
switch v := v.(type) {
case *string, *int8, *int16, *int32, *int64, *uint8, *uint16, *uint32, *uint64, *float32, *float64, *time.Time, *bool:
result.VariableValues = append(result.VariableValues, reflect.ValueOf(v).Elem().Interface())
default:
return nil, fmt.Errorf("unsupported value type encountered")
}
}
}
return &result, nil
}
94 changes: 74 additions & 20 deletions pkg/query-service/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
"io/ioutil"
"net/http"
"strconv"
"strings"
"sync"
"text/template"
"time"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -320,6 +322,7 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/dashboards/{uuid}", ViewAccess(aH.getDashboard)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/dashboards/{uuid}", EditAccess(aH.updateDashboard)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/dashboards/{uuid}", EditAccess(aH.deleteDashboard)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/variables/query", ViewAccess(aH.queryDashboardVars)).Methods(http.MethodGet)

router.HandleFunc("/api/v1/feedback", OpenAccess(aH.submitFeedback)).Methods(http.MethodPost)
// router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet)
Expand Down Expand Up @@ -483,9 +486,11 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
type channelResult struct {
Series []*model.Series
Err error
Name string
Query string
}

execClickHouseQueries := func(queries map[string]string) ([]*model.Series, error) {
execClickHouseQueries := func(queries map[string]string) ([]*model.Series, error, map[string]string) {
var seriesList []*model.Series
ch := make(chan channelResult, len(queries))
var wg sync.WaitGroup
Expand All @@ -500,7 +505,7 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
}

if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)}
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query}
return
}
ch <- channelResult{Series: seriesList}
Expand All @@ -511,21 +516,23 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
close(ch)

var errs []error
errQuriesByName := make(map[string]string)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
seriesList = append(seriesList, r.Series...)
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n"))
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
}
return seriesList, nil
return seriesList, nil, nil
}

execPromQueries := func(metricsQueryRangeParams *model.QueryRangeParamsV2) ([]*model.Series, error) {
execPromQueries := func(metricsQueryRangeParams *model.QueryRangeParamsV2) ([]*model.Series, error, map[string]string) {
var seriesList []*model.Series
ch := make(chan channelResult, len(metricsQueryRangeParams.CompositeMetricQuery.PromQueries))
var wg sync.WaitGroup
Expand All @@ -538,6 +545,19 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
go func(name string, query *model.PromQuery) {
var seriesList []*model.Series
defer wg.Done()
tmpl := template.New("promql-query")
tmpl, tmplErr := tmpl.Parse(query.Query)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
var queryBuf bytes.Buffer
tmplErr = tmpl.Execute(&queryBuf, metricsQueryRangeParams.Variables)
if tmplErr != nil {
ch <- channelResult{Err: fmt.Errorf("error in parsing query-%s: %v", name, tmplErr), Name: name, Query: query.Query}
return
}
query.Query = queryBuf.String()
queryModel := model.QueryRangeParams{
Start: time.UnixMilli(metricsQueryRangeParams.Start),
End: time.UnixMilli(metricsQueryRangeParams.End),
Expand All @@ -546,7 +566,7 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
}
promResult, _, err := (*aH.reader).GetQueryRangeResult(r.Context(), &queryModel)
if err != nil {
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err)}
ch <- channelResult{Err: fmt.Errorf("error in query-%s: %v", name, err), Name: name, Query: query.Query}
return
}
matrix, _ := promResult.Matrix()
Expand All @@ -567,51 +587,66 @@ func (aH *APIHandler) queryRangeMetricsV2(w http.ResponseWriter, r *http.Request
close(ch)

var errs []error
errQuriesByName := make(map[string]string)
// read values from the channel
for r := range ch {
if r.Err != nil {
errs = append(errs, r.Err)
errQuriesByName[r.Name] = r.Query
continue
}
seriesList = append(seriesList, r.Series...)
}
if len(errs) != 0 {
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n"))
return nil, fmt.Errorf("encountered multiple errors: %s", metrics.FormatErrs(errs, "\n")), errQuriesByName
}
return seriesList, nil
return seriesList, nil, nil
}

var seriesList []*model.Series
var err error
var errQuriesByName map[string]string
switch metricsQueryRangeParams.CompositeMetricQuery.QueryType {
case model.QUERY_BUILDER:
runQueries := metrics.PrepareBuilderMetricQueries(metricsQueryRangeParams, constants.SIGNOZ_TIMESERIES_TABLENAME)
if runQueries.Err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: runQueries.Err}, nil)
return
}
seriesList, err = execClickHouseQueries(runQueries.Queries)
seriesList, err, errQuriesByName = execClickHouseQueries(runQueries.Queries)

case model.CLICKHOUSE:
queries := make(map[string]string)
for name, chQuery := range metricsQueryRangeParams.CompositeMetricQuery.ClickHouseQueries {
if chQuery.Disabled {
continue
}
queries[name] = chQuery.Query
tmpl := template.New("clickhouse-query")
tmpl, err := tmpl.Parse(chQuery.Query)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
var query bytes.Buffer
err = tmpl.Execute(&query, metricsQueryRangeParams.Variables)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
queries[name] = query.String()
}
seriesList, err = execClickHouseQueries(queries)
seriesList, err, errQuriesByName = execClickHouseQueries(queries)
case model.PROM:
seriesList, err = execPromQueries(metricsQueryRangeParams)
seriesList, err, errQuriesByName = execPromQueries(metricsQueryRangeParams)
default:
err = fmt.Errorf("invalid query type")
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, errQuriesByName)
return
}

if err != nil {
apiErrObj := &model.ApiError{Typ: model.ErrorBadData, Err: err}
respondError(w, apiErrObj, nil)
respondError(w, apiErrObj, errQuriesByName)
return
}
if metricsQueryRangeParams.CompositeMetricQuery.PanelType == model.QUERY_VALUE &&
Expand Down Expand Up @@ -707,6 +742,25 @@ func (aH *APIHandler) deleteDashboard(w http.ResponseWriter, r *http.Request) {

}

func (aH *APIHandler) queryDashboardVars(w http.ResponseWriter, r *http.Request) {

query := r.URL.Query().Get("query")
if query == "" {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("query is required")}, nil)
return
}
if strings.Contains(strings.ToLower(query), "alter table") {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("query shouldn't alter data")}, nil)
return
}
dashboardVars, err := (*aH.reader).QueryDashboardVars(r.Context(), query)
if err != nil {
respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
aH.respond(w, dashboardVars)
}

func (aH *APIHandler) updateDashboard(w http.ResponseWriter, r *http.Request) {

uuid := mux.Vars(r)["uuid"]
Expand Down Expand Up @@ -1034,11 +1088,11 @@ func (aH *APIHandler) queryRangeMetrics(w http.ResponseWriter, r *http.Request)
if res.Err != nil {
switch res.Err.(type) {
case promql.ErrQueryCanceled:
respondError(w, &model.ApiError{model.ErrorCanceled, res.Err}, nil)
respondError(w, &model.ApiError{Typ: model.ErrorCanceled, Err: res.Err}, nil)
case promql.ErrQueryTimeout:
respondError(w, &model.ApiError{model.ErrorTimeout, res.Err}, nil)
respondError(w, &model.ApiError{Typ: model.ErrorTimeout, Err: res.Err}, nil)
}
respondError(w, &model.ApiError{model.ErrorExec, res.Err}, nil)
respondError(w, &model.ApiError{Typ: model.ErrorExec, Err: res.Err}, nil)
}

response_data := &model.QueryData{
Expand Down Expand Up @@ -1088,11 +1142,11 @@ func (aH *APIHandler) queryMetrics(w http.ResponseWriter, r *http.Request) {
if res.Err != nil {
switch res.Err.(type) {
case promql.ErrQueryCanceled:
respondError(w, &model.ApiError{model.ErrorCanceled, res.Err}, nil)
respondError(w, &model.ApiError{Typ: model.ErrorCanceled, Err: res.Err}, nil)
case promql.ErrQueryTimeout:
respondError(w, &model.ApiError{model.ErrorTimeout, res.Err}, nil)
respondError(w, &model.ApiError{Typ: model.ErrorTimeout, Err: res.Err}, nil)
}
respondError(w, &model.ApiError{model.ErrorExec, res.Err}, nil)
respondError(w, &model.ApiError{Typ: model.ErrorExec, Err: res.Err}, nil)
}

response_data := &model.QueryData{
Expand Down
Loading

0 comments on commit 0b2da2a

Please sign in to comment.