@@ -11,6 +11,7 @@ import (
1111 "os"
1212 "sync"
1313
14+ "github.com/adevinta/vulnerability-db/pkg/asyncapi/kafka"
1415 "github.com/adevinta/vulnerability-db/pkg/maintenance"
1516 "github.com/adevinta/vulnerability-db/pkg/notify"
1617 "github.com/adevinta/vulnerability-db/pkg/processor"
@@ -43,14 +44,9 @@ func main() {
4344 }
4445
4546 // Build notifier.
46- snsConf := notify.SNSConfig {
47- TopicArn : conf .SNS .TopicARN ,
48- Enabled : conf .SNS .Enabled ,
49- Endpoint : conf .SNS .Endpoint ,
50- }
51- snsNotifier , err := notify .NewSNSNotifier (snsConf , logger )
47+ notifier , err := buildNotifier (conf , logger )
5248 if err != nil {
53- log .Fatalf ("Error creating notifier: %v" , err )
49+ log .Fatalf ("Error building notifier: %v" , err )
5450 }
5551
5652 // Build processor.
@@ -59,7 +55,7 @@ func main() {
5955 log .Fatalf ("Error creating results client: %v" , err )
6056 }
6157
62- processor , err := processor .NewCheckProcessor (snsNotifier , db , resultsClient , conf .Report .URLReplace , conf .MaxEventAge , logger )
58+ processor , err := processor .NewCheckProcessor (notifier , db , resultsClient , conf .Report .URLReplace , conf .MaxEventAge , logger )
6359 if err != nil {
6460 log .Fatalf ("Error creating queue processor: %v" , err )
6561 }
@@ -97,6 +93,48 @@ func main() {
9793 wg .Wait ()
9894}
9995
96+ // buildNotifier builds the appropiate notifier given the defined configuration.
97+ // TODO: Once the integrations dependent on the old notification format have been
98+ // deprecated or updated to comply with the new format through Kafka topic channel
99+ // we can get rid of SNS and multi implementations of notifier and just use Kafka.
100+ func buildNotifier (conf * config , logger * log.Logger ) (notify.Notifier , error ) {
101+ if ! conf .SNS .Enabled && ! conf .Kafka .Enabled {
102+ return notify .NewNoopNotifier (), nil
103+ }
104+ if conf .SNS .Enabled && ! conf .Kafka .Enabled {
105+ return buildSNSNotifier (conf , logger )
106+ }
107+ if ! conf .SNS .Enabled && conf .Kafka .Enabled {
108+ return buildKafkaNotifier (conf , logger )
109+ }
110+ // Multi Notifier
111+ k , err := buildKafkaNotifier (conf , logger )
112+ if err != nil {
113+ return nil , err
114+ }
115+ s , err := buildSNSNotifier (conf , logger )
116+ if err != nil {
117+ return nil , err
118+ }
119+ return notify .NewMultiNotifier (k , s ), nil
120+ }
121+
122+ func buildSNSNotifier (conf * config , logger * log.Logger ) (* notify.SNSNotifier , error ) {
123+ return notify .NewSNSNotifier (notify.SNSConfig {
124+ TopicArn : conf .SNS .TopicARN ,
125+ Endpoint : conf .SNS .Endpoint ,
126+ }, logger )
127+ }
128+
129+ func buildKafkaNotifier (conf * config , logger * log.Logger ) (* notify.KafkaNotifier , error ) {
130+ kafkaCli , err := kafka .NewClient (conf .Kafka .User , conf .Kafka .Pass ,
131+ conf .Kafka .BrokerURL , conf .Kafka .Topic )
132+ if err != nil {
133+ return nil , err
134+ }
135+ return notify .NewKafkaNotifier (& kafkaCli , logger ), nil
136+ }
137+
100138func setupLogger (cfg config ) * log.Logger {
101139 var logger = log .New ()
102140
0 commit comments