Skip to content

Commit 9633942

Browse files
committed
expose jinja converter
1 parent cf712e9 commit 9633942

File tree

5 files changed

+109
-41
lines changed

5 files changed

+109
-41
lines changed

event_buffer.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package golangfuse
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/sirupsen/logrus"
9+
)
10+
11+
type EventsFlushFunc func(ctx context.Context, events []IngestionEvent) error
12+
13+
type eventBuffer struct {
14+
bufferedEvents []IngestionEvent
15+
mu sync.Mutex
16+
flushHandler EventsFlushFunc
17+
}
18+
19+
func newEventBufferer(flushHandler EventsFlushFunc) *eventBuffer {
20+
return &eventBuffer{flushHandler: flushHandler}
21+
}
22+
23+
func (i *eventBuffer) Start(ctx context.Context, period time.Duration) {
24+
ticker := time.NewTicker(period)
25+
defer ticker.Stop()
26+
for {
27+
select {
28+
case <-ticker.C:
29+
i.flush(ctx)
30+
case <-ctx.Done():
31+
return
32+
}
33+
}
34+
}
35+
36+
func (i *eventBuffer) Add(event IngestionEvent) {
37+
i.mu.Lock()
38+
defer i.mu.Unlock()
39+
i.bufferedEvents = append(i.bufferedEvents, event)
40+
}
41+
42+
func (i *eventBuffer) flush(ctx context.Context) {
43+
i.mu.Lock()
44+
items := i.bufferedEvents
45+
i.bufferedEvents = nil
46+
i.mu.Unlock()
47+
if len(items) > 0 {
48+
err := i.flushHandler(ctx, items)
49+
if err != nil {
50+
logrus.WithError(err).Error("golangfuse: error in event flush handler")
51+
} else {
52+
logrus.Tracef("golangfuse: flushed %d events", len(items))
53+
}
54+
}
55+
}

langfuse.go

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,23 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7-
"regexp"
87
"sync/atomic"
98
"time"
109

1110
"github.com/divar-ir/golangfuse/internal/constants"
12-
"github.com/divar-ir/golangfuse/internal/observer"
1311
"github.com/google/uuid"
1412
"resty.dev/v3"
1513
)
1614

1715
type Langfuse interface {
1816
StartSendingEvents(ctx context.Context, period time.Duration) error
1917
Trace(input, output any, options ...TraceOption)
20-
GetPromptTemplate(ctx context.Context, promptName string) (string, error)
18+
GetSystemPromptTemplate(ctx context.Context, promptName string) (string, error)
2119
}
2220

2321
type langfuseImpl struct {
2422
restClient *resty.Client
25-
eventObserver observer.Observer[IngestionEvent]
26-
eventQueue observer.Queue[IngestionEvent]
23+
eventBuffer *eventBuffer
2724
isSendingEventsStarted atomic.Bool
2825
endpoint string
2926
promptLabel string
@@ -37,24 +34,23 @@ func NewWithHttpClient(httpClient *http.Client, endpoint, publicKey, secretKey s
3734
client := resty.NewWithClient(httpClient).SetBasicAuth(publicKey, secretKey)
3835
c := &langfuseImpl{
3936
restClient: client,
40-
eventQueue: observer.NewQueue[IngestionEvent](),
4137
endpoint: endpoint,
4238
promptLabel: "production", // TODO: use option pattern to override this default if needed
4339
}
44-
c.eventObserver = observer.NewObserver[IngestionEvent](c.eventQueue, c.sendEvents)
40+
c.eventBuffer = newEventBufferer(c.sendEvents)
4541
return c
4642
}
4743

4844
func (c *langfuseImpl) StartSendingEvents(ctx context.Context, period time.Duration) error {
4945
if c.isSendingEventsStarted.CompareAndSwap(false, true) {
50-
go c.eventObserver.StartObserve(ctx, period)
46+
go c.eventBuffer.Start(ctx, period)
5147
return nil
5248
} else {
5349
return AlreadyStartedErr
5450
}
5551
}
5652

57-
func (c *langfuseImpl) GetPromptTemplate(ctx context.Context, promptName string) (string, error) {
53+
func (c *langfuseImpl) GetSystemPromptTemplate(ctx context.Context, promptName string) (string, error) {
5854
promptObject := ChatPrompt{}
5955
resp, err := c.restClient.R().
6056
SetContext(ctx).
@@ -75,7 +71,7 @@ func (c *langfuseImpl) GetPromptTemplate(ctx context.Context, promptName string)
7571
if promptObject.Prompt[0].Role != "system" {
7672
return "", fmt.Errorf("prompt role is not system")
7773
}
78-
return convertJinjaVariablesToGoTemplate(promptObject.Prompt[0].Content), nil
74+
return promptObject.Prompt[0].Content, nil
7975
}
8076

8177
func (c *langfuseImpl) Trace(input, output any, options ...TraceOption) {
@@ -86,7 +82,7 @@ func (c *langfuseImpl) Trace(input, output any, options ...TraceOption) {
8682
for _, opt := range options {
8783
opt(trace)
8884
}
89-
c.eventQueue.Enqueue(IngestionEvent{
85+
c.eventBuffer.Add(IngestionEvent{
9086
ID: uuid.NewString(),
9187
Timestamp: time.Now(),
9288
Type: constants.IngestionEventTypeTraceCreate,
@@ -108,8 +104,3 @@ func (c *langfuseImpl) sendEvents(ctx context.Context, events []IngestionEvent)
108104
}
109105
return nil
110106
}
111-
112-
func convertJinjaVariablesToGoTemplate(prompt string) string {
113-
re := regexp.MustCompile(`\{\{\s*([a-zA-Z0-9_]+)\s*\}\}`)
114-
return re.ReplaceAllString(prompt, "{{.$1}}")
115-
}

langfuse_test.go

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -38,34 +38,11 @@ func (s *ClientTest) TestGetPromptTemplateShouldParseApiResponse() {
3838
c := s.getLangfuseClientForTest(promptName, "This is system prompt")
3939

4040
// When
41-
promptTemplate, err := c.GetPromptTemplate(s.ctx, promptName)
41+
promptTemplate, err := c.GetSystemPromptTemplate(s.ctx, promptName)
4242
s.Require().NoError(err)
4343
s.Require().Equal("This is system prompt", promptTemplate)
4444
}
4545

46-
func (s *ClientTest) TestGetPromptTemplateShouldReturnVariablesWithGolangFormat() {
47-
// Given
48-
const promptName = "test-prompt"
49-
jinjaVarialbeVariants := []string{
50-
"{{myVar}}",
51-
"{{myVar }}",
52-
"{{ myVar}}",
53-
"{{ myVar }}",
54-
}
55-
56-
for _, v := range jinjaVarialbeVariants {
57-
c := s.getLangfuseClientForTest(promptName,
58-
fmt.Sprintf("This is system prompt %s variable.", v))
59-
60-
// When
61-
promptTemplate, err := c.GetPromptTemplate(s.ctx, promptName)
62-
63-
// Then
64-
s.Require().NoError(err)
65-
s.Require().Equal("This is system prompt {{.myVar}} variable.", promptTemplate)
66-
}
67-
}
68-
6946
func (s *ClientTest) getLangfuseClientForTest(promptName, promptContent string) golangfuse.Langfuse {
7047
apiResponse := fmt.Sprintf(`{
7148
"id" : "id",
@@ -104,7 +81,7 @@ func (s *ClientTest) TestShouldSetBasicAuth() {
10481
})
10582

10683
// When
107-
_, err := c.GetPromptTemplate(s.ctx, "test-prompt")
84+
_, err := c.GetSystemPromptTemplate(s.ctx, "test-prompt")
10885
s.Require().ErrorContains(err, "http mock called")
10986

11087
// Then

utils.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package golangfuse
2+
3+
import "regexp"
4+
5+
func JinjaToGoTemplate(prompt string) string {
6+
re := regexp.MustCompile(`\{\{\s*([a-zA-Z0-9_]+)\s*\}\}`)
7+
return re.ReplaceAllString(prompt, "{{.$1}}")
8+
}

utils_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package golangfuse_test
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/divar-ir/golangfuse"
8+
"github.com/stretchr/testify/suite"
9+
)
10+
11+
type UtilsTest struct {
12+
suite.Suite
13+
}
14+
15+
func (s *UtilsTest) TestJinjaToGoTemplateShouldReturnVariablesWithGolangFormat() {
16+
// Given
17+
jinjaVarialbeVariants := []string{
18+
"{{myVar}}",
19+
"{{myVar }}",
20+
"{{ myVar}}",
21+
"{{ myVar }}",
22+
}
23+
24+
for _, v := range jinjaVarialbeVariants {
25+
source := fmt.Sprintf("This is system prompt %s variable.", v)
26+
27+
// When
28+
target := golangfuse.JinjaToGoTemplate(source)
29+
30+
// Then
31+
s.Require().Equal("This is system prompt {{.myVar}} variable.", target)
32+
}
33+
}
34+
35+
func TestUtils(t *testing.T) {
36+
suite.Run(t, new(UtilsTest))
37+
}

0 commit comments

Comments
 (0)