Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 0 additions & 146 deletions async_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,79 +1400,6 @@ func (al *AsyncInitialLoader) loadCompaniesConcurrent(ctx context.Context) error
return nil
}

// loadCompaniesSequential loads companies sequentially (fallback method)
func (al *AsyncInitialLoader) loadCompaniesSequential(ctx context.Context) error {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this and loadUsersSequentially were unused.

pageSize := al.config.PageSize
offset := 0
totalCompanies := 0
var allCacheKeys []string

for {
select {
case <-ctx.Done():
return fmt.Errorf("companies loading cancelled: %w", ctx.Err())
default:
}

// Rate limiting
if err := al.rateLimiter.Wait(ctx); err != nil {
return err
}

// Circuit breaker check
if !al.apiCircuitBreaker.CanExecute() {
err := fmt.Errorf("companies loading blocked by circuit breaker")
al.apiCircuitBreaker.RecordFailure()
return err
}

companiesResp, err := al.schematicClient.Companies.ListCompanies(ctx, &schematicgo.ListCompaniesRequest{
Limit: &pageSize,
Offset: &offset,
})
if err != nil {
al.apiCircuitBreaker.RecordFailure()
return err
}

al.apiCircuitBreaker.RecordSuccess()

// Convert and cache companies
for _, companyData := range companiesResp.Data {
company := convertToRulesEngineCompany(companyData)

cacheResults := al.cacheCompanyForKeys(ctx, company)
for cacheKey, cacheErr := range cacheResults {
if cacheErr != nil {
al.logger.Error(ctx, fmt.Sprintf("Cache error for company %s key '%s': %v", company.ID, cacheKey, cacheErr))
} else {
allCacheKeys = append(allCacheKeys, cacheKey)
}
}
}

totalCompanies += len(companiesResp.Data)

// Check if we reached the end
if len(companiesResp.Data) < pageSize {
break
}

offset += pageSize
}

// Evict missing keys
if len(allCacheKeys) > 0 {
if err := al.companiesCache.DeleteMissing(ctx, allCacheKeys); err != nil {
al.logger.Error(ctx, fmt.Sprintf("Failed to evict missing company cache keys: %v", err))
}
}

al.totalCompanies = totalCompanies
al.logger.Info(ctx, fmt.Sprintf("Successfully cached %d companies via sequential async loading", totalCompanies))
return nil
}

// loadUsersAsync loads all users with concurrent requests and rate limiting
func (al *AsyncInitialLoader) loadUsersAsync(ctx context.Context) error {
al.logger.Info(ctx, "Starting async users loading")
Expand Down Expand Up @@ -1611,79 +1538,6 @@ func (al *AsyncInitialLoader) loadUsersConcurrent(ctx context.Context) error {
return nil
}

// loadUsersSequential loads users sequentially (fallback method)
func (al *AsyncInitialLoader) loadUsersSequential(ctx context.Context) error {
	limit := al.config.PageSize
	offset := 0
	loaded := 0
	var liveKeys []string

	for {
		// Bail out promptly if the caller cancelled.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("users loading cancelled: %w", err)
		}

		// Honor the shared rate limit before each API request.
		if err := al.rateLimiter.Wait(ctx); err != nil {
			return err
		}

		// Respect the circuit breaker; a blocked attempt is recorded as a failure.
		if !al.apiCircuitBreaker.CanExecute() {
			al.apiCircuitBreaker.RecordFailure()
			return fmt.Errorf("users loading blocked by circuit breaker")
		}

		resp, err := al.schematicClient.Companies.ListUsers(ctx, &schematicgo.ListUsersRequest{
			Limit:  &limit,
			Offset: &offset,
		})
		if err != nil {
			al.apiCircuitBreaker.RecordFailure()
			return err
		}
		al.apiCircuitBreaker.RecordSuccess()

		// Convert each record and cache it under every key combination,
		// collecting the keys that were written successfully.
		for _, raw := range resp.Data {
			user := convertToRulesEngineUser(raw)
			for key, cacheErr := range al.cacheUserForKeys(ctx, user) {
				if cacheErr != nil {
					al.logger.Error(ctx, fmt.Sprintf("Cache error for user %s key '%s': %v", user.ID, key, cacheErr))
					continue
				}
				liveKeys = append(liveKeys, key)
			}
		}

		loaded += len(resp.Data)

		// A short page means the final batch has been consumed.
		if len(resp.Data) < limit {
			break
		}
		offset += limit
	}

	// Evict cached users whose keys were not seen in this pass.
	if len(liveKeys) > 0 {
		if err := al.usersCache.DeleteMissing(ctx, liveKeys); err != nil {
			al.logger.Error(ctx, fmt.Sprintf("Failed to evict missing user cache keys: %v", err))
		}
	}

	al.totalUsers = loaded
	al.logger.Info(ctx, fmt.Sprintf("Successfully cached %d users via sequential async loading", loaded))
	return nil
}

// cacheCompanyForKeys caches a company for all its key combinations (matching existing implementation)
func (al *AsyncInitialLoader) cacheCompanyForKeys(ctx context.Context, company *rulesengine.Company) map[string]error {
if company == nil || len(company.Keys) == 0 {
Expand Down
15 changes: 12 additions & 3 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func (r *redisCache[T]) Delete(ctx context.Context, key string) error {

// DeleteMissing removes keys that are not in the provided list
func (r *redisCache[T]) DeleteMissing(ctx context.Context, keys []string) error {
// Get all keys with the pattern (assuming cache keys have a pattern)
if len(keys) == 0 {
return nil
}
Expand Down Expand Up @@ -153,9 +152,19 @@ func (r *redisCache[T]) DeleteMissing(ctx context.Context, keys []string) error
}
}

// Delete the keys in batches
if len(keysToDelete) > 0 {
return r.client.Del(ctx, keysToDelete...).Err()
var deletionErrors []error

for _, key := range keysToDelete {
if err := r.client.Del(ctx, key).Err(); err != nil {
deletionErrors = append(deletionErrors, fmt.Errorf("failed to delete key %s: %w", key, err))
}
}

// Return first error if any occurred, but continue processing all keys
if len(deletionErrors) > 0 {
return fmt.Errorf("failed to delete %d/%d keys, first error: %w", len(deletionErrors), len(keysToDelete), deletionErrors[0])
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/alicebob/miniredis/v2 v2.35.0
github.com/redis/go-redis/v9 v9.14.1
github.com/schematichq/rulesengine v0.1.8
github.com/schematichq/schematic-datastream-ws v0.2.2
github.com/schematichq/schematic-datastream-ws v0.2.3
github.com/schematichq/schematic-go v1.3.5
github.com/stretchr/testify v1.11.1
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ github.com/redis/go-redis/v9 v9.14.1 h1:nDCrEiJmfOWhD76xlaw+HXT0c9hfNWeXgl0vIRYS
github.com/redis/go-redis/v9 v9.14.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/schematichq/rulesengine v0.1.8 h1:hmP0dXP3SNR43PbgnUihuvGnPpWgejEM3tY8/yTNkf0=
github.com/schematichq/rulesengine v0.1.8/go.mod h1:HmweBgjg+kVe6VuNMLc7FISHFF0wzr5cVGLAxlVP8+4=
github.com/schematichq/schematic-datastream-ws v0.2.2 h1:q0WTy7oyAmo9gWaQ65FvzrltE6u+1eYt877ttWzocig=
github.com/schematichq/schematic-datastream-ws v0.2.2/go.mod h1:NKrg326C9YfCDOI84On5GZhvrovzDpFCzW8wdBwPZ3g=
github.com/schematichq/schematic-datastream-ws v0.2.3 h1:kuVCZt6fgvAgc7zKTcRJlffOsXBZ3vs7KUIHvvfJ50Y=
github.com/schematichq/schematic-datastream-ws v0.2.3/go.mod h1:NKrg326C9YfCDOI84On5GZhvrovzDpFCzW8wdBwPZ3g=
github.com/schematichq/schematic-go v1.3.5 h1:SyPeLSAIdrJGIm5cmk8h69b0rliYP0CEFMDGPFPOK7U=
github.com/schematichq/schematic-go v1.3.5/go.mod h1:F8alToo/qP/e6V26uCDqfEDdRWNgBogFwKW5fDNaRfw=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
Expand Down