Skip to content

Commit

Permalink
Refactored httptonats example (#411)
Browse files Browse the repository at this point in the history
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
  • Loading branch information
slinkydeveloper authored Mar 20, 2020
1 parent b96966c commit 2e94687
Showing 1 changed file with 31 additions and 59 deletions.
90 changes: 31 additions & 59 deletions v2/cmd/samples/httptonats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package main

import (
"context"
"fmt"
"io"
"log"
"os"

"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/kelseyhightower/envconfig"

cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
cloudeventsnats "github.com/cloudevents/sdk-go/v2/protocol/nats"
"github.com/kelseyhightower/envconfig"
)

type envConfig struct {
Expand All @@ -30,70 +29,43 @@ func main() {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}
os.Exit(_main(os.Args[1:], env))
}

type Receiver struct {
Client client.Client
}

type Example struct {
Sequence int `json:"id"`
Message string `json:"message"`
}

func (r *Receiver) Receive(event event.Event) error {
fmt.Printf("Got Event Context: %+v\n", event.Context)

data := &Example{}
if err := event.DataAs(data); err != nil {
fmt.Printf("Got Data Error: %s\n", err.Error())
}
fmt.Printf("Got Data: %+v\n", data)

fmt.Printf("forwarding...")

if err := r.Client.Send(context.Background(), event); err != nil {
fmt.Printf("forwarding failed: %s", err.Error())
}

fmt.Printf("----------------------------\n")
return nil
}

func _main(args []string, env envConfig) int {
ctx := context.Background()

np, err := cloudeventsnats.New(env.NATSServer, env.Subject)
natsProtocol, err := cloudeventsnats.New(env.NATSServer, env.Subject)
if err != nil {
log.Fatalf("failed to create nats protcol, %s", err.Error())
}
nc, err := client.New(np)
if err != nil {
log.Printf("failed to create client, %v", err)
return 1
}

r := &Receiver{Client: nc}

p, err := cloudeventshttp.New(cloudeventshttp.WithPort(env.Port))
httpProtocol, err := cloudeventshttp.New(cloudeventshttp.WithPort(env.Port))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
log.Fatalf("failed to create http protocol: %s", err.Error())
}

c, err := client.New(p)
if err != nil {
log.Printf("failed to create client, %v", err)
return 1
}
err = c.StartReceiver(ctx, r.Receive)

if err != nil {
log.Printf("failed to StartHTTPReceiver, %v", err)
}
// Pipe all messages incoming to the httpProtocol to the natsProtocol
go func() {
for {
// Blocking call to wait for new messages from httpProtocol
message, err := httpProtocol.Receive(ctx)
if err != nil {
if err == io.EOF {
return // Context closed and/or receiver closed
}
log.Printf("Error while receiving a message: %s", err.Error())
}
// Send message directly to natsProtocol
err = natsProtocol.Send(ctx, message)
if err != nil {
log.Printf("Error while forwarding the message: %s", err.Error())
}
}
}()

// Start the HTTP Server invoking OpenInbound()
go func() {
if err := httpProtocol.OpenInbound(ctx); err != nil {
log.Printf("failed to StartHTTPReceiver, %v", err)
}
}()

log.Printf("listening on port %d\n", env.Port)
<-ctx.Done()

return 0
}

0 comments on commit 2e94687

Please sign in to comment.