Skip to content

Commit 86a39f8

Browse files
committed
add shutdown
1 parent 9633942 commit 86a39f8

File tree

4 files changed

+119
-2
lines changed

4 files changed

+119
-2
lines changed

errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ package golangfuse
33
import "errors"
44

55
var AlreadyStartedErr = errors.New("already started")
6+
var AlreadyShutdownErr = errors.New("already shutdown")

event_buffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func (i *eventBuffer) Start(ctx context.Context, period time.Duration) {
2626
for {
2727
select {
2828
case <-ticker.C:
29-
i.flush(ctx)
29+
i.Flush(ctx)
3030
case <-ctx.Done():
3131
return
3232
}
@@ -39,7 +39,7 @@ func (i *eventBuffer) Add(event IngestionEvent) {
3939
i.bufferedEvents = append(i.bufferedEvents, event)
4040
}
4141

42-
func (i *eventBuffer) flush(ctx context.Context) {
42+
func (i *eventBuffer) Flush(ctx context.Context) {
4343
i.mu.Lock()
4444
items := i.bufferedEvents
4545
i.bufferedEvents = nil

langfuse.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414

1515
type Langfuse interface {
1616
StartSendingEvents(ctx context.Context, period time.Duration) error
17+
// Shutdown stops the event buffer and cleans up resources.
18+
// It also ensures that any pending events are sent before shutdown.
19+
Shutdown(ctx context.Context) error
1720
Trace(input, output any, options ...TraceOption)
1821
GetSystemPromptTemplate(ctx context.Context, promptName string) (string, error)
1922
}
@@ -22,6 +25,8 @@ type langfuseImpl struct {
2225
restClient *resty.Client
2326
eventBuffer *eventBuffer
2427
isSendingEventsStarted atomic.Bool
28+
isShutdown atomic.Bool
29+
cancelFunc context.CancelFunc
2530
endpoint string
2631
promptLabel string
2732
}
@@ -42,14 +47,39 @@ func NewWithHttpClient(httpClient *http.Client, endpoint, publicKey, secretKey s
4247
}
4348

4449
func (c *langfuseImpl) StartSendingEvents(ctx context.Context, period time.Duration) error {
50+
if c.isShutdown.Load() {
51+
return AlreadyShutdownErr
52+
}
4553
if c.isSendingEventsStarted.CompareAndSwap(false, true) {
54+
ctx, cancel := context.WithCancel(ctx)
55+
c.cancelFunc = cancel
4656
go c.eventBuffer.Start(ctx, period)
4757
return nil
4858
} else {
4959
return AlreadyStartedErr
5060
}
5161
}
5262

63+
func (c *langfuseImpl) Shutdown(ctx context.Context) error {
64+
// Check if already shutdown
65+
if c.isShutdown.CompareAndSwap(false, true) {
66+
// Check if events were started
67+
if !c.isSendingEventsStarted.Load() {
68+
return nil
69+
}
70+
71+
// Cancel the event buffer goroutine
72+
c.cancelFunc()
73+
74+
// Flush any remaining events before shutdown
75+
c.eventBuffer.Flush(ctx)
76+
77+
return nil
78+
} else {
79+
return AlreadyShutdownErr
80+
}
81+
}
82+
5383
func (c *langfuseImpl) GetSystemPromptTemplate(ctx context.Context, promptName string) (string, error) {
5484
promptObject := ChatPrompt{}
5585
resp, err := c.restClient.R().

langfuse_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,92 @@ func (s *ClientTest) getClientWithMockedHttpTransport(transport httpmock.RoundTr
209209
)
210210
}
211211

212+
func (s *ClientTest) TestShutdownShouldReturnErrorWhenAlreadyShutdown() {
213+
// Given
214+
c := s.getClient()
215+
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
216+
s.Require().NoError(err)
217+
err = c.Shutdown(s.ctx)
218+
s.Require().NoError(err)
219+
220+
// When
221+
err = c.Shutdown(s.ctx)
222+
223+
// Then
224+
s.Require().ErrorContains(err, "already shutdown")
225+
}
226+
227+
func (s *ClientTest) TestShutdownShouldSucceedAfterStart() {
228+
// Given
229+
c := s.getClient()
230+
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
231+
s.Require().NoError(err)
232+
233+
// When
234+
err = c.Shutdown(s.ctx)
235+
236+
// Then
237+
s.Require().NoError(err)
238+
}
239+
240+
func (s *ClientTest) TestShutdownShouldFlushPendingEvents() {
241+
// Given
242+
var sentRequestBody []byte
243+
wg := &sync.WaitGroup{}
244+
wg.Add(1)
245+
c := s.getClientWithMockedHttpTransport(func(req *http.Request) (*http.Response, error) {
246+
defer wg.Done()
247+
body, err := io.ReadAll(req.Body)
248+
if err != nil {
249+
return nil, err
250+
}
251+
sentRequestBody = body
252+
return &http.Response{
253+
StatusCode: http.StatusMultiStatus,
254+
Body: io.NopCloser(strings.NewReader("{}")),
255+
}, nil
256+
})
257+
err := c.StartSendingEvents(s.ctx, 1*time.Hour) // Long period to prevent automatic Flush
258+
s.Require().NoError(err)
259+
260+
// When
261+
c.Trace("input", "output")
262+
err = c.Shutdown(s.ctx)
263+
s.Require().NoError(err)
264+
wg.Wait()
265+
266+
// Then
267+
type requestBody struct {
268+
Batch []struct {
269+
Body struct {
270+
Input string `json:"input"`
271+
Output string `json:"output"`
272+
}
273+
} `json:"batch"`
274+
}
275+
bodyObj := requestBody{}
276+
err = json.Unmarshal(sentRequestBody, &bodyObj)
277+
s.Require().NoError(err)
278+
s.Require().Len(bodyObj.Batch, 1)
279+
s.Require().Equal("input", bodyObj.Batch[0].Body.Input)
280+
s.Require().Equal("output", bodyObj.Batch[0].Body.Output)
281+
}
282+
283+
func (s *ClientTest) TestStartSendingEventsShouldReturnErrorAfterShutdown() {
284+
// Given
285+
c := s.getClient()
286+
err := c.StartSendingEvents(s.ctx, 1*time.Microsecond)
287+
s.Require().NoError(err)
288+
err = c.Shutdown(s.ctx)
289+
s.Require().NoError(err)
290+
291+
// When
292+
err = c.StartSendingEvents(s.ctx, 1*time.Microsecond)
293+
294+
// Then
295+
s.Require().ErrorContains(err, "already shutdown")
296+
}
297+
212298
func TestLangfuseClient(t *testing.T) {
213299
suite.Run(t, new(ClientTest))
214300
}

0 commit comments

Comments
 (0)