Skip to content

Commit

Permalink
Feat/endoflife collector (#2215)
Browse files Browse the repository at this point in the history
* Add EOL Certifier

Signed-off-by: robert-cronin <robert.owen.cronin@gmail.com>

* Add EOL Scanner

Signed-off-by: robert-cronin <robert.owen.cronin@gmail.com>

---------

Signed-off-by: robert-cronin <robert.owen.cronin@gmail.com>
  • Loading branch information
robert-cronin authored Nov 4, 2024
1 parent a5fe089 commit 9bbadbc
Show file tree
Hide file tree
Showing 38 changed files with 2,425 additions and 39 deletions.
16 changes: 14 additions & 2 deletions cmd/guacingest/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type options struct {
headerFile string
queryVulnOnIngestion bool
queryLicenseOnIngestion bool
queryEOLOnIngestion bool
}

func ingest(cmd *cobra.Command, args []string) {
Expand All @@ -60,6 +61,7 @@ func ingest(cmd *cobra.Command, args []string) {
viper.GetBool("csub-tls-skip-verify"),
viper.GetBool("add-vuln-on-ingest"),
viper.GetBool("add-license-on-ingest"),
viper.GetBool("add-eol-on-ingest"),
args)
if err != nil {
fmt.Printf("unable to validate flags: %v\n", err)
Expand Down Expand Up @@ -99,7 +101,16 @@ func ingest(cmd *cobra.Command, args []string) {
defer csubClient.Close()

emit := func(d *processor.Document) error {
if _, err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, transport, csubClient, opts.queryVulnOnIngestion, opts.queryLicenseOnIngestion); err != nil {
if _, err := ingestor.Ingest(
ctx,
d,
opts.graphqlEndpoint,
transport,
csubClient,
opts.queryVulnOnIngestion,
opts.queryLicenseOnIngestion,
opts.queryEOLOnIngestion,
); err != nil {
var urlErr *url.Error
if errors.As(err, &urlErr) {
return fmt.Errorf("unable to ingest document due to connection error with graphQL %q : %w", d.SourceInformation.Source, urlErr)
Expand Down Expand Up @@ -130,7 +141,7 @@ func ingest(cmd *cobra.Command, args []string) {
}

func validateFlags(pubsubAddr, blobAddr, csubAddr, graphqlEndpoint, headerFile string, csubTls, csubTlsSkipVerify bool,
queryVulnIngestion bool, queryLicenseIngestion bool, args []string) (options, error) {
queryVulnIngestion bool, queryLicenseIngestion bool, queryEOLIngestion bool, args []string) (options, error) {
var opts options
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
Expand All @@ -143,6 +154,7 @@ func validateFlags(pubsubAddr, blobAddr, csubAddr, graphqlEndpoint, headerFile s
opts.headerFile = headerFile
opts.queryVulnOnIngestion = queryVulnIngestion
opts.queryLicenseOnIngestion = queryLicenseIngestion
opts.queryEOLOnIngestion = queryEOLIngestion

return opts, nil
}
2 changes: 1 addition & 1 deletion cmd/guacingest/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func init() {
cobra.OnInitialize(cli.InitConfig)

set, err := cli.BuildFlags([]string{"pubsub-addr", "blob-addr", "csub-addr", "gql-addr",
"header-file", "add-vuln-on-ingest", "add-license-on-ingest"})
"header-file", "add-vuln-on-ingest", "add-license-on-ingest", "add-eol-on-ingest"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
Expand Down
13 changes: 12 additions & 1 deletion cmd/guacone/cmd/deps_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type depsDevOptions struct {
headerFile string
queryVulnOnIngestion bool
queryLicenseOnIngestion bool
queryEOLOnIngestion bool
// sets artificial latency on the deps.dev collector (default to nil)
addedLatency *time.Duration
}
Expand Down Expand Up @@ -87,7 +88,16 @@ var depsDevCmd = &cobra.Command{
emit := func(d *processor.Document) error {
totalNum += 1

if _, err := ingestor.Ingest(ctx, d, opts.graphqlEndpoint, transport, csc, opts.queryVulnOnIngestion, opts.queryLicenseOnIngestion); err != nil {
if _, err := ingestor.Ingest(
ctx,
d,
opts.graphqlEndpoint,
transport,
csc,
opts.queryVulnOnIngestion,
opts.queryLicenseOnIngestion,
opts.queryEOLOnIngestion,
); err != nil {
gotErr = true
return fmt.Errorf("unable to ingest document: %w", err)
}
Expand Down Expand Up @@ -145,6 +155,7 @@ func validateDepsDevFlags(args []string) (*depsDevOptions, client.Client, error)
headerFile: viper.GetString("header-file"),
queryVulnOnIngestion: viper.GetBool("add-vuln-on-ingest"),
queryLicenseOnIngestion: viper.GetBool("add-license-on-ingest"),
queryEOLOnIngestion: viper.GetBool("add-eol-on-ingest"),
}

addedLatencyStr := viper.GetString("deps-dev-latency")
Expand Down
281 changes: 281 additions & 0 deletions cmd/guacone/cmd/eol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
//
// Copyright 2024 The GUAC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/Khan/genqlient/graphql"
"github.com/guacsec/guac/pkg/assembler/clients/generated"
"github.com/guacsec/guac/pkg/certifier/certify"
"github.com/guacsec/guac/pkg/certifier/components/root_package"
"github.com/guacsec/guac/pkg/certifier/eol"
"github.com/guacsec/guac/pkg/cli"
csub_client "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/handler/processor"
"github.com/guacsec/guac/pkg/ingestor"
"github.com/guacsec/guac/pkg/logging"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

const (
// eolQuerySize is handed to root_package.NewPackageQuery, right after the
// user-configurable batch size, when the EOL package query is built.
// NOTE(review): presumably the number of packages requested per query
// page — confirm against root_package.NewPackageQuery.
eolQuerySize = 1000
)

// eolOptions holds the validated settings for the eol certifier command, as
// produced by validateEOLFlags.
type eolOptions struct {
// graphqlEndpoint is the address used both for the GraphQL client that
// backs the package query and for ingesting the resulting documents.
graphqlEndpoint string
// headerFile is passed to cli.HTTPHeaderTransport when building the HTTP
// transport.
headerFile string
// poll is forwarded to certify.Certify together with interval.
poll bool
// csubClientOptions is used to construct the collectsub client.
csubClientOptions csub_client.CsubClientOptions
// interval is forwarded to certify.Certify; it is set to 14 days when the
// "interval" flag is empty.
interval time.Duration
// addedLatency is forwarded to the package query; it stays nil when the
// "certifier-latency" flag is empty.
addedLatency *time.Duration
// batchSize is forwarded to the package query.
batchSize int
// lastScan is forwarded to the package query; it stays nil when the
// "last-scan" flag is 0.
lastScan *int
}

// eolCmd is the "eol" sub-command of the certifier command. It registers the
// End of Life certifier, runs it over the packages returned by the root
// package query, and ingests the documents the certifier emits.
//
// Two goroutines cooperate:
//   - the certifier goroutine runs certify.Certify and hands every produced
//     document to emit;
//   - the ingestion goroutine receives those documents, buffers them, and
//     flushes the buffer through ingestor.MergedIngest either when it holds
//     1000 documents or when the 30 second ticker fires.
//
// The command finishes when the certifier returns, when SIGINT/SIGTERM is
// received, or when the ingestion goroutine stops early because of an ingest
// error.
var eolCmd = &cobra.Command{
	Use:   "eol [flags]",
	Short: "runs the End of Life (EOL) certifier",
	Run: func(cmd *cobra.Command, args []string) {
		opts, err := validateEOLFlags(
			viper.GetString("gql-addr"),
			viper.GetString("header-file"),
			viper.GetString("interval"),
			viper.GetString("csub-addr"),
			viper.GetBool("poll"),
			viper.GetBool("csub-tls"),
			viper.GetBool("csub-tls-skip-verify"),
			viper.GetString("certifier-latency"),
			viper.GetInt("certifier-batch-size"),
			viper.GetInt("last-scan"),
		)
		if err != nil {
			fmt.Printf("unable to validate flags: %v\n", err)
			_ = cmd.Help()
			os.Exit(1)
		}

		baseCtx := logging.WithLogger(context.Background())
		logger := logging.FromContext(baseCtx)
		transport := cli.HTTPHeaderTransport(baseCtx, opts.headerFile, http.DefaultTransport)

		if err := certify.RegisterCertifier(eol.NewEOLCertifier, eol.EOLCollector); err != nil {
			logger.Fatalf("unable to register certifier: %v", err)
		}

		// initialize collectsub client
		csubClient, err := csub_client.NewClient(opts.csubClientOptions)
		if err != nil {
			logger.Infof("collectsub client initialization failed, this ingestion will not pull in any additional data through the collectsub service: %v", err)
			csubClient = nil
		} else {
			defer csubClient.Close()
		}

		httpClient := http.Client{Transport: transport}
		gqlclient := graphql.NewClient(opts.graphqlEndpoint, &httpClient)
		packageQuery := root_package.NewPackageQuery(gqlclient, generated.QueryTypeEol, opts.batchSize, eolQuerySize, opts.addedLatency, opts.lastScan)

		// The cancelable context is created before any goroutine starts so that
		// every goroutine captures a variable that is never reassigned.
		ctx, cf := context.WithCancel(baseCtx)
		defer cf()

		totalNum := 0
		docChan := make(chan *processor.Document)
		ingestionStop := make(chan bool, 1)
		// ingestionDone is closed when the ingestion goroutine returns, so that
		// producers never block on docChan once nobody is receiving from it.
		ingestionDone := make(chan struct{})
		tickInterval := 30 * time.Second
		ticker := time.NewTicker(tickInterval)
		defer ticker.Stop()

		var gotErr int32
		var wg sync.WaitGroup
		ingestion := func() {
			defer wg.Done()
			defer close(ingestionDone)

			const threshold = 1000
			var totalDocs []*processor.Document

			// flush ingests the buffered documents, if any, and empties the
			// buffer. It records and logs a failure and reports whether the
			// ingestion succeeded.
			flush := func() bool {
				if len(totalDocs) == 0 {
					return true
				}
				ingestErr := ingestor.MergedIngest(ctx, totalDocs, opts.graphqlEndpoint, transport, csubClient, false, false, false)
				totalDocs = nil
				if ingestErr != nil {
					atomic.StoreInt32(&gotErr, 1)
					logger.Errorf("unable to ingest documents: %v", ingestErr)
					return false
				}
				return true
			}

			for stop := false; !stop; {
				select {
				case <-ticker.C:
					// Periodic flush so documents do not sit in the buffer
					// indefinitely when they arrive slowly.
					stop = !flush()
					ticker.Reset(tickInterval)
				case d := <-docChan:
					totalNum++
					totalDocs = append(totalDocs, d)
					if len(totalDocs) >= threshold {
						stop = !flush()
						ticker.Reset(tickInterval)
					}
				case <-ingestionStop:
					stop = true
				case <-ctx.Done():
					return
				}
			}

			// Pick up anything still queued. docChan is currently unbuffered,
			// so len is always 0 and this loop only matters if a buffer is
			// ever added.
			for len(docChan) > 0 {
				totalNum++
				totalDocs = append(totalDocs, <-docChan)
				if len(totalDocs) >= threshold {
					flush()
				}
			}
			flush()
		}
		wg.Add(1)
		go ingestion()

		// Set emit function to go through the entire pipeline. The send is
		// guarded so the certifier cannot block forever after the ingestion
		// goroutine has exited or the context has been canceled.
		emit := func(d *processor.Document) error {
			select {
			case docChan <- d:
				return nil
			case <-ingestionDone:
				return fmt.Errorf("ingestion has stopped, document was not queued")
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		// Collect
		errHandler := func(err error) bool {
			if err != nil {
				logger.Errorf("certifier ended with error: %v", err)
				atomic.StoreInt32(&gotErr, 1)
			}
			// process documents already captured
			return true
		}

		done := make(chan bool, 1)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := certify.Certify(ctx, packageQuery, emit, errHandler, opts.poll, opts.interval); err != nil {
				logger.Errorf("Unhandled error in the certifier: %s", err)
			}
			done <- true
		}()
		sigs := make(chan os.Signal, 1)
		signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
		select {
		case s := <-sigs:
			logger.Infof("Signal received: %s, shutting down gracefully\n", s.String())
			cf()
		case <-done:
			logger.Infof("All certifiers completed")
		case <-ingestionDone:
			// Reached only when ingestion stopped on its own, i.e. after an
			// ingest error; stop the certifier instead of letting it run with
			// nowhere to send documents.
			logger.Errorf("ingestion stopped early, shutting down certifier")
			cf()
		}
		// ingestionStop is buffered, so this never blocks even if the
		// ingestion goroutine has already returned.
		ingestionStop <- true
		wg.Wait()

		if atomic.LoadInt32(&gotErr) == 1 {
			logger.Errorf("completed ingestion with errors")
		} else {
			logger.Infof("completed ingesting %v documents", totalNum)
		}
	},
}

// validateEOLFlags turns the raw flag values of the eol command into an
// eolOptions value.
//
// interval and certifierLatencyStr are parsed with time.ParseDuration. An
// empty interval selects the 14 day default; an empty certifierLatencyStr
// leaves addedLatency nil. A lastScan of 0 leaves opts.lastScan nil. The
// collectsub settings are checked by csub_client.ValidateCsubClientFlags.
//
// An error is returned when either duration fails to parse or when the
// collectsub flags are rejected; the returned options must not be used in
// that case.
func validateEOLFlags(
	graphqlEndpoint,
	headerFile,
	interval,
	csubAddr string,
	poll,
	csubTls,
	csubTlsSkipVerify bool,
	certifierLatencyStr string,
	batchSize int, lastScan int,
) (eolOptions, error) {
	var opts eolOptions
	opts.graphqlEndpoint = graphqlEndpoint
	opts.headerFile = headerFile
	opts.poll = poll

	if interval == "" {
		// 14 days by default
		opts.interval = 14 * 24 * time.Hour
	} else {
		i, err := time.ParseDuration(interval)
		if err != nil {
			// Wrap the error so the user can tell which flag was malformed,
			// matching the handling of the latency flag below.
			return opts, fmt.Errorf("failed to parse interval with error: %w", err)
		}
		opts.interval = i
	}

	// addedLatency keeps its nil zero value when no latency was requested.
	if certifierLatencyStr != "" {
		addedLatency, err := time.ParseDuration(certifierLatencyStr)
		if err != nil {
			return opts, fmt.Errorf("failed to parse duration with error: %w", err)
		}
		opts.addedLatency = &addedLatency
	}

	opts.batchSize = batchSize

	// A zero lastScan is treated as "not set" and leaves the pointer nil.
	if lastScan != 0 {
		opts.lastScan = &lastScan
	}

	csubOpts, err := csub_client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
	if err != nil {
		return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
	}
	opts.csubClientOptions = csubOpts

	return opts, nil
}

// init builds the flags specific to the eol command, attaches them to the
// command's persistent flag set, binds that set to viper, and registers the
// command under certifierCmd. Any failure is reported on stderr and ends the
// process with exit status 1.
func init() {
	flagNames := []string{
		"certifier-latency",
		"certifier-batch-size",
		"last-scan",
	}

	eolFlags, buildErr := cli.BuildFlags(flagNames)
	if buildErr != nil {
		fmt.Fprintf(os.Stderr, "failed to setup flag: %v", buildErr)
		os.Exit(1)
	}

	persistent := eolCmd.PersistentFlags()
	persistent.AddFlagSet(eolFlags)

	bindErr := viper.BindPFlags(persistent)
	if bindErr != nil {
		fmt.Fprintf(os.Stderr, "failed to bind flags: %v", bindErr)
		os.Exit(1)
	}

	certifierCmd.AddCommand(eolCmd)
}
Loading

0 comments on commit 9bbadbc

Please sign in to comment.