diff --git a/v2/cmd/samples/httptonats/main.go b/v2/cmd/samples/httptonats/main.go index a9525082f..e11e29ab6 100644 --- a/v2/cmd/samples/httptonats/main.go +++ b/v2/cmd/samples/httptonats/main.go @@ -2,15 +2,14 @@ package main import ( "context" - "fmt" + "io" "log" "os" - "github.com/cloudevents/sdk-go/v2/client" - "github.com/cloudevents/sdk-go/v2/event" + "github.com/kelseyhightower/envconfig" + cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" cloudeventsnats "github.com/cloudevents/sdk-go/v2/protocol/nats" - "github.com/kelseyhightower/envconfig" ) type envConfig struct { @@ -30,70 +29,43 @@ func main() { log.Printf("[ERROR] Failed to process env var: %s", err) os.Exit(1) } - os.Exit(_main(os.Args[1:], env)) -} - -type Receiver struct { - Client client.Client -} - -type Example struct { - Sequence int `json:"id"` - Message string `json:"message"` -} - -func (r *Receiver) Receive(event event.Event) error { - fmt.Printf("Got Event Context: %+v\n", event.Context) - - data := &Example{} - if err := event.DataAs(data); err != nil { - fmt.Printf("Got Data Error: %s\n", err.Error()) - } - fmt.Printf("Got Data: %+v\n", data) - - fmt.Printf("forwarding...") - - if err := r.Client.Send(context.Background(), event); err != nil { - fmt.Printf("forwarding failed: %s", err.Error()) - } - - fmt.Printf("----------------------------\n") - return nil -} - -func _main(args []string, env envConfig) int { ctx := context.Background() - np, err := cloudeventsnats.New(env.NATSServer, env.Subject) + natsProtocol, err := cloudeventsnats.New(env.NATSServer, env.Subject) if err != nil { log.Fatalf("failed to create nats protcol, %s", err.Error()) } - nc, err := client.New(np) - if err != nil { - log.Printf("failed to create client, %v", err) - return 1 - } - - r := &Receiver{Client: nc} - p, err := cloudeventshttp.New(cloudeventshttp.WithPort(env.Port)) + httpProtocol, err := cloudeventshttp.New(cloudeventshttp.WithPort(env.Port)) if err != nil { - log.Fatalf("failed to create protocol: %s", err.Error()) + log.Fatalf("failed to create http protocol: %s", err.Error()) } - c, err := client.New(p) - if err != nil { - log.Printf("failed to create client, %v", err) - return 1 - } - err = c.StartReceiver(ctx, r.Receive) - - if err != nil { - log.Printf("failed to StartHTTPReceiver, %v", err) - } + // Pipe all messages incoming to the httpProtocol to the natsProtocol + go func() { + for { + // Blocking call to wait for new messages from httpProtocol + message, err := httpProtocol.Receive(ctx) + if err != nil { + if err == io.EOF { + return // Context closed and/or receiver closed + } + log.Printf("Error while receiving a message: %s", err.Error()) + } + // Send message directly to natsProtocol + err = natsProtocol.Send(ctx, message) + if err != nil { + log.Printf("Error while forwarding the message: %s", err.Error()) + } + } + }() + + // Start the HTTP Server invoking OpenInbound() + go func() { + if err := httpProtocol.OpenInbound(ctx); err != nil { + log.Printf("failed to StartHTTPReceiver, %v", err) + } + }() - log.Printf("listening on port %d\n", env.Port) <-ctx.Done() - - return 0 }