Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: QS package for integrations #4578

Merged
merged 12 commits into from
Feb 28, 2024
Prev Previous commit
Next Next commit
chore: integrations v0 qs API: refactor installed integration struct
  • Loading branch information
raj-k-singh committed Feb 22, 2024
commit e6969dd6897485a188008a78d38a7eb01c42bdcb
52 changes: 46 additions & 6 deletions pkg/query-service/app/integrations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package integrations

import (
"context"
"fmt"
"strings"

"go.signoz.io/signoz/pkg/query-service/app/logparsingpipeline"
"go.signoz.io/signoz/pkg/query-service/model"
Expand Down Expand Up @@ -63,13 +65,38 @@ func (m *Manager) ListInstalledIntegrations(
) ([]Integration, *model.ApiError) {
installed, apiErr := m.installedIntegrationsRepo.list(ctx)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "could not fetch installed integrations data")
return nil, model.WrapApiError(
apiErr, "could not fetch installed integrations data",
)
}

integrationIds := []string{}
for _, ii := range installed {
integrationIds = append(integrationIds, ii.IntegrationId)
}
integrationDetails, apiErr := m.availableIntegrationsRepo.get(ctx, integrationIds)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "could not fetch integrations details",
)
}
if len(integrationDetails) != len(integrationIds) {
missingIds := []string{}
for _, iid := range integrationIds {
if _, exists := integrationDetails[iid]; !exists {
missingIds = append(missingIds, iid)
}
}
return nil, model.NotFoundError(fmt.Errorf(
"could not get details for all installed integrations with id: %s",
strings.Join(missingIds, ", "),
))
}

result := []Integration{}
for _, ii := range installed {
result = append(result, Integration{
IntegrationDetails: ii.IntegrationDetails,
IntegrationDetails: integrationDetails[ii.IntegrationId],
IsInstalled: true,
})
}
Expand All @@ -80,18 +107,31 @@ func (m *Manager) InstallIntegration(
ctx context.Context,
integrationId string,
) (*Integration, *model.ApiError) {
ai, apiErr := m.availableIntegrationsRepo.get(ctx, integrationId)
ais, apiErr := m.availableIntegrationsRepo.get(
ctx, []string{integrationId},
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "could not find integration to be installed")
return nil, model.WrapApiError(apiErr, fmt.Sprintf(
"could not find integration to be installed: %s", integrationId,
))
}

integrationDetails, wasFound := ais[integrationId]
if !wasFound {
return nil, model.NotFoundError(fmt.Errorf(
"could not find integration to be installed: %s", integrationId,
))
}

ii, apiErr := m.installedIntegrationsRepo.upsert(ctx, *ai)
_, apiErr = m.installedIntegrationsRepo.upsert(
ctx, integrationDetails,
)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "could not insert installed integration")
}

return &Integration{
IntegrationDetails: ii.IntegrationDetails,
IntegrationDetails: integrationDetails,
IsInstalled: true,
}, nil
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/query-service/app/integrations/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package integrations

import (
"context"
"fmt"
"os"
"slices"
"testing"

"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -147,20 +147,22 @@ func (t *TestAvailableIntegrations) list(
}

func (t *TestAvailableIntegrations) get(
ctx context.Context, id string,
) (*IntegrationDetails, *model.ApiError) {
ctx context.Context, ids []string,
) (map[string]IntegrationDetails, *model.ApiError) {
availableIntegrations, apiErr := t.list(ctx)
if apiErr != nil {
return nil, apiErr
}

result := map[string]IntegrationDetails{}

for _, ai := range availableIntegrations {
if ai.Id == id {
return &ai, nil
if slices.Contains(ids, ai.Id) {
result[ai.Id] = ai
}
}

return nil, model.NotFoundError(fmt.Errorf("no integration found with id %s", id))
return result, nil
}

func NewTestManager(t *testing.T) *Manager {
Expand Down
35 changes: 31 additions & 4 deletions pkg/query-service/app/integrations/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,51 @@ package integrations

import (
"context"
"database/sql/driver"
"encoding/json"
"time"

"github.com/pkg/errors"
"go.signoz.io/signoz/pkg/query-service/model"
)

type InstalledIntegrationConfig map[string]interface{}

// For serializing to and from db
func (c *InstalledIntegrationConfig) Scan(src interface{}) error {
if data, ok := src.([]byte); ok {
return json.Unmarshal(data, &c)
}
return nil
}

func (c *InstalledIntegrationConfig) Value() (driver.Value, error) {
filterSetJson, err := json.Marshal(c)
if err != nil {
return nil, errors.Wrap(err, "could not serialize integration config to JSON")
}
return filterSetJson, nil
}

// Actual integration details are expected to be fetched from relevant AvailableIntegrationsRepo.
type InstalledIntegration struct {
IntegrationDetails `db:"data"`
installedAt time.Time `db:"installed_at"`
IntegrationId string `db:"integration_id"`
Config InstalledIntegrationConfig `db:"config_json"`
InstalledAt time.Time `db:"installed_at"`
}

type InstalledIntegrationsRepo interface {
list(context.Context) ([]InstalledIntegration, *model.ApiError)
get(ctx context.Context, ids []string) (map[string]InstalledIntegration, *model.ApiError)
get(ctx context.Context, integrationIds []string) (map[string]InstalledIntegration, *model.ApiError)
upsert(context.Context, IntegrationDetails) (*InstalledIntegration, *model.ApiError)
delete(ctx context.Context, integrationId string) *model.ApiError
}

type AvailableIntegrationsRepo interface {
list(context.Context) ([]IntegrationDetails, *model.ApiError)
get(ctx context.Context, id string) (*IntegrationDetails, *model.ApiError)
get(ctx context.Context, integrationIds []string) (map[string]IntegrationDetails, *model.ApiError)

// AvailableIntegrationsRepo implementations are expected to cache details of installed integrations for quick retrieval.
// For v0 only bundled integrations are available, later versions are expected
// to add methods in this interface for caching details for installed integrations locally
}
33 changes: 19 additions & 14 deletions pkg/query-service/app/integrations/sqlite_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func InitSqliteDBIfNeeded(db *sqlx.DB) error {

createTablesStatements := `
CREATE TABLE IF NOT EXISTS integrations_installed(
id TEXT PRIMARY KEY,
data TEXT,
integration_id TEXT PRIMARY KEY,
config_json TEXT,
installed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`
Expand Down Expand Up @@ -54,23 +54,27 @@ func (r *InstalledIntegrationsSqliteRepo) list(
}

func (r *InstalledIntegrationsSqliteRepo) get(
ctx context.Context, ids []string,
ctx context.Context, integrationIds []string,
) (map[string]InstalledIntegration, *model.ApiError) {
integrations := []InstalledIntegration{}

idPlaceholders := []string{}
idValues := []interface{}{}
for _, id := range ids {
for _, id := range integrationIds {
idPlaceholders = append(idPlaceholders, "?")
idValues = append(idValues, id)
}

err := r.db.GetContext(ctx, &integrations, fmt.Sprintf(`
select
data
installed_at
from integrations_installed
where id in (%s)`, strings.Join(idPlaceholders, ", ")),
err := r.db.SelectContext(
ctx, &integrations, fmt.Sprintf(`
select
integration_id,
config_json,
installed_at
from integrations_installed
where integration_id in (%s)`,
strings.Join(idPlaceholders, ", "),
),
idValues...,
)
if err != nil {
Expand All @@ -81,7 +85,7 @@ func (r *InstalledIntegrationsSqliteRepo) get(

result := map[string]InstalledIntegration{}
for _, ii := range integrations {
result[ii.Id] = ii
result[ii.IntegrationId] = ii
}

return result, nil
Expand All @@ -98,10 +102,11 @@ func (r *InstalledIntegrationsSqliteRepo) upsert(
_, dbErr := r.db.ExecContext(
ctx, `
INSERT INTO integrations_installed (
id,
data
integration_id,
config_json
) values ($1, $2)
on conflict(id) do update set data=excluded.data
on conflict(integration_id) do update
set config_json=excluded.config_json
`, integration.Id, string(integrationJson),
)
if dbErr != nil {
Expand Down