Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle out of order events #5071

Merged
merged 20 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions cmd/spire-server/cli/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,11 @@ type serverConfig struct {
}

type experimentalConfig struct {
AuthOpaPolicyEngine *authpolicy.OpaEngineConfig `hcl:"auth_opa_policy_engine"`
CacheReloadInterval string `hcl:"cache_reload_interval"`
EventsBasedCache bool `hcl:"events_based_cache"`
PruneEventsOlderThan string `hcl:"prune_events_older_than"`
AuthOpaPolicyEngine *authpolicy.OpaEngineConfig `hcl:"auth_opa_policy_engine"`
CacheReloadInterval string `hcl:"cache_reload_interval"`
EventsBasedCache bool `hcl:"events_based_cache"`
PruneEventsOlderThan string `hcl:"prune_events_older_than"`
SQLTransactionTimeout string `hcl:"sql_transaction_timeout"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it really required?
do you have a use case where a timeout that is used for pruning can be useful?


Flags fflag.RawConfig `hcl:"feature_flags"`

Expand Down Expand Up @@ -677,6 +678,14 @@ func NewServerConfig(c *Config, logOptions []log.Option, allowUnknownConfig bool
sc.PruneEventsOlderThan = interval
}

if c.Server.Experimental.SQLTransactionTimeout != "" {
interval, err := time.ParseDuration(c.Server.Experimental.SQLTransactionTimeout)
if err != nil {
return nil, fmt.Errorf("could not parse SQL transaction timeout interval: %w", err)
}
sc.SQLTransactionTimeout = interval
}

if c.Server.Experimental.EventsBasedCache {
sc.Log.Info("Using events based cache")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/telemetry/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ const (
// non-error level.
Error = "error"

// EventID tags an event ID
EventID = "event_id"

// Expect tags an expected value, as opposed to the one received. Message should clarify
// what kind of value was expected, and a different field should show the received value
Expect = "expect"
Expand Down
40 changes: 14 additions & 26 deletions pkg/common/telemetry/server/datastore/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,20 @@ func StartPruneRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.Cal
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Prune)
}

// StartGetLatestRegistrationEntryEventIDCall return metric
// for server's datastore, on listing latest registration entry event id.
func StartGetLatestRegistrationEntryEventIDCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Fetch)
}

// StartCreateRegistrationEntryEventCall return metric
// for server's datastore, on listing registration entry events.
func StartCreateRegistrationEntryEventCall(m telemetry.Metrics) *telemetry.CallCounter {
// StartCreateRegistrationEntryEventForTestingCall return metric
// for server's datastore, on creating a registration entry event.
func StartCreateRegistrationEntryEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Create)
}

// StartDeleteRegistrationEntryEventCall return metric
// for server's datastore, on listing registration entry events.
func StartDeleteRegistrationEntryEventCall(m telemetry.Metrics) *telemetry.CallCounter {
// StartDeleteRegistrationEntryEventForTestingCall return metric
// for server's datastore, on deleting a registration entry event.
func StartDeleteRegistrationEntryEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Delete)
}

// StartFetchRegistrationEntryEventCall return metric
// for server's datastore, on listing registration entry events.
// for server's datastore, on fetching a registration entry event.
func StartFetchRegistrationEntryEventCall(m telemetry.Metrics) *telemetry.CallCounter {
faisal-memon marked this conversation as resolved.
Show resolved Hide resolved
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Fetch)
}
Expand All @@ -52,26 +46,20 @@ func StartPruneAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCount
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Prune)
}

// StartGetLatestAttestedNodeEventIDCall return metric
// for server's datastore, on listing attested node event id.
func StartGetLatestAttestedNodeEventIDCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Fetch)
}

// StartCreateAttestedNodeEventCall return metric
// for server's datastore, on listing registration entry events.
func StartCreateAttestedNodeEventCall(m telemetry.Metrics) *telemetry.CallCounter {
// StartCreateAttestedNodeEventForTestingCall return metric
// for server's datastore, on creating an attested node event.
func StartCreateAttestedNodeEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Create)
}

// StartDeleteAttestedNodeEventCall return metric
// for server's datastore, on listing registration entry events.
func StartDeleteAttestedNodeEventCall(m telemetry.Metrics) *telemetry.CallCounter {
// StartDeleteAttestedNodeEventForTestingCall return metric
// for server's datastore, on deleting an attested node event.
func StartDeleteAttestedNodeEventForTestingCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Delete)
}

// StartFetchAttestedNodeEventCall return metric
// for server's datastore, on listing registration entry events.
// for server's datastore, on fetching an attested node event.
func StartFetchAttestedNodeEventCall(m telemetry.Metrics) *telemetry.CallCounter {
faisal-memon marked this conversation as resolved.
Show resolved Hide resolved
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Fetch)
}
36 changes: 12 additions & 24 deletions pkg/common/telemetry/server/datastore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ func (w metricsWrapper) CreateAttestedNode(ctx context.Context, node *common.Att
return w.ds.CreateAttestedNode(ctx, node)
}

func (w metricsWrapper) CreateAttestedNodeEvent(ctx context.Context, event *datastore.AttestedNodeEvent) (_ *datastore.AttestedNodeEvent, err error) {
callCounter := StartCreateAttestedNodeEventCall(w.m)
func (w metricsWrapper) CreateAttestedNodeEventForTesting(ctx context.Context, event *datastore.AttestedNodeEvent) (err error) {
callCounter := StartCreateAttestedNodeEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.CreateAttestedNodeEvent(ctx, event)
return w.ds.CreateAttestedNodeEventForTesting(ctx, event)
}

func (w metricsWrapper) CreateBundle(ctx context.Context, bundle *common.Bundle) (_ *common.Bundle, err error) {
Expand All @@ -66,10 +66,10 @@ func (w metricsWrapper) CreateOrReturnRegistrationEntry(ctx context.Context, ent
return w.ds.CreateOrReturnRegistrationEntry(ctx, entry)
}

func (w metricsWrapper) CreateRegistrationEntryEvent(ctx context.Context, event *datastore.RegistrationEntryEvent) (_ *datastore.RegistrationEntryEvent, err error) {
callCounter := StartCreateRegistrationEntryEventCall(w.m)
func (w metricsWrapper) CreateRegistrationEntryEventForTesting(ctx context.Context, event *datastore.RegistrationEntryEvent) (err error) {
callCounter := StartCreateRegistrationEntryEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.CreateRegistrationEntryEvent(ctx, event)
return w.ds.CreateRegistrationEntryEventForTesting(ctx, event)
}

func (w metricsWrapper) CreateFederationRelationship(ctx context.Context, fr *datastore.FederationRelationship) (_ *datastore.FederationRelationship, err error) {
Expand All @@ -90,10 +90,10 @@ func (w metricsWrapper) DeleteAttestedNode(ctx context.Context, spiffeID string)
return w.ds.DeleteAttestedNode(ctx, spiffeID)
}

func (w metricsWrapper) DeleteAttestedNodeEvent(ctx context.Context, eventID uint) (err error) {
callCounter := StartDeleteAttestedNodeEventCall(w.m)
func (w metricsWrapper) DeleteAttestedNodeEventForTesting(ctx context.Context, eventID uint) (err error) {
callCounter := StartDeleteAttestedNodeEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.DeleteAttestedNodeEvent(ctx, eventID)
return w.ds.DeleteAttestedNodeEventForTesting(ctx, eventID)
}

func (w metricsWrapper) DeleteBundle(ctx context.Context, trustDomain string, mode datastore.DeleteMode) (err error) {
Expand All @@ -120,10 +120,10 @@ func (w metricsWrapper) DeleteRegistrationEntry(ctx context.Context, entryID str
return w.ds.DeleteRegistrationEntry(ctx, entryID)
}

func (w metricsWrapper) DeleteRegistrationEntryEvent(ctx context.Context, eventID uint) (err error) {
callCounter := StartDeleteRegistrationEntryEventCall(w.m)
func (w metricsWrapper) DeleteRegistrationEntryEventForTesting(ctx context.Context, eventID uint) (err error) {
callCounter := StartDeleteRegistrationEntryEventForTestingCall(w.m)
defer callCounter.Done(&err)
return w.ds.DeleteRegistrationEntryEvent(ctx, eventID)
return w.ds.DeleteRegistrationEntryEventForTesting(ctx, eventID)
}

func (w metricsWrapper) FetchAttestedNode(ctx context.Context, spiffeID string) (_ *common.AttestedNode, err error) {
Expand Down Expand Up @@ -168,18 +168,6 @@ func (w metricsWrapper) FetchFederationRelationship(ctx context.Context, trustDo
return w.ds.FetchFederationRelationship(ctx, trustDomain)
}

func (w metricsWrapper) GetLatestAttestedNodeEventID(ctx context.Context) (_ uint, err error) {
callCounter := StartGetLatestAttestedNodeEventIDCall(w.m)
defer callCounter.Done(&err)
return w.ds.GetLatestAttestedNodeEventID(ctx)
}

func (w metricsWrapper) GetLatestRegistrationEntryEventID(ctx context.Context) (_ uint, err error) {
callCounter := StartGetLatestRegistrationEntryEventIDCall(w.m)
defer callCounter.Done(&err)
return w.ds.GetLatestRegistrationEntryEventID(ctx)
}

func (w metricsWrapper) GetNodeSelectors(ctx context.Context, spiffeID string, dataConsistency datastore.DataConsistency) (_ []*common.Selector, err error) {
callCounter := StartGetNodeSelectorsCall(w.m)
defer callCounter.Done(&err)
Expand Down
36 changes: 10 additions & 26 deletions pkg/common/telemetry/server/datastore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestWithMetrics(t *testing.T) {
},
{
key: "datastore.node_event.create",
methodName: "CreateAttestedNodeEvent",
methodName: "CreateAttestedNodeEventForTesting",
},
{
key: "datastore.bundle.create",
Expand All @@ -84,15 +84,15 @@ func TestWithMetrics(t *testing.T) {
},
{
key: "datastore.registration_entry_event.create",
methodName: "CreateRegistrationEntryEvent",
methodName: "CreateRegistrationEntryEventForTesting",
},
{
key: "datastore.node.delete",
methodName: "DeleteAttestedNode",
},
{
key: "datastore.node_event.delete",
methodName: "DeleteAttestedNodeEvent",
methodName: "DeleteAttestedNodeEventForTesting",
},
{
key: "datastore.bundle.delete",
Expand All @@ -112,7 +112,7 @@ func TestWithMetrics(t *testing.T) {
},
{
key: "datastore.registration_entry_event.delete",
methodName: "DeleteRegistrationEntryEvent",
methodName: "DeleteRegistrationEntryEventForTesting",
},
{
key: "datastore.node.fetch",
Expand Down Expand Up @@ -142,14 +142,6 @@ func TestWithMetrics(t *testing.T) {
key: "datastore.federation_relationship.fetch",
methodName: "FetchFederationRelationship",
},
{
key: "datastore.node_event.fetch",
methodName: "GetLatestAttestedNodeEventID",
},
{
key: "datastore.registration_entry_event.fetch",
methodName: "GetLatestRegistrationEntryEventID",
},
{
key: "datastore.node.selectors.fetch",
methodName: "GetNodeSelectors",
Expand Down Expand Up @@ -358,8 +350,8 @@ func (ds *fakeDataStore) CreateAttestedNode(context.Context, *common.AttestedNod
return &common.AttestedNode{}, ds.err
}

func (ds *fakeDataStore) CreateAttestedNodeEvent(context.Context, *datastore.AttestedNodeEvent) (*datastore.AttestedNodeEvent, error) {
return &datastore.AttestedNodeEvent{}, ds.err
func (ds *fakeDataStore) CreateAttestedNodeEventForTesting(context.Context, *datastore.AttestedNodeEvent) error {
return ds.err
}

func (ds *fakeDataStore) CreateBundle(context.Context, *common.Bundle) (*common.Bundle, error) {
Expand All @@ -386,15 +378,15 @@ func (ds *fakeDataStore) CreateOrReturnRegistrationEntry(context.Context, *commo
return &common.RegistrationEntry{}, true, ds.err
}

func (ds *fakeDataStore) CreateRegistrationEntryEvent(context.Context, *datastore.RegistrationEntryEvent) (*datastore.RegistrationEntryEvent, error) {
return &datastore.RegistrationEntryEvent{}, ds.err
func (ds *fakeDataStore) CreateRegistrationEntryEventForTesting(context.Context, *datastore.RegistrationEntryEvent) error {
return ds.err
}

func (ds *fakeDataStore) DeleteAttestedNode(context.Context, string) (*common.AttestedNode, error) {
return &common.AttestedNode{}, ds.err
}

func (ds *fakeDataStore) DeleteAttestedNodeEvent(context.Context, uint) error {
func (ds *fakeDataStore) DeleteAttestedNodeEventForTesting(context.Context, uint) error {
return ds.err
}

Expand All @@ -414,7 +406,7 @@ func (ds *fakeDataStore) DeleteRegistrationEntry(context.Context, string) (*comm
return &common.RegistrationEntry{}, ds.err
}

func (ds *fakeDataStore) DeleteRegistrationEntryEvent(context.Context, uint) error {
func (ds *fakeDataStore) DeleteRegistrationEntryEventForTesting(context.Context, uint) error {
return ds.err
}

Expand Down Expand Up @@ -446,14 +438,6 @@ func (ds *fakeDataStore) FetchRegistrationEntryEvent(context.Context, uint) (*da
return &datastore.RegistrationEntryEvent{}, ds.err
}

func (ds *fakeDataStore) GetLatestAttestedNodeEventID(context.Context) (uint, error) {
return 0, ds.err
}

func (ds *fakeDataStore) GetLatestRegistrationEntryEventID(context.Context) (uint, error) {
return 0, ds.err
}

func (ds *fakeDataStore) GetNodeSelectors(context.Context, string, datastore.DataConsistency) ([]*common.Selector, error) {
return []*common.Selector{}, ds.err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ type Config struct {
// PruneEventsOlderThan controls how long events can live before they are pruned
PruneEventsOlderThan time.Duration

// SQLTransactionTimeout controls how long to wait for an event before giving up
SQLTransactionTimeout time.Duration

// AuthPolicyEngineConfig determines the config for authz policy
AuthOpaPolicyEngineConfig *authpolicy.OpaEngineConfig

Expand Down
10 changes: 4 additions & 6 deletions pkg/server/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ type DataStore interface {
// Entries Events
ListRegistrationEntriesEvents(ctx context.Context, req *ListRegistrationEntriesEventsRequest) (*ListRegistrationEntriesEventsResponse, error)
PruneRegistrationEntriesEvents(ctx context.Context, olderThan time.Duration) error
GetLatestRegistrationEntryEventID(ctx context.Context) (uint, error)
CreateRegistrationEntryEvent(ctx context.Context, event *RegistrationEntryEvent) (*RegistrationEntryEvent, error)
DeleteRegistrationEntryEvent(ctx context.Context, eventID uint) error
FetchRegistrationEntryEvent(ctx context.Context, eventID uint) (*RegistrationEntryEvent, error)
CreateRegistrationEntryEventForTesting(ctx context.Context, event *RegistrationEntryEvent) error
DeleteRegistrationEntryEventForTesting(ctx context.Context, eventID uint) error

// Nodes
CountAttestedNodes(context.Context, *CountAttestedNodesRequest) (int32, error)
Expand All @@ -59,10 +58,9 @@ type DataStore interface {
// Nodes Events
ListAttestedNodesEvents(ctx context.Context, req *ListAttestedNodesEventsRequest) (*ListAttestedNodesEventsResponse, error)
PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) error
GetLatestAttestedNodeEventID(ctx context.Context) (uint, error)
CreateAttestedNodeEvent(ctx context.Context, event *AttestedNodeEvent) (*AttestedNodeEvent, error)
DeleteAttestedNodeEvent(ctx context.Context, eventID uint) error
FetchAttestedNodeEvent(ctx context.Context, eventID uint) (*AttestedNodeEvent, error)
CreateAttestedNodeEventForTesting(ctx context.Context, event *AttestedNodeEvent) error
DeleteAttestedNodeEventForTesting(ctx context.Context, eventID uint) error

// Node selectors
GetNodeSelectors(ctx context.Context, spiffeID string, dataConsistency DataConsistency) ([]*common.Selector, error)
Expand Down
Loading
Loading