Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TKC-2302 added new networking between multiple agents and CP #5683

Draft
wants to merge 41 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
111a46f
feat: added runner id to the agent runner
exu Jul 24, 2024
a78efda
feat: set runnerId as cluster id if set
exu Jul 24, 2024
15bf822
fix: use var
exu Jul 24, 2024
d2ab0cb
fix: added runner id
exu Jul 24, 2024
cea406c
fix: added metadata log
exu Jul 24, 2024
4638eec
fix: debug metadata 2
exu Jul 24, 2024
1ad914b
fix: debug metadata 2
exu Jul 24, 2024
8221a16
fix: debug metadata 3
exu Jul 24, 2024
7ca08b6
fix: debug metadata 4
exu Jul 24, 2024
d4e2b2f
feat added runner id to test workflows result collection
exu Jul 25, 2024
9fda13e
feat added runner id to test workflows result collection
exu Jul 25, 2024
f0bc10a
fix: moved to execution
exu Jul 25, 2024
3275233
fix: added runner id to pro context
exu Jul 25, 2024
a18536f
fix: typo
exu Jul 25, 2024
224a3d0
fix: pass context correctly
exu Jul 25, 2024
4bbc94b
fix: added metadata to wf executions
exu Jul 25, 2024
751d357
chore: debug context
exu Jul 26, 2024
60857d6
fix: debug
exu Jul 26, 2024
09678a1
fix: pass runner id to the execution object
exu Jul 26, 2024
916f8bd
feat: added runner id to execution summary
exu Jul 26, 2024
d30c9c2
feat: added runner and tags to running contextg
exu Jul 31, 2024
d3f10e0
fix: pass runner id to the cloud client everywhere
exu Jul 31, 2024
d972f58
fix: not override context metadata
exu Jul 31, 2024
087f93d
fix: agent integration tests
exu Jul 31, 2024
7ab74bf
fix: added default runnerId to the agent
exu Jul 31, 2024
978cf29
feat: added env org details command
exu Aug 5, 2024
42a488f
feat: added env org details
exu Aug 5, 2024
48a3f46
chore: refactor
exu Aug 5, 2024
13511b9
chore: rename command
exu Aug 6, 2024
82b1c4c
feat: added executor v2
exu Aug 6, 2024
b1111dd
feat: undeprecate executor
exu Aug 6, 2024
b77d161
feat: added runner init command
exu Aug 6, 2024
e2e0849
fix: added running context to execution details
exu Aug 7, 2024
8a05b62
fix: golang ci fixes
exu Aug 8, 2024
640adaf
fix: added log when storing data
exu Aug 8, 2024
e4d0708
fix: added request logging
exu Aug 8, 2024
84cbc6c
feat: filters for tags
exu Aug 9, 2024
b95db95
feat: added setters
exu Aug 9, 2024
e582a1c
chore: rename tags in filter
exu Aug 9, 2024
47bf9b0
feat: filter through tags
exu Aug 9, 2024
5b90d64
feat: pass runningcontext to executionsummaryu
exu Aug 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/v1/testkube.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7049,6 +7049,16 @@ components:
context:
type: string
description: Context value depending from its type
runnerIds:
type: array
items:
type: string
tags:
type: object
additionalProperties:
type: string



Webhook:
description: CRD based webhook data
Expand Down Expand Up @@ -7889,6 +7899,8 @@ components:
type: boolean
description: whether webhooks on the execution of this test workflow are disabled
default: false
runningContext:
$ref: "#/components/schemas/RunningContext"

TestWorkflowWithExecution:
type: object
Expand Down Expand Up @@ -7980,6 +7992,11 @@ components:
example:
- true
- false
runnerId:
type: string
description: Runner id that executed the test workflow
runningContext:
$ref: "#/components/schemas/RunningContext"
required:
- id
- name
Expand All @@ -7993,6 +8010,10 @@ components:
description: unique execution identifier
format: bson objectId
example: "62f395e004109209b50edfc1"
runnerID:
type: string
description: runner identifier
example: "some-unique-id-per-env-1"
name:
type: string
description: execution name
Expand All @@ -8012,6 +8033,8 @@ components:
$ref: "#/components/schemas/TestWorkflowResultSummary"
workflow:
$ref: "#/components/schemas/TestWorkflowSummary"
runningContext:
$ref: "#/components/schemas/RunningContext"
required:
- id
- name
Expand Down
55 changes: 39 additions & 16 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
testworkflow2 "github.com/kubeshop/testkube/pkg/repository/testworkflow"
"github.com/kubeshop/testkube/pkg/secretmanager"
"github.com/kubeshop/testkube/pkg/tcl/checktcl"
"github.com/kubeshop/testkube/pkg/tcl/controlplanetcl"
"github.com/kubeshop/testkube/pkg/tcl/schedulertcl"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/presets"

Expand Down Expand Up @@ -147,13 +148,18 @@ func main() {
cfg.CleanLegacyVars()
exitOnError("error getting application config", err)

md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey, "runner-id", cfg.TestkubeProRunnerId)
ctx := metadata.NewOutgoingContext(context.Background(), md)

features, err := featureflags.Get()
exitOnError("error getting application feature flags", err)

log.DefaultLogger.Infow("Feature flags configured", "ff", features)
logger := log.DefaultLogger.With("apiVersion", version.Version)

logger.Infow("Feature flags configured", "ff", features)

// Run services within an errgroup to propagate errors between services.
g, ctx := errgroup.WithContext(context.Background())
g, ctx := errgroup.WithContext(ctx)

// Cancel the errgroup context on SIGINT and SIGTERM,
// which shuts everything down gracefully.
Expand Down Expand Up @@ -271,18 +277,18 @@ func main() {
var artifactStorage domainstorage.ArtifactsStorage
var storageClient domainstorage.Client
if mode == common.ModeAgent {
resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
resultsRepository = cloudresult.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
testResultsRepository = cloudtestresult.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
configRepository = cloudconfig.NewCloudResultRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
// Pro edition only (tcl protected code)
testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
testWorkflowResultsRepository = cloudtestworkflow.NewCloudRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
var opts []cloudtestworkflow.Option
if cfg.StorageSkipVerify {
opts = append(opts, cloudtestworkflow.WithSkipVerify())
}
testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, opts...)
testWorkflowOutputRepository = cloudtestworkflow.NewCloudOutputRepository(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId, opts...)
triggerLeaseBackend = triggers.NewAcquireAlwaysLeaseBackend()
artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey)
artifactStorage = cloudartifacts.NewCloudArtifactsStorage(grpcClient, grpcConn, cfg.TestkubeProAPIKey, cfg.TestkubeProRunnerId)
} else {
mongoSSLConfig := getMongoSSLConfig(cfg, secretClient)
db, err := storage.GetMongoDatabase(cfg.APIMongoDSN, cfg.APIMongoDB, cfg.APIMongoDBType, cfg.APIMongoAllowTLS, mongoSSLConfig)
Expand Down Expand Up @@ -431,13 +437,24 @@ func main() {
exitOnError("Creating job templates", err)
}

proContext := newProContext(cfg, grpcClient)
proContext := newProContext(ctx, cfg, grpcClient)
proContext.ClusterId = clusterId

// Check Pro/Enterprise subscription
var subscriptionChecker checktcl.SubscriptionChecker
if mode == common.ModeAgent {
subscriptionChecker, err = checktcl.NewSubscriptionChecker(ctx, proContext, grpcClient, grpcConn)
exitOnError("Failed creating subscription checker", err)

// Load environment/org details based on token grpc call
environment, err := controlplanetcl.GetEnvironment(ctx, proContext, grpcClient, grpcConn)
warnOnError("Getting environment details from control plane", err)
proContext.EnvID = environment.Id
proContext.EnvName = environment.Name
proContext.EnvSlug = environment.Slug
proContext.OrgID = environment.OrganizationId
proContext.OrgName = environment.OrganizationName
proContext.OrgSlug = environment.OrganizationSlug
}

serviceAccountNames := map[string]string{
Expand Down Expand Up @@ -585,6 +602,7 @@ func main() {
cfg.ImageDataPersistentCacheKey,
cfg.TestkubeDashboardURI,
clusterId,
proContext.RunnerId,
)

go testWorkflowExecutor.Recover(context.Background())
Expand Down Expand Up @@ -893,9 +911,10 @@ func newGRPCTransportCredentials(cfg *config.Config) (credentials.TransportCrede
})
}

func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
func newProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
RunnerId: cfg.TestkubeProRunnerId,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
Expand All @@ -913,22 +932,20 @@ func newProContext(cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient)
return proContext
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
md := metadata.Pairs("api-key", cfg.TestkubeProAPIKey)
ctx = metadata.NewOutgoingContext(ctx, md)
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
getProContext, err := grpcClient.GetProContext(ctx, &emptypb.Empty{})
proContextResponse, err := grpcClient.GetProContext(ctx, &emptypb.Empty{})
if err != nil {
log.DefaultLogger.Warnf("cannot fetch pro-context from cloud: %s", err)
return proContext
}

if proContext.EnvID == "" {
proContext.EnvID = getProContext.EnvId
proContext.EnvID = proContextResponse.EnvId
}

if proContext.OrgID == "" {
proContext.OrgID = getProContext.OrgId
proContext.OrgID = proContextResponse.OrgId
}

return proContext
Expand All @@ -940,3 +957,9 @@ func exitOnError(title string, err error) {
os.Exit(1)
}
}

func warnOnError(title string, err error) {
if err != nil {
log.DefaultLogger.Errorw(title, "error", err)
}
}
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func PopulateMasterFlags(cmd *cobra.Command, opts *HelmOptions) {
cmd.Flags().StringVar(&opts.Master.AgentToken, "agent-token", "", "Testkube Pro agent key [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.OrgId, "org-id", "", "Testkube Pro organization id [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.EnvId, "env-id", "", "Testkube Pro environment id [required for centralized mode]")
cmd.Flags().StringVar(&opts.Master.RunnerId, "runner-id", "", "Testkube Pro Multi Runner id [required for centralized mode]")

cmd.Flags().BoolVar(&opts.Master.Features.LogsV2, "feature-logs-v2", false, "Logs v2 feature flag")
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/kubectl-testkube/commands/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ func prepareTestkubeProHelmArgs(options HelmOptions, isMigration bool) []string
args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.envId=%s", options.Master.EnvId))
}

if options.Master.RunnerId != "" {
args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.runnerId=%s", options.Master.RunnerId))
}

if options.Master.OrgId != "" {
args = append(args, "--set", fmt.Sprintf("testkube-api.cloud.orgId=%s", options.Master.OrgId))
args = append(args, "--set", fmt.Sprintf("testkube-logs.pro.orgId=%s", options.Master.OrgId))
Expand Down
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func init() {
RootCmd.AddCommand(NewGenerateCmd())

RootCmd.AddCommand(NewInitCmd())
RootCmd.AddCommand(NewRunnerCmd())
RootCmd.AddCommand(NewUpgradeCmd())
RootCmd.AddCommand(NewPurgeCmd())
RootCmd.AddCommand(NewWatchCmd())
Expand Down
22 changes: 22 additions & 0 deletions cmd/kubectl-testkube/commands/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package commands

import (
"github.com/spf13/cobra"

"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/pro"
)

func NewRunnerCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "runner <command>",
Aliases: []string{""},
Short: "Testkube Runner related commands",
Run: func(cmd *cobra.Command, args []string) {
cmd.Help()
},
}

cmd.AddCommand(pro.NewInitCmd())

return cmd
}
1 change: 1 addition & 0 deletions cmd/kubectl-testkube/config/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Master struct {
IdToken string `json:"idToken,omitempty"`
OrgId string `json:"orgId,omitempty"`
EnvId string `json:"envId,omitempty"`
RunnerId string `json:"runnerId,omitempty"`
Insecure bool `json:"insecure,omitempty"`
UiUrlPrefix string `json:"uiUrlPrefix,omitempty"`
AgentUrlPrefix string `json:"agentUrlPrefix,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-toolkit/commands/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func NewArtifactsCmd() *cobra.Command {
if env.CloudEnabled() {
ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second)
defer cancel()
ctx = agent.AddAPIKeyMeta(ctx, env.Config().Cloud.ApiKey)
ctx = agent.AddContextMetadata(ctx, env.Config().Cloud.ApiKey, "")
executor, client := env.Cloud(ctx)
proContext, err := client.GetProContext(ctx, &emptypb.Empty{})
var supported []*cloud.Capability
Expand Down
2 changes: 1 addition & 1 deletion cmd/testworkflow-toolkit/env/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,5 @@ func Cloud(ctx context.Context) (cloudexecutor.Executor, cloud.TestKubeCloudAPIC
ui.Fail(fmt.Errorf("failed to connect with Cloud: %w", err))
}
grpcClient := cloud.NewTestKubeCloudAPIClient(grpcConn)
return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey), grpcClient
return cloudexecutor.NewCloudGRPCExecutor(grpcClient, grpcConn, cfg.ApiKey, cfg.RunnerId), grpcClient
}
1 change: 1 addition & 0 deletions cmd/testworkflow-toolkit/env/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type envCloudConfig struct {
UiUrl string `envconfig:"TK_C_UI_URL"`
OrgId string `envconfig:"TK_C_ORG_ID"`
EnvId string `envconfig:"TK_C_ENV_ID"`
RunnerId string `envconfig:"TK_C_RUNNER_ID" default:""`
SkipVerify bool `envconfig:"TK_C_SKIP_VERIFY" default:"false"`
TlsInsecure bool `envconfig:"TK_C_TLS_INSECURE" default:"false"`
}
Expand Down
7 changes: 6 additions & 1 deletion internal/app/api/v1/testworkflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1"
"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/agent"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/mapper/testworkflows"
"github.com/kubeshop/testkube/pkg/scheduler"
"github.com/kubeshop/testkube/pkg/testworkflows/testworkflowresolver"
Expand Down Expand Up @@ -345,7 +347,8 @@ func (s *TestkubeAPI) PreviewTestWorkflowHandler() fiber.Handler {
// TODO: Add metrics
func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler {
return func(c *fiber.Ctx) (err error) {
ctx := c.Context()
// pass metadata to context
ctx := agent.Context(c.Context(), *s.proContext)
name := c.Params("id")
errPrefix := fmt.Sprintf("failed to execute test workflow '%s'", name)
workflow, err := s.TestWorkflowsClient.Get(name)
Expand All @@ -360,6 +363,8 @@ func (s *TestkubeAPI) ExecuteTestWorkflowHandler() fiber.Handler {
return s.BadRequest(c, errPrefix, "invalid body", err)
}

log.DefaultLogger.Infow("TestWorkflow execution request", "name", name, "request", request)

var results []testkube.TestWorkflowExecution
var errs []error

Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Config struct {
TestkubeOAuthProvider string `envconfig:"TESTKUBE_OAUTH_PROVIDER" default:""`
TestkubeOAuthScopes string `envconfig:"TESTKUBE_OAUTH_SCOPES" default:""`
TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""`
TestkubeProRunnerId string `envconfig:"TESTKUBE_PRO_RUNNER_ID" default:"default-runner"`
TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""`
TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"`
TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"`
Expand Down
6 changes: 6 additions & 0 deletions internal/config/procontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@ type ProContext struct {
WorkflowNotificationsWorkerCount int
SkipVerify bool
EnvID string
EnvName string
EnvSlug string
OrgID string
OrgName string
OrgSlug string
Migrate string
ConnectionTimeout int
DashboardURI string
ClusterId string
RunnerId string
}
Loading
Loading