-
Notifications
You must be signed in to change notification settings - Fork 20
/
handlers.go
203 lines (164 loc) · 5.48 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
package http
import (
"context"
"sync"
"github.com/EinStack/glide/pkg/api/schemas"
"github.com/EinStack/glide/pkg/routers"
"github.com/EinStack/glide/pkg/telemetry"
"github.com/gofiber/contrib/websocket"
"github.com/gofiber/fiber/v2"
"go.uber.org/zap"
)
type Handler = func(c *fiber.Ctx) error
// Swagger 101:
// - https://github.com/swaggo/swag/tree/master/example/celler
// LangChatHandler
//
// @id glide-language-chat
// @Summary Language Chat
// @Description Talk to different LLM Chat APIs via unified endpoint
// @tags Language
// @Param router path string true "Router ID"
// @Param payload body schemas.ChatRequest true "Request Data"
// @Accept json
// @Produce json
// @Success 200 {object} schemas.ChatResponse
// @Failure 400 {object} schemas.Error
// @Failure 404 {object} schemas.Error
// @Router /v1/language/{router}/chat [POST]
func LangChatHandler(routerManager *routers.RouterManager) Handler {
return func(c *fiber.Ctx) error {
if !c.Is("json") {
return c.Status(fiber.StatusBadRequest).JSON(schemas.ErrUnsupportedMediaType)
}
// Unmarshal request body
var req *schemas.ChatRequest
err := c.BodyParser(&req)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(schemas.NewPayloadParseErr(err))
}
// Get router ID from path
routerID := c.Params("router")
router, err := routerManager.GetLangRouter(routerID)
if err != nil {
httpErr := schemas.FromErr(err)
return c.Status(httpErr.Status).JSON(httpErr)
}
// Chat with router
resp, err := router.Chat(c.Context(), req)
if err != nil {
httpErr := schemas.FromErr(err)
return c.Status(httpErr.Status).JSON(httpErr)
}
// Return chat response
return c.Status(fiber.StatusOK).JSON(resp)
}
}
func LangStreamRouterValidator(routerManager *routers.RouterManager) Handler {
return func(c *fiber.Ctx) error {
if websocket.IsWebSocketUpgrade(c) {
routerID := c.Params("router")
_, err := routerManager.GetLangRouter(routerID)
if err != nil {
httpErr := schemas.FromErr(err)
return c.Status(httpErr.Status).JSON(httpErr)
}
return c.Next()
}
return fiber.ErrUpgradeRequired
}
}
// LangStreamChatHandler
//
// @id glide-language-chat-stream
// @Summary Language Chat
// @Description Talk to different LLM Stream Chat APIs via a unified websocket endpoint
// @tags Language
// @Param router path string true "Router ID"
// @Param Connection header string true "Websocket Connection Type"
// @Param Upgrade header string true "Upgrade header"
// @Param Sec-WebSocket-Key header string true "Websocket Security Token"
// @Param Sec-WebSocket-Version header string true "Websocket Security Token"
// @Accept json
// @Success 101
// @Failure 426
// @Failure 404 {object} schemas.Error
// @Router /v1/language/{router}/chatStream [GET]
func LangStreamChatHandler(tel *telemetry.Telemetry, routerManager *routers.RouterManager) Handler {
// TODO: expose websocket connection configs https://github.com/gofiber/contrib/tree/main/websocket
return websocket.New(func(c *websocket.Conn) {
routerID := c.Params("router")
// websocket.Conn bindings https://pkg.go.dev/github.com/fasthttp/websocket?tab=doc#pkg-index
var (
err error
wg sync.WaitGroup
)
chatStreamC := make(chan *schemas.ChatStreamMessage)
router, _ := routerManager.GetLangRouter(routerID)
defer close(chatStreamC)
defer c.Conn.Close()
wg.Add(1)
go func() {
defer wg.Done()
for chatStreamMsg := range chatStreamC {
if err = c.WriteJSON(chatStreamMsg); err != nil {
break
}
}
}()
for {
var chatRequest schemas.ChatStreamRequest
if err = c.ReadJSON(&chatRequest); err != nil {
// TODO: handle bad request schemas gracefully and return back validation errors
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
tel.L().Warn("Streaming Chat connection is closed", zap.Error(err), zap.String("routerID", routerID))
}
tel.L().Debug("Streaming chat connection is closed by client", zap.Error(err), zap.String("routerID", routerID))
break
}
// TODO: handle termination gracefully
wg.Add(1)
go func(chatRequest schemas.ChatStreamRequest) {
defer wg.Done()
router.ChatStream(context.Background(), &chatRequest, chatStreamC)
}(chatRequest)
}
wg.Wait()
})
}
// LangRoutersHandler
//
// @id glide-language-routers
// @Summary Language Router List
// @Description Retrieve list of configured active language routers and their configurations
// @tags Language
// @Accept json
// @Produce json
// @Success 200 {object} schemas.RouterListSchema
// @Router /v1/language/ [GET]
func LangRoutersHandler(routerManager *routers.RouterManager) Handler {
return func(c *fiber.Ctx) error {
configuredRouters := routerManager.GetLangRouters()
cfgs := make([]interface{}, 0, len(configuredRouters)) // opaque by design
for _, router := range configuredRouters {
cfgs = append(cfgs, router.Config)
}
return c.Status(fiber.StatusOK).JSON(schemas.RouterListSchema{Routers: cfgs})
}
}
// HealthHandler
//
// @id glide-health
// @Summary Gateway Health
// @Description
// @tags Operations
// @Accept json
// @Produce json
// @Success 200 {object} schemas.HealthSchema
// @Router /v1/health/ [get]
func HealthHandler(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).JSON(schemas.HealthSchema{Healthy: true})
}
func NotFoundHandler(c *fiber.Ctx) error {
return c.Status(fiber.StatusNotFound).JSON(schemas.ErrRouteNotFound)
}