Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
module: [".", "bedrock", "cache", "openai", "scanner", "word2vec"]
module: [".", "llm/bedrock", "llm/openai", "llm/word2vec"]

steps:
- uses: actions/setup-go@v5
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
module: [".", "bedrock", "cache", "openai", "scanner", "word2vec"]
module: [".", "llm/bedrock", "llm/openai", "llm/word2vec"]

steps:
- uses: actions/setup-go@v5
Expand Down
54 changes: 37 additions & 17 deletions cache/cache.go → aio/cache.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//
// Copyright (C) 2024 Dmitry Kolesnikov
// Copyright (C) 2024 - 2025 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/kshard/embeddings
//

package cache
package aio

import (
"context"
Expand All @@ -18,52 +18,72 @@ import (
"github.com/kshard/embeddings"
)

// Getter abstracts the read side of a key-value storage.
// Get takes a key and returns the bytes stored under it, or an error.
type Getter interface{ Get([]byte) ([]byte, error) }

// Putter abstracts the write side of a key-value storage.
// Put takes a key and the value to store under it, and returns an error
// when the write fails.
type Putter interface{ Put([]byte, []byte) error }

// KeyVal is a key-value storage that supports both reads and writes;
// it combines the Getter and Putter interfaces.
type KeyVal interface {
Getter
Putter
}

// Cache is a caching layer for an embeddings client. It embeds the wrapped
// Embedder and holds the key-value storage where embedding vectors are kept.
type Cache struct {
embeddings.Embedder
cache KeyVal
}

// Compile-time check that *Cache satisfies embeddings.Embedder.
var _ embeddings.Embedder = (*Cache)(nil)

// Creates caching layer for embeddings client.
//
// Use github.com/akrylysov/pogreb to cache embedding on local file systems:
//
// cli, err := /* create embeddings client */
// db, err := pogreb.Open("embeddings.cache", nil)
// text := cache.New(db, cli)
func New(cache Cache, embed embeddings.Embeddings) *Client {
return &Client{
embed: embed,
cache: cache,
// text := aio.NewCache(db, cli)
func NewCache(cache KeyVal, embedder embeddings.Embedder) *Cache {
return &Cache{
Embedder: embedder,
cache: cache,
}
}

func (c *Client) HashKey(text string) []byte {
// HashKey derives the storage key for the given text: the SHA-1 digest of
// its bytes, returned as a 20-byte slice.
func (c *Cache) HashKey(text string) []byte {
	digest := sha1.Sum([]byte(text))
	return digest[:]
}

func (c *Client) ConsumedTokens() int { return c.embed.ConsumedTokens() }

// Calculates embedding vector
func (c *Client) Embedding(ctx context.Context, text string) ([]float32, error) {
func (c *Cache) Embedding(ctx context.Context, text string) (embeddings.Embedding, error) {
hkey := c.HashKey(text)

val, err := c.cache.Get(hkey)
if err != nil {
return nil, err
return embeddings.Embedding{}, err
}

if len(val) != 0 {
return decodeFVec(val), nil
return embeddings.Embedding{
Text: text,
Vector: decodeFVec(val),
}, nil
}

vec, err := c.embed.Embedding(ctx, text)
reply, err := c.Embedder.Embedding(ctx, text)
if err != nil {
return nil, err
return embeddings.Embedding{}, err
}

err = c.cache.Put(hkey, encodeFVec(vec))
err = c.cache.Put(hkey, encodeFVec(reply.Vector))
if err != nil {
slog.Warn("failed to cache vector", "error", err)
}

return vec, nil
return reply, nil
}

func encodeFVec(v []float32) []byte {
Expand Down
17 changes: 4 additions & 13 deletions cache/cache_test.go → aio/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@
// https://github.com/kshard/embeddings
//

package cache_test
package aio_test

import (
"bytes"
"context"
"testing"

"github.com/kshard/embeddings/cache"
"github.com/kshard/embeddings/aio"
)

func TestCache(t *testing.T) {
kv := keyval{}
c := cache.New(kv, embed{})
c := aio.NewCache(kv, mockVector())

if c.ConsumedTokens() != 10 {
if c.UsedTokens() != 10 {
t.Errorf("unexpected ConsumedTokens output")
}

Expand All @@ -35,15 +35,6 @@ func TestCache(t *testing.T) {
}
}

// mock embedding client
type embed struct{}

func (embed) ConsumedTokens() int { return 10 }

func (embed) Embedding(ctx context.Context, text string) ([]float32, error) {
return []float32{0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0}, nil
}

// keyval is a mock key-value storage for tests: an in-memory map from
// string keys to raw byte values.
type keyval map[string][]byte

Expand Down
64 changes: 64 additions & 0 deletions aio/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//
// Copyright (C) 2024 - 2025 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/kshard/embeddings
//

package aio

import (
"context"
"log/slog"

"github.com/kshard/embeddings"
"golang.org/x/time/rate"
)

// Limiter is a rate limit strategy for LLMs I/O.
//
// It embeds the wrapped Embedder and throttles calls to its Embedding method
// with two limiters: rps bounds the number of requests and tps bounds the
// number of tokens. The token usage of a request is known only after the
// reply arrives, so it is recorded in debt and charged against tps by a
// following Embedding call.
type Limiter struct {
embeddings.Embedder
debt int
rps *rate.Limiter
tps *rate.Limiter
}

// Compile-time check that *Limiter satisfies embeddings.Embedder.
var _ embeddings.Embedder = (*Limiter)(nil)

// NewLimiter wraps the embedder with a rate limit strategy for LLMs.
// The policy is defined per minute: requestPerMin bounds the number of
// requests and tokensPerMin bounds the number of tokens.
func NewLimiter(requestPerMin int, tokensPerMin int, embedder embeddings.Embedder) *Limiter {
	// The limiters are configured with a per-second rate (per-minute value
	// divided by 60) and a burst equal to the whole per-minute budget.
	perSecond := func(perMin int) rate.Limit { return rate.Limit(perMin) / 60 }

	limiter := &Limiter{Embedder: embedder}
	limiter.rps = rate.NewLimiter(perSecond(requestPerMin), requestPerMin)
	limiter.tps = rate.NewLimiter(perSecond(tokensPerMin), tokensPerMin)

	return limiter
}

// Embedding calculates the embedding vector for the text using the wrapped
// Embedder, subject to the request and token rate limits.
//
// The call first waits for a request slot, then settles the token debt left
// by earlier replies, and only then prompts the wrapped Embedder. Tokens used
// by the reply are added to the debt and charged on a following call.
//
// It returns an error when the context is cancelled or expires while waiting
// for either limiter, or when the wrapped Embedder fails.
func (c *Limiter) Embedding(ctx context.Context, text string) (embeddings.Embedding, error) {
	if err := c.rps.Wait(ctx); err != nil {
		return embeddings.Embedding{}, err
	}

	// WaitN rejects outright any n above the limiter's burst. A single reply
	// larger than the per-minute token budget would otherwise make every
	// following call fail forever, so at most one burst is paid per call and
	// the remainder stays in debt.
	pay := c.debt
	if burst := c.tps.Burst(); burst > 0 && pay > burst {
		pay = burst
	}

	if err := c.tps.WaitN(ctx, pay); err != nil {
		return embeddings.Embedding{}, err
	}

	// The paid part is settled; clearing it prevents charging the same
	// tokens again when the request below fails.
	c.debt -= pay

	reply, err := c.Embedder.Embedding(ctx, text)
	if err != nil {
		return embeddings.Embedding{}, err
	}

	c.debt += reply.UsedTokens

	slog.Debug("LLM is prompted",
		slog.Float64("budget", c.tps.Tokens()),
		slog.Int("debt", c.debt),
		slog.Int("sessionTokens", c.Embedder.UsedTokens()),
		slog.Int("replyTokens", reply.UsedTokens),
	)

	return reply, nil
}
72 changes: 72 additions & 0 deletions aio/limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//
// Copyright (C) 2025 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/kshard/embeddings
//

package aio_test

import (
"context"
"testing"
"time"

"github.com/fogfish/it/v2"
"github.com/kshard/embeddings"
"github.com/kshard/embeddings/aio"
)

// mockTokensUsage builds a mock embedder whose reply reports n used tokens
// and carries no vector.
func mockTokensUsage(n int) mock {
	reply := embeddings.Embedding{UsedTokens: n}
	return mock{reply: reply}
}

// TestLimiter checks both limits of the Limiter: the per-minute budget can
// be spent in a burst, and the first call past the budget fails.
func TestLimiter(t *testing.T) {
	n := 8

	// prompt issues a single request bounded by a 100ms timeout.
	prompt := func(api *aio.Limiter) error {
		ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
		defer cancel()

		_, err := api.Embedding(ctx, "text")
		return err
	}

	t.Run("RequestPerMinute", func(t *testing.T) {
		// The token budget is large, so only the request limit applies.
		api := aio.NewLimiter(n, 100000, mockTokensUsage(1000))

		for i := 0; i < n; i++ {
			err := prompt(api)
			it.Then(t).Should(it.Nil(err))
		}

		err := prompt(api)
		it.Then(t).ShouldNot(it.Nil(err))
	})

	t.Run("TokensPerMinute", func(t *testing.T) {
		// The request budget is large, so only the token limit applies.
		api := aio.NewLimiter(100000, n*1000, mockTokensUsage(1000))

		// +1 due to debt model
		for i := 0; i < n+1; i++ {
			err := prompt(api)
			it.Then(t).Should(it.Nil(err))
		}

		err := prompt(api)
		it.Then(t).ShouldNot(it.Nil(err))
	})
}
35 changes: 35 additions & 0 deletions aio/mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Copyright (C) 2024 - 2025 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/kshard/embeddings
//

package aio_test

import (
"context"

"github.com/kshard/embeddings"
)

// mock is a mock embedding client for tests. It holds a canned reply that
// its Embedding method returns for any input text.
type mock struct {
reply embeddings.Embedding
}

// mockVector builds a mock embedder replying with a fixed 10-element vector
// (0.0 through 9.0) and reporting 10 used tokens.
func mockVector() mock {
	reply := embeddings.Embedding{
		Vector:     []float32{0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0},
		UsedTokens: 10,
	}

	return mock{reply: reply}
}

// UsedTokens reports the token usage of the canned reply.
func (m mock) UsedTokens() int {
	return m.reply.UsedTokens
}

// Embedding returns the canned reply regardless of the context and text.
func (m mock) Embedding(ctx context.Context, text string) (embeddings.Embedding, error) {
	canned := m.reply
	return canned, nil
}
File renamed without changes.
Loading
Loading