Skip to content

Commit

Permalink
feat: ingestion log state queued (#176)
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Fiedorowicz <mfiedorowicz@netboxlabs.com>
  • Loading branch information
mfiedorowicz authored Sep 25, 2024
1 parent 9afb78e commit 45fdcac
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 125 deletions.
4 changes: 2 additions & 2 deletions diode-proto/diode/v1/reconciler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ message IngestionError {

enum State {
UNSPECIFIED = 0;
NEW = 1;
QUEUED = 1;
RECONCILED = 2;
FAILED = 3;
NO_CHANGES = 4;
Expand All @@ -64,7 +64,7 @@ enum State {
// Ingestion metrics
message IngestionMetrics {
int32 total = 1;
int32 new = 2;
int32 queued = 2;
int32 reconciled = 3;
int32 failed = 4;
int32 no_changes = 5;
Expand Down
217 changes: 109 additions & 108 deletions diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions diode-server/reconciler/ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.
DataType: objectType,
Entity: v,
IngestionTs: int64(ingestionTs),
State: reconcilerpb.State_NEW,
State: reconcilerpb.State_QUEUED,
}

if _, err = p.writeIngestionLog(ctx, key, ingestionLog); err != nil {
Expand All @@ -225,7 +225,7 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.
RequestID: ingestReq.GetId(),
DataType: objectType,
Entity: v,
State: int(reconcilerpb.State_NEW),
State: int(reconcilerpb.State_QUEUED),
}

changeSet, err := p.reconcileEntity(ctx, ingestEntity)
Expand Down
10 changes: 5 additions & 5 deletions diode-server/reconciler/logs_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func retrieveIngestionMetrics(ctx context.Context, client RedisClient) (*reconci
results := []*redis.Cmd{
pipe.Do(ctx, "FT.SEARCH", "ingest-entity", "*", "LIMIT", 0, 0),
}
for s := reconcilerpb.State_NEW; s <= reconcilerpb.State_NO_CHANGES; s++ {
for s := reconcilerpb.State_QUEUED; s <= reconcilerpb.State_NO_CHANGES; s++ {
stateName, ok := reconcilerpb.State_name[int32(s)]
if !ok {
return nil, fmt.Errorf("failed to retrieve ingestion logs: failed to get state name of %d", s)
Expand All @@ -112,8 +112,8 @@ func retrieveIngestionMetrics(ctx context.Context, client RedisClient) (*reconci
return nil, fmt.Errorf("failed to retrieve ingestion logs: failed to parse total_results")
}
total := int32(totalRes)
if q == int(reconcilerpb.State_NEW) {
metrics.New = total
if q == int(reconcilerpb.State_QUEUED) {
metrics.Queued = total
} else if q == int(reconcilerpb.State_RECONCILED) {
metrics.Reconciled = total
} else if q == int(reconcilerpb.State_FAILED) {
Expand Down Expand Up @@ -207,8 +207,8 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, client Redi
if in.State != nil {
if in.GetState() == reconcilerpb.State_UNSPECIFIED {
metrics.Total = response.TotalResults
} else if in.GetState() == reconcilerpb.State_NEW {
metrics.New = response.TotalResults
} else if in.GetState() == reconcilerpb.State_QUEUED {
metrics.Queued = response.TotalResults
} else if in.GetState() == reconcilerpb.State_RECONCILED {
metrics.Reconciled = response.TotalResults
} else if in.GetState() == reconcilerpb.State_FAILED {
Expand Down
14 changes: 7 additions & 7 deletions diode-server/reconciler/server_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestRetrieveLogs(t *testing.T) {
},
{
name: "filter by new state",
in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_NEW.Enum()},
in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_QUEUED.Enum()},
result: interface{}(map[interface{}]interface{}{
"attributes": []interface{}{},
"format": "STRING",
Expand All @@ -312,7 +312,7 @@ func TestRetrieveLogs(t *testing.T) {
Logs: []*reconcilerpb.IngestionLog{
{
DataType: "dcim.interface",
State: reconcilerpb.State_NEW,
State: reconcilerpb.State_QUEUED,
RequestId: "req-id",
IngestionTs: 1725552914392208722,
ProducerAppName: "diode-agent",
Expand All @@ -333,11 +333,11 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
New: 1,
Queued: 1,
},
NextPageToken: "AAAFlw==",
},
queryFilter: "@state:{NEW}",
queryFilter: "@state:{QUEUED}",
queryLimitOffset: 0,
failCmd: false,
hasError: false,
Expand Down Expand Up @@ -862,7 +862,7 @@ func TestRetrieveIngestionLogsMetricsOnly(t *testing.T) {
logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false}))

expected := &reconcilerpb.IngestionMetrics{
New: 3,
Queued: 3,
Reconciled: 3,
Failed: 2,
NoChanges: 2,
Expand Down Expand Up @@ -895,10 +895,10 @@ func TestRetrieveIngestionLogsMetricsOnly(t *testing.T) {
"results": []interface{}{
map[interface{}]interface{}{},
},
"total_results": int64(expected.New),
"total_results": int64(expected.Queued),
"warning": []interface{}{},
}))
mockPipeliner.On("Do", ctx, []interface{}{"FT.SEARCH", "ingest-entity", "@state:{NEW}", "LIMIT", 0, 0}).Return(cmdNew)
mockPipeliner.On("Do", ctx, []interface{}{"FT.SEARCH", "ingest-entity", "@state:{QUEUED}", "LIMIT", 0, 0}).Return(cmdNew)

cmdReconciled := redis.NewCmd(ctx)
cmdReconciled.SetVal(interface{}(map[interface{}]interface{}{
Expand Down

0 comments on commit 45fdcac

Please sign in to comment.