-
Notifications
You must be signed in to change notification settings - Fork 21
/
main.go
118 lines (104 loc) · 2.56 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"log/slog"
"os"
"os/signal"
"time"
"github.com/kalbhor/tasqueue/v2"
rb "github.com/kalbhor/tasqueue/v2/brokers/in-memory"
"github.com/kalbhor/tasqueue/v2/examples/tasks"
rr "github.com/kalbhor/tasqueue/v2/results/in-memory"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)
// newExporter builds a stdout-style span exporter that writes to w.
//
// The exporter is configured for demo use: spans are pretty-printed so they
// are easy to read, and timestamps are omitted from the output.
func newExporter(w io.Writer) (trace.SpanExporter, error) {
	opts := []stdouttrace.Option{
		// Send the encoded spans to the caller-supplied writer.
		stdouttrace.WithWriter(w),
		// Human-readable, indented output.
		stdouttrace.WithPrettyPrint(),
		// Leave timestamps out of the demo output.
		stdouttrace.WithoutTimestamps(),
	}
	return stdouttrace.New(opts...)
}
// newResource returns a resource describing this application.
//
// It merges the SDK's default resource with the demo's own attributes
// (service name "tasqueue", service version "v0.1.0", environment "demo").
// If the merge reports an error (e.g. the two resources carry conflicting
// schema URLs), the error is logged instead of being silently dropped, and
// whatever resource Merge produced is still returned so the demo keeps running.
func newResource() *resource.Resource {
	r, err := resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceNameKey.String("tasqueue"),
			semconv.ServiceVersionKey.String("v0.1.0"),
			attribute.String("environment", "demo"),
		),
	)
	if err != nil {
		// Surface the problem; a bad merge otherwise shows up only as
		// missing attributes in the exported traces.
		log.Printf("merging resources: %v", err)
	}
	return r
}
// main runs the tracing demo and exits non-zero if setup fails.
//
// All work happens in run so that its deferred cleanup (flushing the tracer
// provider, closing the trace file) executes before the process exits;
// calling Fatal directly after registering defers would skip them.
func main() {
	l := log.New(os.Stdout, "", 0)
	if err := run(l); err != nil {
		l.Fatal(err)
	}
}

// run wires up tracing, starts an in-memory tasqueue server, enqueues a group
// of three "add" jobs and prints the group's status once per second until the
// process receives an interrupt signal.
//
// Non-fatal problems (a failing tracer-provider shutdown) are reported on l;
// setup failures are returned to the caller.
func run(l *log.Logger) error {
	// Number of "add" jobs placed in the demo group.
	const numJobs = 3

	// Write telemetry data to a file.
	f, err := os.Create("traces.txt")
	if err != nil {
		return fmt.Errorf("creating trace file: %w", err)
	}
	defer f.Close()

	exp, err := newExporter(f)
	if err != nil {
		return fmt.Errorf("creating exporter: %w", err)
	}

	tp := trace.NewTracerProvider(
		trace.WithBatcher(exp),
		trace.WithResource(newResource()),
	)
	// Registered after f.Close, so it runs first and flushes batched spans
	// while the file is still open.
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			l.Printf("shutting down tracer provider: %v", err)
		}
	}()
	otel.SetTracerProvider(tp)

	// os.Kill cannot be trapped, so only the interrupt signal is registered.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	srv, err := tasqueue.NewServer(tasqueue.ServerOpts{
		Broker:        rb.New(),
		Results:       rr.New(),
		Logger:        slog.Default().Handler(),
		TraceProvider: tp,
	})
	if err != nil {
		return fmt.Errorf("creating server: %w", err)
	}

	if err := srv.RegisterTask("add", tasks.SumProcessor, tasqueue.TaskOpts{}); err != nil {
		return fmt.Errorf("registering task: %w", err)
	}

	// Build the jobs that make up the group.
	jobs := make([]tasqueue.Job, 0, numJobs)
	for i := 0; i < numJobs; i++ {
		payload, err := json.Marshal(tasks.SumPayload{Arg1: i, Arg2: 4})
		if err != nil {
			return fmt.Errorf("encoding payload %d: %w", i, err)
		}
		job, err := tasqueue.NewJob("add", payload, tasqueue.JobOpts{})
		if err != nil {
			return fmt.Errorf("creating job %d: %w", i, err)
		}
		jobs = append(jobs, job)
	}

	group, err := tasqueue.NewGroup(jobs, tasqueue.GroupOpts{})
	if err != nil {
		return fmt.Errorf("creating group: %w", err)
	}
	groupID, err := srv.EnqueueGroup(ctx, group)
	if err != nil {
		return fmt.Errorf("enqueueing group: %w", err)
	}

	// Poll the group's status once per second until ctx is cancelled.
	// A single ticker is reused instead of creating one per iteration.
	done := make(chan struct{})
	go func() {
		defer close(done)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				fmt.Println(srv.GetGroup(ctx, groupID))
			}
		}
	}()

	srv.Start(ctx)

	// Make sure the poller is told to stop and has finished before the
	// deferred tracer shutdown runs. stop is safe to call more than once.
	stop()
	<-done

	fmt.Println("exit..")
	return nil
}