diff --git a/CHANGELOG.md b/CHANGELOG.md index 335b18684..5007af74d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,48 @@ ## What's New -* Moved `ziti ops verify-network` to `ziti ops verify network` -* Moved `ziti ops verify traffic` to `ziti ops verify traffic` -* Added `ziti ops verify ext-jwt-signer oidc` to help users with configuring OIDC external auth * Use `cluster` consistently for cluster operations -* Bug Fixes +* Event Doc and Consistency +* ziti ops verify changes + * Moved `ziti ops verify-network` to `ziti ops verify network` + * Moved `ziti ops verify traffic` to `ziti ops verify traffic` + * Added `ziti ops verify ext-jwt-signer oidc` to help users with configuring OIDC external auth +* Bug fixes + +## Event Doc and Consistency + +The event types are now exhaustively documented as part of the [OpenZiti Reference Documentation](https://openziti.io/docs/reference/events). + +During documentation, some inconsistencies were found the following changes were made. + +### Namespace Cleanup +Namespaces have been cleaned up, with the following changes: + +* edge.apiSessions -> apiSession +* fabric.circuits -> circuit +* edge.entityCount -> entityCount +* fabric.links -> link +* fabric.routers -> router +* services -> service +* edge.sessions -> session +* fabric.terminators -> terminator +* fabric.usage -> usage + +Note that service events used `services` in the config file, but `service.events` in the event itself. +The old namespaces still work. If the old event type is used in the config file, the old namespace will be in the events as well + +### Timestamp field + +The following event types now have a timestamp field: + +* service +* usage + +This timestamp is the time the event was generated. + +### Event Source ID +All event types now have a new field: `event_src_id`. This field is the id of the controller +which emitted the event. This may be useful in HA environments, during event processing. ## Cluster Operations Naming @@ -17,11 +54,13 @@ The Raft APIs available in the fabric management API are now namespaced under Cl ## Component Updates and Bug Fixes * github.com/openziti/ziti: [v1.3.3 -> v1.4.0](https://github.com/openziti/ziti/compare/v1.3.3...v1.4.0) + * [Issue #2720](https://github.com/openziti/ziti/issues/2720) - new verify oidc command on prints usage * [Issue #2546](https://github.com/openziti/ziti/issues/2546) - Use consistent terminology for HA + * [Issue #2713](https://github.com/openziti/ziti/issues/2713) - Routers with no edge components shouldn't subscribe to RDM updates # Release 1.3.3 -# What's New +## What's New * Bug Fixes @@ -33,7 +72,7 @@ The Raft APIs available in the fabric management API are now namespaced under Cl # Release 1.3.2 -# What's New +## What's New * Bug Fixes @@ -45,7 +84,7 @@ The Raft APIs available in the fabric management API are now namespaced under Cl # Release 1.3.1 -# What's New +## What's New * Bug Fixes diff --git a/controller/event/api_session.go b/controller/event/api_session.go index e0c4de295..25a0e08c3 100644 --- a/controller/event/api_session.go +++ b/controller/event/api_session.go @@ -25,21 +25,60 @@ const ApiSessionEventTypeCreated = "created" const ApiSessionEventTypeDeleted = "deleted" const ApiSessionEventTypeRefreshed = "refreshed" const ApiSessionEventTypeExchanged = "exchanged" -const ApiSessionEventNS = "edge.apiSessions" +const ApiSessionEventNS = "apiSession" const ApiSessionTypeLegacy = "legacy" const ApiSessionTypeJwt = "jwt" +// An ApiSessionEvent is emitted whenever an api session is created, deleted, refreshed or exchanged. +// Legacy sessions are only ever created or deleted. JWT sessions are created, refreshed and exchanged. +// +// Note: In version prior to 1.4.0, the namespace was `edge.apiSessions` +// +// Valid api session event types are: +// - created +// - deleted +// - refreshed +// - exchanged +// +// Valid api session types are: +// - jwt +// - legacy +// +// Example: Api Session Created Event +// +// { +// "namespace": "apiSession", +// "event_src_id" : "ctrl1", +// "timestamp": "2021-11-08T14:45:45.785561479-05:00", +// "event_type": "created", +// "id": "ckvr2r4fs0001oigd6si4akc8", +// "token": "77cffde5-f68e-4ef0-bbb5-731db36145f5", +// "identity_id": "76BB.shC0", +// "ip_address": "127.0.0.1" +// } type ApiSessionEvent struct { Namespace string `json:"namespace"` - EventType string `json:"event_type"` EventSrcId string `json:"event_src_id"` - Id string `json:"id"` - Type string `json:"type"` Timestamp time.Time `json:"timestamp"` - Token string `json:"token"` - IdentityId string `json:"identity_id"` - IpAddress string `json:"ip_address"` + + // The type api session event. See above for valid values. + EventType string `json:"event_type"` + + // Id is the api session id. + Id string `json:"id"` + + // Type is the api session type. See above for valid values. + Type string `json:"type"` + + // The api session token. + Token string `json:"token"` + + // The id of the identity that the api session belongs to. + IdentityId string `json:"identity_id"` + + // The IP address from which the identity to connected to require the api session. + IpAddress string `json:"ip_address"` } func (event *ApiSessionEvent) String() string { diff --git a/controller/event/circuits.go b/controller/event/circuits.go index 164f8d5d2..fdb090a26 100644 --- a/controller/event/circuits.go +++ b/controller/event/circuits.go @@ -24,7 +24,8 @@ import ( type CircuitEventType string const ( - CircuitEventsNs = "fabric.circuits" + CircuitEventNS = "circuit" + CircuitEventsVersion = 2 CircuitCreated CircuitEventType = "created" CircuitUpdated CircuitEventType = "pathUpdated" @@ -34,15 +35,32 @@ const ( var CircuitEventTypes = []CircuitEventType{CircuitCreated, CircuitUpdated, CircuitDeleted, CircuitFailed} +// A CircuitPath encapsulates information about the circuit's path. type CircuitPath struct { - Nodes []string `json:"nodes"` - Links []string `json:"links"` - IngressId string `json:"ingress_id"` - EgressId string `json:"egress_id"` - InitiatorLocalAddr string `json:"initiator_local_addr,omitempty"` - InitiatorRemoteAddr string `json:"initiator_remote_addr,omitempty"` - TerminatorLocalAddr string `json:"terminator_local_addr,omitempty"` - TerminatorRemoteAddr string `json:"terminator_remote_addr,omitempty"` + // The routers traversed in the path, going from initiating router to terminating router. + Nodes []string `json:"nodes"` + + // The links traversed in the path, going from initiating router to terminating router. + // If the initiating and terminating routers are the same, this will be empty. + Links []string `json:"links"` + + // The xgress identifier used on the initiating router. + IngressId string `json:"ingress_id"` + + // The xgress identifier used on the terminating router. + EgressId string `json:"egress_id"` + + // The local address of the connection to the first ziti component. + InitiatorLocalAddr string `json:"initiator_local_addr,omitempty"` + + // The remote address of the connection to the first ziti component. + InitiatorRemoteAddr string `json:"initiator_remote_addr,omitempty"` + + // The local address on the terminating ziti component. + TerminatorLocalAddr string `json:"terminator_local_addr,omitempty"` + + // The remote address on the terminating ziti component. + TerminatorRemoteAddr string `json:"terminator_remote_addr,omitempty"` } func (self *CircuitPath) String() string { @@ -60,24 +78,151 @@ func (self *CircuitPath) String() string { return out } +// A CircuitEvent is emitted for various stages of a circuit lifecycle. +// +// Note: In version prior to 1.4.0, the namespace was `fabric.circuits` +// +// Valid circuit event types are: +// - created +// - pathUpdated +// - deleted +// - failed +// +// Example: Circuit Created Event +// +// { +// "namespace": "circuit", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T14:09:13.603009739-05:00", +// "version": 2, +// "event_type": "created", +// "circuit_id": "rqrucElFe", +// "client_id": "cm614ve9h00fb1xj9dfww20le", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "terminator_id": "7JgrjMgEAis7V5q1wjvoB4", +// "instance_id": "", +// "creation_timespan": 1035941, +// "path": { +// "nodes": [ +// "5g2QrZxFcw" +// ], +// "links": null, +// "ingress_id": "8dN7", +// "egress_id": "ZnXG" +// }, +// "link_count": 0, +// "path_cost": 262140, +// "tags": { +// "clientId": "haxn9lB0uc", +// "hostId": "IahyE.5Scw", +// "serviceId": "3pjMOKY2icS8fkQ1lfHmrP" +// } +// } +// +// Example: Circuit Deleted Event +// +// { +// "namespace": "circuit", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T14:09:15.138049308-05:00", +// "version": 2, +// "event_type": "deleted", +// "circuit_id": "rqrucElFe", +// "client_id": "cm614ve9h00fb1xj9dfww20le", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "terminator_id": "7JgrjMgEAis7V5q1wjvoB4", +// "instance_id": "", +// "path": { +// "nodes": [ +// "5g2QrZxFcw" +// ], +// "links": null, +// "ingress_id": "8dN7", +// "egress_id": "ZnXG" +// }, +// "link_count": 0, +// "duration": 1535040544, +// "tags": { +// "clientId": "haxn9lB0uc", +// "hostId": "IahyE.5Scw", +// "serviceId": "3pjMOKY2icS8fkQ1lfHmrP" +// } +// } +// +// Example: Circuit Failed Event +// +// { +// "namespace": "circuit", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T14:09:30.563045771-05:00", +// "version": 2, +// "event_type": "failed", +// "circuit_id": "JvIucEQHe", +// "client_id": "cm614vrcd00fu1xj931hzepec", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "terminator_id": "", +// "instance_id": "", +// "creation_timespan": 20701, +// "path": { +// "nodes": null, +// "links": null, +// "ingress_id": "", +// "egress_id": "" +// }, +// "link_count": 0, +// "failure_cause": "NO_TERMINATORS", +// "tags": { +// "clientId": "haxn9lB0uc", +// "serviceId": "3pjMOKY2icS8fkQ1lfHmrP" +// } +// } type CircuitEvent struct { - Namespace string `json:"namespace"` - Version uint32 `json:"version"` - EventType CircuitEventType `json:"event_type"` - EventSrcId string `json:"event_src_id"` - CircuitId string `json:"circuit_id"` - Timestamp time.Time `json:"timestamp"` - ClientId string `json:"client_id"` - ServiceId string `json:"service_id"` - TerminatorId string `json:"terminator_id"` - InstanceId string `json:"instance_id"` - CreationTimespan *time.Duration `json:"creation_timespan,omitempty"` - Path CircuitPath `json:"path"` - LinkCount int `json:"link_count"` - Cost *uint32 `json:"path_cost,omitempty"` - FailureCause *string `json:"failure_cause,omitempty"` - Duration *time.Duration `json:"duration,omitempty"` - Tags map[string]string `json:"tags"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The event format version. Currently 2. + Version uint32 `json:"version"` + + // The circuit event type. See above for valid circuit event types. + EventType CircuitEventType `json:"event_type"` + + // The circuit id. + CircuitId string `json:"circuit_id"` + + // Who the circuit was created for. Usually an edge session id. + ClientId string `json:"client_id"` + + // The id of the circuit's service. + ServiceId string `json:"service_id"` + + // The terminator the circuit is using. + TerminatorId string `json:"terminator_id"` + + // The instance id of the terminator. + InstanceId string `json:"instance_id"` + + // How long it look to create the circuit. + CreationTimespan *time.Duration `json:"creation_timespan,omitempty"` + + // The circuit's path. + Path CircuitPath `json:"path"` + + // How many links the circuit is using. + LinkCount int `json:"link_count"` + + // The circuit's cost at the time of creation or update. + Cost *uint32 `json:"path_cost,omitempty"` + + // The reason the circuit failed. Only populated for circuit failures. + FailureCause *string `json:"failure_cause,omitempty"` + + // How long the circuit has been up. Not populated for circuit creates. + Duration *time.Duration `json:"duration,omitempty"` + + // Contains circuit enrichment data. May contain information like the client and/or host + // identity ids. + Tags map[string]string `json:"tags"` } func (event *CircuitEvent) String() string { diff --git a/controller/event/cluster.go b/controller/event/cluster.go index b4cf256f3..96d86fdff 100644 --- a/controller/event/cluster.go +++ b/controller/event/cluster.go @@ -25,7 +25,7 @@ import ( type ClusterEventType string const ( - ClusterEventsNs = "cluster" + ClusterEventNS = "cluster" ClusterPeerConnected ClusterEventType = "peer.connected" ClusterPeerDisconnected ClusterEventType = "peer.disconnected" @@ -38,16 +38,31 @@ const ( ClusterStateReadWrite ClusterEventType = "state.rw" ) +// A ClusterPeer represents a controller which is a member of the cluster. type ClusterPeer struct { - Id string `json:"id,omitempty"` - Addr string `json:"addr,omitempty"` - Version string `json:"version,omitempty"` - ServerCert []*x509.Certificate `json:"-"` + // The controller id. + Id string `json:"id,omitempty"` + + // The address at which the controller can be reached. + Addr string `json:"addr,omitempty"` + + // The version of the controller. + Version string `json:"version,omitempty"` + + ServerCert []*x509.Certificate `json:"-"` + + // The set of api addresses presented by the controller. ApiAddresses map[string][]ApiAddress `json:"apiAddresses"` } +// An ApiAddress represents an endpoint on a controller. This may include things +// like REST management services and health checks. type ApiAddress struct { - Url string `json:"url"` + // The URL of the API endpoint. + Url string `json:"url"` + + // The version of the API endpoint. Endpoints are versioned independently of + // the controller version as these are expected to be stable over long periods. Version string `json:"version"` } @@ -55,14 +70,105 @@ func (self *ClusterPeer) String() string { return fmt.Sprintf("id=%v addr=%v version=%v", self.Id, self.Addr, self.Version) } +// A ClusterEvent marks a change to the controller HA cluster. +// ClusterEvents can be of the following types: +// - peer.connected +// - peer.disconnected +// - members.changed +// - leadership.gained +// - leadership.lost +// - state.has_leader +// - state.is_leaderless +// - state.ro +// - state.rw +// +// Example: Cluster Members Changed Event +// +// { +// "namespace": "cluster", +// "event_src_id": "ctrl1", +// "timestamp": "2025-01-17T13:41:25.817205826-05:00", +// "eventType": "members.changed", +// "index": 7, +// "peers": [ +// { +// "id": "ctrl1", +// "addr": "tls:localhost:6262", +// "apiAddresses": null +// }, +// { +// "id": "ctrl2", +// "addr": "tls:localhost:6363", +// "apiAddresses": null +// } +// ] +// } +// +// Example: Peer Connected Event +// +// { +// "namespace": "cluster", +// "event_src_id": "ctrl1", +// "timestamp": "2025-01-17T13:41:25.838625953-05:00", +// "eventType": "peer.connected", +// "peers": [ +// { +// "id": "ctrl2", +// "addr": "tls:localhost:6363", +// "version": "v0.0.0", +// "apiAddresses": { +// "edge-client": [ +// { +// "url": "https://127.0.0.1:1380/edge/client/v1", +// "version": "v1" +// } +// ], +// "edge-management": [ +// { +// "url": "https://127.0.0.1:1380/edge/management/v1", +// "version": "v1" +// } +// ], +// "edge-oidc": [ +// { +// "url": "https://127.0.0.1:1380/oidc", +// "version": "v1" +// } +// ], +// "fabric": [ +// { +// "url": "https://127.0.0.1:1380/fabric/v1", +// "version": "v1" +// } +// ], +// "health-checks": [ +// { +// "url": "https://127.0.0.1:1380/health-checks", +// "version": "v1" +// } +// ] +// } +// } +// ] +// } type ClusterEvent struct { - Namespace string `json:"namespace"` - EventType ClusterEventType `json:"eventType"` - EventSrcId string `json:"event_src_id"` - Timestamp time.Time `json:"timestamp"` - Index uint64 `json:"index,omitempty"` - Peers []*ClusterPeer `json:"peers,omitempty"` - LeaderId string `json:"leaderId,omitempty"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The cluster event type. See above for set of valid types. + EventType ClusterEventType `json:"eventType"` + + // The raft index associated with the event. + Index uint64 `json:"index,omitempty"` + + // This field is populated with all peers when membership change events + // or leadership is gained. It is populated with the connecting peer for connect events and the + // disconnecting peer for disconnect events. For other types it is omitted. + Peers []*ClusterPeer `json:"peers,omitempty"` + + // The leader id. Only populated for state.has_leader events. + LeaderId string `json:"leaderId,omitempty"` } func (event *ClusterEvent) String() string { @@ -81,7 +187,7 @@ func (f ClusterEventHandlerF) AcceptClusterEvent(event *ClusterEvent) { func NewClusterEvent(eventType ClusterEventType) *ClusterEvent { return &ClusterEvent{ - Namespace: ClusterEventsNs, + Namespace: ClusterEventNS, EventType: eventType, Timestamp: time.Now(), } diff --git a/controller/event/connect.go b/controller/event/connect.go index 7ae309267..c8bf22f4c 100644 --- a/controller/event/connect.go +++ b/controller/event/connect.go @@ -33,16 +33,77 @@ const ( ConnectDestinationRouter ConnectDestination = "router" ) +// A ConnectEvent is emitted when a connection is made to a ziti controller or router. +// +// Valid source types are: +// - router - router connecting to a controller or another router) +// - peer - controller connecting to another controller +// - identity - identity connecting to a router or controller +// +// Valid destination types are: +// - ctrl - connection is being made to a controller +// - router - connection is being made to a router +// +// Example: Identity Connected to Controller Event +// +// { +// "namespace": "connect", +// "event_src_id": "ctrl_client", +// "timestamp": "2024-10-02T12:17:39.501821249-04:00" +// "src_type": "identity", +// "src_id": "ji2Rt8KJ4", +// "src_addr": "127.0.0.1:59336", +// "dst_id": "ctrl_client", +// "dst_addr": "localhost:1280/edge/management/v1/edge-routers/2L7NeVuGBU", +// } +// +// Example: Router Connected to Controller Event +// +// { +// "namespace": "connect", +// "event_src_id": "ctrl_client", +// "timestamp": "2024-10-02T12:17:40.529865849-04:00" +// "src_type": "router", +// "src_id": "2L7NeVuGBU", +// "src_addr": "127.0.0.1:42702", +// "dst_id": "ctrl_client", +// "dst_addr": "127.0.0.1:6262", +// } +// +// Example: Controller Connected to Controller Event +// +// { +// "namespace": "connect", +// "event_src_id": "ctrl1", +// "timestamp": "2024-10-02T12:37:04.490859197-04:00" +// "src_type": "peer", +// "src_id": "ctrl2", +// "src_addr": "127.0.0.1:40056", +// "dst_id": "ctrl1", +// "dst_addr": "127.0.0.1:6262", +// } type ConnectEvent struct { - Namespace string `json:"namespace"` - EventSrcId string `json:"event_src_id"` - SrcType ConnectSource `json:"src_type"` - DstType ConnectDestination `json:"dst_type"` - SrcId string `json:"src_id"` - SrcAddr string `json:"src_addr"` - DstId string `json:"dst_id"` - DstAddr string `json:"dst_addr"` - Timestamp time.Time `json:"timestamp"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The type of software initiating the connection. + SrcType ConnectSource `json:"src_type"` + + // The type of software receiving the connection. + DstType ConnectDestination `json:"dst_type"` + + // The id of the initiating component. + SrcId string `json:"src_id"` + + // The source address of the connection. + SrcAddr string `json:"src_addr"` + + // The id of the receiving component. + DstId string `json:"dst_id"` + + // The destination address of the connection. + DstAddr string `json:"dst_addr"` } type ConnectEventHandler interface { diff --git a/controller/event/dispatcher.go b/controller/event/dispatcher.go index 42bad05d0..8ed9ed62d 100644 --- a/controller/event/dispatcher.go +++ b/controller/event/dispatcher.go @@ -27,7 +27,7 @@ import ( type TypeRegistrar interface { // Register takes a handler, which may implement multiple event handler // interfaces, and configure it using the configuration map provided - Register(handler interface{}, config map[string]interface{}) error + Register(eventType string, handler interface{}, config map[string]interface{}) error // Unregister will remove give handler, if implements the interface for // this event type and is registered to receive events of this type @@ -36,7 +36,7 @@ type TypeRegistrar interface { // A RegistrationHandler can take a handler, which may implement multiple event handler // interfaces, and configure it using the configuration map provided -type RegistrationHandler func(handler interface{}, config map[string]interface{}) error +type RegistrationHandler func(eventType string, handler interface{}, config map[string]interface{}) error // A UnregistrationHandler will remove give handler, if implements the interface for // this event type and is registered to receive events of this type diff --git a/controller/event/dispatcher_mock.go b/controller/event/dispatcher_mock.go index 14516cef3..d7066d7d5 100644 --- a/controller/event/dispatcher_mock.go +++ b/controller/event/dispatcher_mock.go @@ -127,7 +127,7 @@ func (d DispatcherMock) AcceptServiceEvent(*ServiceEvent) {} func (d DispatcherMock) AcceptTerminatorEvent(*TerminatorEvent) {} -func (d DispatcherMock) AcceptUsageEvent(*UsageEvent) {} +func (d DispatcherMock) AcceptUsageEvent(*UsageEventV2) {} func (d DispatcherMock) AddClusterEventHandler(ClusterEventHandler) {} diff --git a/controller/event/entity_change.go b/controller/event/entity_change.go index e87298553..3fcb817ef 100644 --- a/controller/event/entity_change.go +++ b/controller/event/entity_change.go @@ -16,14 +16,12 @@ package event -import ( - "time" -) +import "time" type EntityChangeEventType string const ( - EntityChangeEventsNs = "entityChange" + EntityChangeEventNS = "entityChange" EntityChangeTypeEntityCreated EntityChangeEventType = "created" EntityChangeTypeEntityUpdated EntityChangeEventType = "updated" @@ -31,19 +29,102 @@ const ( EntityChangeTypeCommitted EntityChangeEventType = "committed" ) +// An EntityChangeEvent is emitted when a entity in the data model changes. +// +// Valid entity change event types are: +// - created +// - updated +// - deleted +// - committed +// +// Entity change events happen in two parts. First the created,updated or deleted event is +// emitted. This happens inside a transaction. After the transaction is committed, a +// committed event with the same event id is generated. This lets the event consumer know +// that the event is finalized. All changes within a transaction will share the same event +// id. If a new event id is seen before the previous event is committed, that indicates +// that the transaction was rolled back. +// +// Example: Service Created Event +// +// { +// "namespace": "entityChange", +// "event_src_id": "ctrl1", +// "timestamp": "2023-05-11T21:41:47.128588927-04:00", +// "eventId": "326faf6c-8123-42ae-9ed8-6fd9560eb567", +// "eventType": "created", +// "metadata": { +// "author": { +// "type": "identity", +// "id": "ji2Rt8KJ4", +// "name": "Default Admin" +// }, +// "source": { +// "type": "rest", +// "auth": "edge", +// "localAddr": "localhost:1280", +// "remoteAddr": "127.0.0.1:37578", +// "method": "POST" +// }, +// "version": "v0.0.0" +// }, +// "entityType": "services", +// "isParentEvent": false, +// "initialState": null, +// "finalState": { +// "id": "6S0bCGWb6yrAutXwSQaLiv", +// "createdAt": "2023-05-12T01:41:47.128138887Z", +// "updatedAt": "2023-05-12T01:41:47.128138887Z", +// "tags": {}, +// "isSystem": false, +// "name": "test", +// "terminatorStrategy": "smartrouting", +// "roleAttributes": [ +// "goodbye", +// "hello" +// ], +// "configs": null, +// "encryptionRequired": true +// } +// } +// +// Example: Change Committed Event +// +// { +// "namespace": "entityChange", +// "event_src_id": "ctrl1", +// "timestamp": "2023-05-11T21:41:47.129235443-04:00" +// "eventId": "326faf6c-8123-42ae-9ed8-6fd9560eb567", +// "eventType": "committed", +// } type EntityChangeEvent struct { - Namespace string `json:"namespace"` - EventId string `json:"eventId"` - EventType EntityChangeEventType `json:"eventType"` - EventSrcId string `json:"event_src_id"` - Timestamp time.Time `json:"timestamp"` - Metadata map[string]any `json:"metadata,omitempty"` - EntityType string `json:"entityType,omitempty"` - IsParentEvent *bool `json:"isParentEvent,omitempty"` - InitialState any `json:"initialState,omitempty"` - FinalState any `json:"finalState,omitempty"` - PropagateIndicator bool `json:"-"` - IsRecoveryEvent bool `json:"-"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // An identifier shared by all changes in a given transaction. + EventId string `json:"eventId"` + + // The entity change event type. See above for valid values. + EventType EntityChangeEventType `json:"eventType"` + + // Metadata will include information about who initiated the change and how. + Metadata map[string]any `json:"metadata,omitempty"` + + // The type of the entity being changed. + EntityType string `json:"entityType,omitempty"` + + // True if the entity type has a parent type (like services and routers), and + // this event only contains the parent data. + IsParentEvent *bool `json:"isParentEvent,omitempty"` + + // The state before the change. Will be empty for creates. + InitialState any `json:"initialState,omitempty"` + + // The state after the change. Will be empty for deletes. + FinalState any `json:"finalState,omitempty"` + + PropagateIndicator bool `json:"-"` + IsRecoveryEvent bool `json:"-"` } type EntityChangeEventHandler interface { diff --git a/controller/event/entity_counts.go b/controller/event/entity_counts.go index 6e7db6d9d..b057b8cca 100644 --- a/controller/event/entity_counts.go +++ b/controller/event/entity_counts.go @@ -21,14 +21,60 @@ import ( "time" ) -const EntityCountEventNS = "edge.entityCounts" +const EntityCountEventNS = "entityCount" +// A EntityCountEvent is emitted on a configurable interval. It contains +// the entity counts for all the entity types in the data model. +// +// Note: In version prior to 1.4.0, the namespace was `edge.entityCounts` +// +// Example: Entity Count Event +// +// { +// "namespace": "entityCount", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-16T20:34:47.281325752-05:00", +// "counts": { +// "apiSessionCertificates": 0, +// "apiSessions": 1, +// "authPolicies": 1, +// "authenticators": 5, +// "cas": 0, +// "configTypes": 6, +// "configs": 4, +// "controllers": 0, +// "edgeRouterPolicies": 7, +// "enrollments": 11, +// "eventualEvents": 0, +// "externalJwtSigners": 0, +// "identities": 16, +// "identityTypes": 2, +// "mfas": 0, +// "postureCheckTypes": 5, +// "postureChecks": 0, +// "revocations": 0, +// "routers": 7, +// "routers.edge": 7, +// "serviceEdgeRouterPolicies": 1, +// "servicePolicies": 4, +// "services": 2, +// "services.edge": 2, +// "sessions": 0, +// "terminators": 0 +// }, +// "error": "" +// } type EntityCountEvent struct { - Namespace string `json:"namespace"` - EventSrcId string `json:"event_src_id"` - Timestamp time.Time `json:"timestamp"` - Counts map[string]int64 `json:"counts"` - Error string `json:"error"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // Map of entity type to the number of entities of that type that currently exist in the data model. + Counts map[string]int64 `json:"counts"` + + // If an error is encountered while collecting entity counts, + // it will be reported here. + Error string `json:"error,omitempty"` } func (event *EntityCountEvent) String() string { @@ -39,3 +85,8 @@ func (event *EntityCountEvent) String() string { type EntityCountEventHandler interface { AcceptEntityCountEvent(event *EntityCountEvent) } + +type EntityCountEventHandlerWrapper interface { + EntityCountEventHandler + IsWrapping(value EntityCountEventHandler) bool +} diff --git a/controller/event/links.go b/controller/event/links.go index a6c3083f2..173a8c3ff 100644 --- a/controller/event/links.go +++ b/controller/event/links.go @@ -24,34 +24,123 @@ import ( type LinkEventType string const ( - LinkEventsNs = "fabric.links" + LinkEventNS = "link" - LinkDialed LinkEventType = "dialed" LinkFault LinkEventType = "fault" LinkDuplicate LinkEventType = "duplicate" - LinkConnected LinkEventType = "connected" LinkFromRouterNew LinkEventType = "routerLinkNew" LinkFromRouterKnown LinkEventType = "routerLinkKnown" LinkFromRouterDisconnectedDest LinkEventType = "routerLinkDisconnectedDest" + + // LinkDialed is only used when legacy controller link management is enabled + LinkDialed LinkEventType = "dialed" + + // LinkConnected is only used when legacy controller link management is enabled + LinkConnected LinkEventType = "connected" ) +// A LinkConnection describes a physical connection that forms a link. A Link may be made +// up of multiple LinkConnections. +// +// Link ids currently have three valid values: +// - single - meaning the link has a single connection +// - payload - a connection used only for payloads +// - ack - a connection used only for acks type LinkConnection struct { - Id string `json:"id"` - LocalAddr string `json:"local_addr"` + // The connection identifier. + Id string `json:"id"` + + // The connection address on dialing router side. + LocalAddr string `json:"local_addr"` + + // The connection address on accepting router side. RemoteAddr string `json:"remote_addr"` } +// A LinkEvent will be emitted for various link lifecycle events. +// +// Note: In version prior to 1.4.0, the namespace was `fabric.links` +// +// Valid values for link event types are: +// - routerLinkNew - A router established a new link, or is syncing the controller with link information after a restart/reconnect. +// - fault - a link has closed due to a link fault +// - duplicate - a link was removed because it was a duplicate. Happens when routers dial each other at the same time. +// - routerLinkKnown - A router informed the controller of a link, but the controller already knew about it. +// - routerLinkDisconnectedDest - A router created a link, but the destination router isn't currently connected to the controller. +// - dialed - Deprecated. Happens when a link listener has been dialed. Only relevant if using legacy controller managed links. +// - connected - Deprecated. Happens when a link is connected. Only generated when using legacy controller managed links. +// +// Example: Router Link New Event +// +// { +// "namespace": "fabric.links", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-23T16:19:26.879417243-05:00", +// "event_type": "routerLinkNew", +// "link_id": "2Wea35TgSfWbh3P1wmEyLd", +// "src_router_id": "5g2QrZxFcw", +// "dst_router_id": "E0IAgBVdmn", +// "protocol": "dtls", +// "dial_address": "dtls:127.0.0.1:4024", +// "cost": 1 +// } +// +// Example: Link Faulted Event +// +// { +// "namespace": "link", +// "event_src_id": "ctrl1", +// "timestamp": "2022-07-15T18:10:19.973867809-04:00", +// "event_type": "fault", +// "link_id": "6slUYCqOB85YTfdiD8I5pl", +// "src_router_id": "YPpTEd8JP", +// "dst_router_id": "niY.XmLArx", +// "protocol": "tls", +// "dial_address": "tls:127.0.0.1:4023", +// "cost": 1 +// } +// +// Example: Router Link Known Event +// +// { +// "namespace": "link", +// "event_src_id": "ctrl1", +// "timestamp": "2022-07-15T18:10:19.974177638-04:00", +// "event_type": "routerLinkKnown", +// "link_id": "47kGIApCXI29VQoCA1xXWI", +// "src_router_id": "niY.XmLArx", +// "dst_router_id": "YPpTEd8JP", +// "protocol": "tls", +// "dial_address": "tls:127.0.0.1:4024", +// "cost": 1 +// } type LinkEvent struct { - Namespace string `json:"namespace"` - EventType LinkEventType `json:"event_type"` - EventSrcId string `json:"event_src_id"` - Timestamp time.Time `json:"timestamp"` - LinkId string `json:"link_id"` - SrcRouterId string `json:"src_router_id"` - DstRouterId string `json:"dst_router_id"` - Protocol string `json:"protocol"` - DialAddress string `json:"dial_address"` - Cost int32 `json:"cost"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The link event type. See above for valid values. + EventType LinkEventType `json:"event_type"` + + // The link identifier. + LinkId string `json:"link_id"` + + // The id of the dialing router. + SrcRouterId string `json:"src_router_id"` + + // The id of the accepting router. + DstRouterId string `json:"dst_router_id"` + + // The link protocol. + Protocol string `json:"protocol"` + + // The address dialed. + DialAddress string `json:"dial_address"` + + // The link cost. + Cost int32 `json:"cost"` + + // The connections making up the link. Connections []*LinkConnection `json:"connections,omitempty"` } @@ -63,3 +152,8 @@ func (event *LinkEvent) String() string { type LinkEventHandler interface { AcceptLinkEvent(event *LinkEvent) } + +type LinkEventHandlerWrapper interface { + LinkEventHandler + IsWrapping(value LinkEventHandler) bool +} diff --git a/controller/event/metrics.go b/controller/event/metrics.go index 6f3d20827..63f0bc902 100644 --- a/controller/event/metrics.go +++ b/controller/event/metrics.go @@ -22,22 +22,100 @@ import ( ) const ( - MetricsEventsNs = "metrics" + MetricsEventNS = "metrics" MetricsEventsVersion = 3 ) +// A MetricsEvent represents a point in time snapshot of a metric from a controller or router. +// +// Valid values for metric type are: +// - intValue +// - floatValue +// - meter +// - histogram +// - timer +// +// Example: The service policy enforcer deletes meter +// +// { +// "namespace": "metrics", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T02:45:21.890823877Z", +// "metric_type": "meter", +// "source_id": "ctrl_client", +// "version": 3, +// "metric": "service.policy.enforcer.run.deletes", +// "metrics": { +// "count": 0, +// "m15_rate": 0, +// "m1_rate": 0, +// "m5_rate": 0, +// "mean_rate": 0 +// }, +// "source_event_id": "c41fbf8d-cd14-4b8b-ae7b-0f0e93e2021d" +// } +// +// Example: The api session create timer +// +// { +// "namespace": "metrics", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T02:45:21.890823877Z", +// "metric_type": "timer", +// "source_id": "ctrl_client", +// "version": 3, +// "metric": "api-session.create", +// "metrics": { +// "count": 1, +// "m15_rate": 0.0006217645754885097, +// "m1_rate": 0.000002754186169011774, +// "m5_rate": 0.0005841004612303224, +// "max": 7598246, +// "mean": 7598246, +// "mean_rate": 0.0018542395091967903, +// "min": 7598246, +// "p50": 7598246, +// "p75": 7598246, +// "p95": 7598246, +// "p99": 7598246, +// "p999": 7598246, +// "p9999": 7598246, +// "std_dev": 0, +// "variance": 0 +// }, +// "source_event_id": "c41fbf8d-cd14-4b8b-ae7b-0f0e93e2021d" +// } type MetricsEvent struct { - MetricType string `json:"metric_type" mapstructure:"metric_type"` - Namespace string `json:"namespace"` - EventSrcId string `json:"event_src_id"` - SourceAppId string `json:"source_id" mapstructure:"source_id"` - SourceEntityId string `json:"source_entity_id,omitempty" mapstructure:"source_entity_id,omitempty"` - Version uint32 `json:"version"` - Timestamp time.Time `json:"timestamp"` - Metric string `json:"metric"` - Metrics map[string]interface{} `json:"metrics"` - Tags map[string]string `json:"tags,omitempty"` - SourceEventId string `json:"source_event_id" mapstructure:"source_event_id"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The type of metrics event. See above for valid values. + MetricType string `json:"metric_type" mapstructure:"metric_type"` + + // The id of the router or controller which emitted the metric. + SourceAppId string `json:"source_id" mapstructure:"source_id"` + + // If this metric is associated with an entity, such as link, this will + // contain the entity id. + SourceEntityId string `json:"source_entity_id,omitempty" mapstructure:"source_entity_id,omitempty"` + + // The version of the metrics format. The current version is 3. + Version uint32 `json:"version"` + + // The name of the metric. + Metric string `json:"metric"` + + // The values that copmrise the metrics. + Metrics map[string]any `json:"metrics"` + + // Some metrics include additional metadata. + // For example link metrics may include source and destination. + Tags map[string]string `json:"tags,omitempty"` + + // Events will often be collected together on a schedule. This is a correlation id + // so that events can be tied together with other events emitted at the same time. + SourceEventId string `json:"source_event_id" mapstructure:"source_event_id"` } type MetricsEventHandler interface { diff --git a/controller/event/routers.go b/controller/event/routers.go index 21ca24e48..587b39766 100644 --- a/controller/event/routers.go +++ b/controller/event/routers.go @@ -24,19 +24,55 @@ import ( type RouterEventType string const ( - RouterEventsNs = "fabric.routers" + RouterEventNS = "router" RouterOnline RouterEventType = "router-online" RouterOffline RouterEventType = "router-offline" ) +// A RouterEvent is generated when a router comes online or goes offline. +// +// Note: In version prior to 1.4.0, the namespace was `fabric.routers` +// +// Valid values for router event type are: +// - router-online +// - router-offline +// +// Example: Router online event +// +// { +// "namespace": "router", +// "event_src_id": "ctrl1", +// "timestamp": "2021-04-22T11:26:31.99299884-04:00", +// "event_type": "router-online", +// "router_id": "JAoyjafljO", +// "router_online": true +// } +// +// Example: Router offline event +// +// { +// "namespace": "router", +// "event_src_id": "ctrl1", +// "timestamp": "2021-04-22T11:26:41.335114358-04:00", +// "event_type": "router-offline", +// "router_id": "JAoyjafljO", +// "router_online": false +// } type RouterEvent struct { - Namespace string `json:"namespace"` - EventType RouterEventType `json:"event_type"` - EventSrcId string `json:"event_src_id"` - Timestamp time.Time `json:"timestamp"` - RouterId string `json:"router_id"` - RouterOnline bool `json:"router_online"` + Namespace string `json:"namespace"` + Timestamp time.Time `json:"timestamp"` + EventSrcId string `json:"event_src_id"` + + // The router event type. + EventType RouterEventType `json:"event_type"` + + // The router identifier. + RouterId string `json:"router_id"` + + // Indicates whether the router is online or not. Redundant given + // the router event type. Should likely be removed. + RouterOnline bool `json:"router_online"` } func (event *RouterEvent) String() string { @@ -47,3 +83,8 @@ func (event *RouterEvent) String() string { type RouterEventHandler interface { AcceptRouterEvent(event *RouterEvent) } + +type RouterEventHandlerWrapper interface { + RouterEventHandler + IsWrapping(value RouterEventHandler) bool +} diff --git a/controller/event/sdk.go b/controller/event/sdk.go index 669a6ba96..9d4934805 100644 --- a/controller/event/sdk.go +++ b/controller/event/sdk.go @@ -24,19 +24,59 @@ import ( type SdkEventType string const ( - SdkEventsNs = "sdk" + SdkEventNS = "sdk" SdkOnline SdkEventType = "sdk-online" SdkOffline SdkEventType = "sdk-offline" SdkStatusUnknown SdkEventType = "sdk-status-unknown" ) +// An SdkEvent is emitted when an sdk's connectivity to routers changes. +// +// Valid values for sdk event type are: +// - sdk-online - identity is online +// - sdk-offline - identity is offline +// - sdk-status-unknown - status is unknown because the routers that reported the identity online are not connected to the controller +// +// Example: SDK identity is online +// +// { +// "namespace": "sdk", +// "event_src_id": "ctrl1", +// "event_type" : "sdk-online", +// "identity_id": "ji2Rt8KJ4", +// "timestamp": "2024-10-02T12:17:39.501821249-04:00" +// } +// +// Example: SDK identity online status is unknown +// +// { +// "namespace": "sdk", +// "event_src_id": "ctrl1", +// "event_type" : "sdk-status-unknown", +// "identity_id": "ji2Rt8KJ4", +// "timestamp": "2024-10-02T12:17:40.501821249-04:00" +// } +// +// Example: SDK identit is offline +// +// { +// "namespace": "sdk", +// "event_src_id": "ctrl1", +// "event_type" : "sdk-offline", +// "identity_id": "ji2Rt8KJ4", +// "timestamp": "2024-10-02T12:17:41.501821249-04:00" +// } type SdkEvent struct { - Namespace string `json:"namespace"` - EventType SdkEventType `json:"event_type"` - EventSrcId string `json:"event_src_id"` - Timestamp time.Time `json:"timestamp"` - IdentityId string `json:"identity_id"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The sdk event type. See above for valid values. + EventType SdkEventType `json:"event_type"` + + // The id of the identity whose connectivity state has changed. + IdentityId string `json:"identity_id"` } func (event *SdkEvent) String() string { diff --git a/controller/event/services.go b/controller/event/services.go index d6bbdc9a2..dc10781b3 100644 --- a/controller/event/services.go +++ b/controller/event/services.go @@ -16,22 +16,84 @@ package event -import "fmt" +import ( + "fmt" + "time" +) const ( - ServiceEventsNs = "services" + ServiceEventNS = "service" ) +// A ServiceEvent is emitted for service and terminator level metrics which are collected per some interval. +// +// Note: In version prior to 1.4.0, the config key was `services`, but the namespace was `service.events` +// +// Value values for the service event type are: +// - service.dial.success +// - service.dial.fail +// - service.dial.timeout +// - service.dial.error_other +// - service.dial.terminator.timeout +// - service.dial.terminator.connection_refused +// - service.dial.terminator.invalid +// - service.dial.terminator.misconfigured +// +// Example: Dial Success for a specific service and terminator +// +// { +// "namespace": "service", +// "event_src_id": "ctrl_client", +// "timestamp": "2024-10-02T12:17:40.501821249-04:00" +// "version": 2, +// "event_type": "service.dial.success", +// "terminator_id": "2xFBuwwzJzAXuw5lOPnDwr", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "count": 1, +// "interval_start_utc": 1737140460, +// "interval_length": 60 +// } +// +// Example: Dail failures or type 'Other' for a specific service +// +// { +// "namespace": "service", +// "event_src_id": "ctrl_client", +// "timestamp": "2024-10-02T12:17:40.501821249-04:00" +// "version": 2, +// "event_type": "service.dial.error_other", +// "terminator_id": "", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "count": 1, +// "interval_start_utc": 1737140580, +// "interval_length": 60 +// } type ServiceEvent struct { - Namespace string `json:"namespace"` - Version uint32 `json:"version"` - EventType string `json:"event_type"` - EventSrcId string `json:"event_src_id"` - ServiceId string `json:"service_id"` - TerminatorId string `json:"terminator_id"` - Count uint64 `json:"count"` - IntervalStartUTC int64 `json:"interval_start_utc"` - IntervalLength uint64 `json:"interval_length"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The event version. The current version is 2. + Version uint32 `json:"version"` + + // The services event type. See above for valid values. + EventType string `json:"event_type"` + + // The terminator id, if this is representing a terminator specific metric. + TerminatorId string `json:"terminator_id"` + + // The service identifier. + ServiceId string `json:"service_id"` + + // The number of events that have happened in the given time interval + Count uint64 `json:"count"` + + // The start time of the interval. It is represented as Unix time, number of seconds + // since the beginning of the current epoch. + IntervalStartUTC int64 `json:"interval_start_utc"` + + // The interval length in seconds. + IntervalLength uint64 `json:"interval_length"` } func (event *ServiceEvent) String() string { @@ -42,3 +104,8 @@ func (event *ServiceEvent) String() string { type ServiceEventHandler interface { AcceptServiceEvent(event *ServiceEvent) } + +type ServiceEventHandlerWrapper interface { + ServiceEventHandler + IsWrapping(value ServiceEventHandler) bool +} diff --git a/controller/event/session.go b/controller/event/session.go index a63703f97..6911dcb38 100644 --- a/controller/event/session.go +++ b/controller/event/session.go @@ -21,21 +21,73 @@ import ( "time" ) +const SessionEventNS = "session" + const SessionEventTypeCreated = "created" const SessionEventTypeDeleted = "deleted" -const SessionEventNS = "edge.sessions" +// A SessionEvent is emitted when a session is created or deleted. +// +// Note: In version prior to 1.4.0, the namespace was `edge.sessions` +// +// Valid values for session type are: +// - created +// - deleted +// +// Example: Bind Session created event +// +// { +// "namespace": "session", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:29:53.204988284-05:00", +// "event_type": "created", +// "session_type": "Bind", +// "id": "cm611bn75000jdhj9s5xrwynr", +// "token": "4ed77b84-650e-4b4b-9fbb-2466c9c94abb", +// "api_session_id": "cm611bn6l000hdhj9urp9xlnw", +// "identity_id": "IahyE.5Scw", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP" +// } +// +// Example: Bind session deleted event +// +// { +// "namespace": "session", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:30:08.593650693-05:00", +// "event_type": "deleted", +// "session_type": "Bind", +// "id": "cm611bgxa0008dhj9k3yo3vox", +// "token": "f8a447a2-2bd0-4821-8142-27c1770d06ab", +// "api_session_id": "cm611bgwn0006dhj9m4cv1obf", +// "identity_id": "IahyE.5Scw", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP" +// } type SessionEvent struct { - Namespace string `json:"namespace"` - EventType string `json:"event_type"` - EventSrcId string `json:"event_src_id"` - SessionType string `json:"session_type"` - Id string `json:"id"` - Timestamp time.Time `json:"timestamp"` - Token string `json:"token"` - ApiSessionId string `json:"api_session_id"` - IdentityId string `json:"identity_id"` - ServiceId string `json:"service_id"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The type of session event. See above for valid values. + EventType string `json:"event_type"` + + // The session type (dial or bind). + SessionType string `json:"session_type"` + + // The session identifier. + Id string `json:"id"` + + // The session token. + Token string `json:"token"` + + // The id of the api session used to create this session. + ApiSessionId string `json:"api_session_id"` + + // The if of the identity on whose behalf the session was created. + IdentityId string `json:"identity_id"` + + // The id of the service that this session is claiming access to. + ServiceId string `json:"service_id"` } func (event *SessionEvent) String() string { diff --git a/controller/event/terminators.go b/controller/event/terminators.go index dd2870379..97c0742c7 100644 --- a/controller/event/terminators.go +++ b/controller/event/terminators.go @@ -24,7 +24,7 @@ import ( type TerminatorEventType string const ( - TerminatorEventsNs = "fabric.terminators" + TerminatorEventNS = "terminator" TerminatorCreated TerminatorEventType = "created" TerminatorUpdated TerminatorEventType = "updated" @@ -33,23 +33,139 @@ const ( TerminatorRouterOffline TerminatorEventType = "router-offline" ) +// A TerminatorEvent is emitted at various points in the terminator lifecycle. +// +// Note: In version prior to 1.4.0, the namespace was `fabric.terminators` +// +// Valid values for terminator event types are: +// - created - Note: replaced by entity change events +// - updated - Note: replaced by entity changes events +// - deleted - Note: replaced by entity change events +// - router-online +// - router-offline +// +// Example: Terminator created event +// +// { +// "namespace": "terminator", +// "event_type": "created", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:33.691240129-05:00", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "terminator_id": "2c9DGllUFx2GIFWFF5g5FP", +// "router_id": "5g2QrZxFcw", +// "host_id": "IahyE.5Scw", +// "router_online": true, +// "precedence": "default", +// "static_cost": 0, +// "dynamic_cost": 0, +// "total_terminators": 1, +// "usable_default_terminators": 1, +// "usable_required_terminators": 0 +// } +// +// Example: Terminator router offline event +// +// { +// "namespace": "terminator", +// "event_type": "router-offline", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:41.120951142-05:00", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "terminator_id": "2c9DGllUFx2GIFWFF5g5FP", +// "router_id": "5g2QrZxFcw", +// "host_id": "IahyE.5Scw", +// "router_online": false, +// "precedence": "default", +// "static_cost": 0, +// "dynamic_cost": 0, +// "total_terminators": 1, +// "usable_default_terminators": 0, +// "usable_required_terminators": 0 +// } +// +// Example: Terminator router online event +// +// { +// "namespace": "terminator", +// "event_type": "router-online", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:42.438815052-05:00", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "terminator_id": "2c9DGllUFx2GIFWFF5g5FP", +// "router_id": "5g2QrZxFcw", +// "host_id": "IahyE.5Scw", +// "router_online": true, +// "precedence": "default", +// "static_cost": 0, +// "dynamic_cost": 0, +// "total_terminators": 1, +// "usable_default_terminators": 1, +// "usable_required_terminators": 0 +// } +// +// Example: Terminator Deleted Event +// +// { +// "namespace": "terminator", +// "event_type": "deleted", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:42.448238469-05:00", +// "service_id": "3pjMOKY2icS8fkQ1lfHmrP", +// "terminator_id": "2c9DGllUFx2GIFWFF5g5FP", +// "router_id": "5g2QrZxFcw", +// "host_id": "IahyE.5Scw", +// "router_online": true, +// "precedence": "default", +// "static_cost": 0, +// "dynamic_cost": 0, +// "total_terminators": 0, +// "usable_default_terminators": 0, +// "usable_required_terminators": 0 +// } type TerminatorEvent struct { - Namespace string `json:"namespace"` - EventType TerminatorEventType `json:"event_type"` - EventSrcId string `json:"event_src_id"` - Timestamp time.Time `json:"timestamp"` - ServiceId string `json:"service_id"` - TerminatorId string `json:"terminator_id"` - RouterId string `json:"router_id"` - HostId string `json:"host_id"` - RouterOnline bool `json:"router_online"` - Precedence string `json:"precedence"` - StaticCost uint16 `json:"static_cost"` - DynamicCost uint16 `json:"dynamic_cost"` - TotalTerminators int `json:"total_terminators"` - UsableDefaultTerminators int `json:"usable_default_terminators"` - UsableRequiredTerminators int `json:"usable_required_terminators"` - PropagateIndicator bool `json:"-"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The type of terminator event. For valid values see above. + EventType TerminatorEventType `json:"event_type"` + + // The id of the service that this terminator belongs to. + ServiceId string `json:"service_id"` + + // The terminator's identifier. + TerminatorId string `json:"terminator_id"` + + // The id of router that the terminator lives on. + RouterId string `json:"router_id"` + + // Optional identifier indicating what is hosting the terminator. If hosted by the edge, will be an identity id. + HostId string `json:"host_id"` + + // Indicates if the terminator's router is online. + RouterOnline bool `json:"router_online"` + + // The terminator precedence. + Precedence string `json:"precedence"` + + // The terminator's static cost. + StaticCost uint16 `json:"static_cost"` + + // The terminator's dynamic cost (usually based on the number of circuits currently on the terminator). + DynamicCost uint16 `json:"dynamic_cost"` + + // The total number of terminators that the service has. + TotalTerminators int `json:"total_terminators"` + + // The number of online terminators with a default precedence for the service. + UsableDefaultTerminators int `json:"usable_default_terminators"` + + // The number of online terminators with a required precedence for the service. + UsableRequiredTerminators int `json:"usable_required_terminators"` + + // For internal use. + PropagateIndicator bool `json:"-"` } func (event *TerminatorEvent) IsModelEvent() bool { diff --git a/controller/event/usage.go b/controller/event/usage.go index 1fded0b39..496ec1c00 100644 --- a/controller/event/usage.go +++ b/controller/event/usage.go @@ -1,44 +1,185 @@ package event -import "fmt" +import ( + "fmt" + "time" +) const ( - UsageEventsNs = "fabric.usage" + UsageEventNS = "usage" UsageEventsVersion = 2 ) -type UsageEvent struct { - Namespace string `json:"namespace"` - Version uint32 `json:"version"` - EventType string `json:"event_type"` - EventSrcId string `json:"event_src_id"` - SourceId string `json:"source_id"` - CircuitId string `json:"circuit_id"` - Usage uint64 `json:"usage"` - IntervalStartUTC int64 `json:"interval_start_utc"` - IntervalLength uint64 `json:"interval_length"` - Tags map[string]string `json:"tags"` +// A UsageEventV2 is emitted for service usage interval metrics in the v2 format. +// +// Note: In version prior to 1.4.0, the namespace was `fabric.usage` +// +// Valid values for the usage event v2 type are: +// - usage.ingress.rx - A read from an external connection to an initiating router +// - usage.ingress.tx - A write to an external connection from an initiating router +// - usage.egress.rx - A read from an external connection to an egress router +// - usage.egress.tx - A write to an external connection from an egress router +// - usage.fabric.rx - A read from a fabric link to a router +// - usage.fabric.tx - A write to a fabric link from a router +// +// Example: Ingress Data Received Usage Event +// +// { +// "namespace": "usage", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:42.448238469-05:00", +// "version": 2, +// "event_type": "usage.ingress.rx", +// "source_id": "5g2QrZxFcw", +// "circuit_id": "gZrStElHY", +// "usage": 47, +// "interval_start_utc": 1737145920, +// "interval_length": 60, +// "tags": { +// "clientId": "haxn9lB0uc", +// "hostId": "IahyE.5Scw", +// "serviceId": "3pjMOKY2icS8fkQ1lfHmrP" +// } +// } +// +// Example: Fabric Data Sent Usage Event +// +// { +// "namespace": "usage", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:42.448238469-05:00", +// "version": 2, +// "event_type": "usage.fabric.tx", +// "source_id": "5g2QrZxFcw", +// "circuit_id": "gZrStElHY", +// "usage": 47, +// "interval_start_utc": 1737145920, +// "interval_length": 60, +// "tags": null +// } +type UsageEventV2 struct { + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The usage events version, which will always be 2 for this format. + Version uint32 `json:"version"` + + // The type of usage. For valid values see above. + EventType string `json:"event_type"` + + // The id of the router reporting the usage + SourceId string `json:"source_id"` + + // The circuit id whose usage is being reported. + CircuitId string `json:"circuit_id"` + + // The number of bytes of usage in the interval. + Usage uint64 `json:"usage"` + + // The start time of the interval. It is represented as Unix time, number of seconds + // since the beginning of the current epoch. + IntervalStartUTC int64 `json:"interval_start_utc"` + + // The interval length in seconds. + IntervalLength uint64 `json:"interval_length"` + + // Metadata, which may include things like the client and hosting identities and the service id. + Tags map[string]string `json:"tags"` } -func (event *UsageEvent) String() string { +func (event *UsageEventV2) String() string { return fmt.Sprintf("%v source=%v session=%v usage=%v intervalStart=%v intervalLength=%v", event.EventType, event.SourceId, event.CircuitId, event.Usage, event.IntervalStartUTC, event.IntervalLength) } type UsageEventHandler interface { - AcceptUsageEvent(event *UsageEvent) + AcceptUsageEvent(event *UsageEventV2) } +type UsageEventHandlerWrapper interface { + UsageEventHandler + IsWrapping(value UsageEventHandler) bool +} + +// A UsageEventV3 is emitted for service usage interval metrics in the v3 format. +// +// Note: In version prior to 1.4.0, the namespace was `fabric.usage` +// +// Valid values for the usage types are: +// - ingress.rx - A read from an external connection to an initiating router +// - ingress.tx - A write to an external connection from an initiating router +// - egress.rx - A read from an external connection to an egress router +// - egress.tx - A write to an external connection from an egress router +// - fabric.rx - A read from a fabric link to a router +// - fabric.tx - A write to a fabric link from a router +// +// Example: Untagged Usage Data Event +// +// { +// "namespace": "usage", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:42.448238469-05:00", +// "version": 3, +// "source_id": "5g2QrZxFcw", +// "circuit_id": "bcRu0EQFe", +// "usage": { +// "fabric.rx": 47, +// "fabric.tx": 47 +// }, +// "interval_start_utc": 1737146220, +// "interval_length": 60, +// "tags": null +// } +// +// Example: Tagged Usage Data Event +// +// { +// "namespace": "usage", +// "event_src_id": "ctrl_client", +// "timestamp": "2025-01-17T12:35:42.448238469-05:00", +// "version": 3, +// "source_id": "5g2QrZxFcw", +// "circuit_id": "bcRu0EQFe", +// "usage": { +// "ingress.rx": 47, +// "ingress.tx": 47 +// }, +// "interval_start_utc": 1737146220, +// "interval_length": 60, +// "tags": { +// "clientId": "haxn9lB0uc", +// "hostId": "IahyE.5Scw", +// "serviceId": "3pjMOKY2icS8fkQ1lfHmrP" +// } +// } type UsageEventV3 struct { - Namespace string `json:"namespace"` - Version uint32 `json:"version"` - EventSrcId string `json:"event_src_id"` - SourceId string `json:"source_id"` - CircuitId string `json:"circuit_id"` - Usage map[string]uint64 `json:"usage"` - IntervalStartUTC int64 `json:"interval_start_utc"` - IntervalLength uint64 `json:"interval_length"` - Tags map[string]string `json:"tags"` + Namespace string `json:"namespace"` + EventSrcId string `json:"event_src_id"` + Timestamp time.Time `json:"timestamp"` + + // The usage events version, which will always be 3 for this format. + Version uint32 `json:"version"` + + // The id of the router reporting the usage + SourceId string `json:"source_id"` + + // The circuit id whose usage is being reported. + CircuitId string `json:"circuit_id"` + + // Map of usage type to amount number of bytes used in the given interval. + // For valid values for usage type, see above. + Usage map[string]uint64 `json:"usage"` + + // The start time of the interval. It is represented as Unix time, number of seconds + // since the beginning of the current epoch. + IntervalStartUTC int64 `json:"interval_start_utc"` + + // The interval length in seconds. + IntervalLength uint64 `json:"interval_length"` + + // Metadata, which may include things like the client and hosting identities and the service id. + Tags map[string]string `json:"tags"` } func (event *UsageEventV3) String() string { diff --git a/controller/events/dispatcher.go b/controller/events/dispatcher.go index 3752d1635..f259edf06 100644 --- a/controller/events/dispatcher.go +++ b/controller/events/dispatcher.go @@ -35,8 +35,8 @@ type delegatingRegistrar struct { UnregistrationHandler event.UnregistrationHandler } -func (self *delegatingRegistrar) Register(handler interface{}, config map[string]interface{}) error { - return self.RegistrationHandler(handler, config) +func (self *delegatingRegistrar) Register(eventType string, handler interface{}, config map[string]interface{}) error { + return self.RegistrationHandler(eventType, handler, config) } func (self *delegatingRegistrar) Unregister(handler interface{}) { @@ -54,21 +54,30 @@ func NewDispatcher(closeNotify <-chan struct{}) *Dispatcher { } result.entityChangeEventsDispatcher.dispatcher = result - result.RegisterEventTypeFunctions(event.CircuitEventsNs, result.registerCircuitEventHandler, result.unregisterCircuitEventHandler) - result.RegisterEventTypeFunctions(event.EntityChangeEventsNs, result.registerEntityChangeEventHandler, result.unregisterEntityChangeEventHandler) - result.RegisterEventTypeFunctions(event.LinkEventsNs, result.registerLinkEventHandler, result.unregisterLinkEventHandler) - result.RegisterEventTypeFunctions(event.MetricsEventsNs, result.registerMetricsEventHandler, result.unregisterMetricsEventHandler) - result.RegisterEventTypeFunctions(event.RouterEventsNs, result.registerRouterEventHandler, result.unregisterRouterEventHandler) - result.RegisterEventTypeFunctions(event.ServiceEventsNs, result.registerServiceEventHandler, result.unregisterServiceEventHandler) - result.RegisterEventTypeFunctions(event.TerminatorEventsNs, result.registerTerminatorEventHandler, result.unregisterTerminatorEventHandler) - result.RegisterEventTypeFunctions(event.UsageEventsNs, result.registerUsageEventHandler, result.unregisterUsageEventHandler) - result.RegisterEventTypeFunctions(event.ClusterEventsNs, result.registerClusterEventHandler, result.unregisterClusterEventHandler) - result.RegisterEventTypeFunctions(event.ConnectEventNS, result.registerConnectEventHandler, result.unregisterConnectEventHandler) - result.RegisterEventTypeFunctions(event.SdkEventsNs, result.registerSdkEventHandler, result.unregisterSdkEventHandler) + result.RegisterEventTypeFunctions("edge.apiSessions", result.registerApiSessionEventHandler, result.unregisterApiSessionEventHandler) + result.RegisterEventTypeFunctions("fabric.circuits", result.registerCircuitEventHandler, result.unregisterCircuitEventHandler) + result.RegisterEventTypeFunctions("edge.entityCounts", result.registerEntityCountEventHandler, result.unregisterEntityCountEventHandler) + result.RegisterEventTypeFunctions("fabric.links", result.registerLinkEventHandler, result.unregisterLinkEventHandler) + result.RegisterEventTypeFunctions("fabric.routers", result.registerRouterEventHandler, result.unregisterRouterEventHandler) + result.RegisterEventTypeFunctions("services", result.registerServiceEventHandler, result.unregisterServiceEventHandler) // V2 Handler + result.RegisterEventTypeFunctions("edge.sessions", result.registerSessionEventHandler, result.unregisterSessionEventHandler) + result.RegisterEventTypeFunctions("fabric.terminators", result.registerTerminatorEventHandler, result.unregisterTerminatorEventHandler) + result.RegisterEventTypeFunctions("fabric.usage", result.registerUsageEventHandler, result.unregisterUsageEventHandler) result.RegisterEventTypeFunctions(event.ApiSessionEventNS, result.registerApiSessionEventHandler, result.unregisterApiSessionEventHandler) + result.RegisterEventTypeFunctions(event.CircuitEventNS, result.registerCircuitEventHandler, result.unregisterCircuitEventHandler) + result.RegisterEventTypeFunctions(event.ClusterEventNS, result.registerClusterEventHandler, result.unregisterClusterEventHandler) + result.RegisterEventTypeFunctions(event.ConnectEventNS, result.registerConnectEventHandler, result.unregisterConnectEventHandler) + result.RegisterEventTypeFunctions(event.EntityChangeEventNS, result.registerEntityChangeEventHandler, result.unregisterEntityChangeEventHandler) result.RegisterEventTypeFunctions(event.EntityCountEventNS, result.registerEntityCountEventHandler, result.unregisterEntityCountEventHandler) + result.RegisterEventTypeFunctions(event.LinkEventNS, result.registerLinkEventHandler, result.unregisterLinkEventHandler) + result.RegisterEventTypeFunctions(event.MetricsEventNS, result.registerMetricsEventHandler, result.unregisterMetricsEventHandler) + result.RegisterEventTypeFunctions(event.RouterEventNS, result.registerRouterEventHandler, result.unregisterRouterEventHandler) + result.RegisterEventTypeFunctions(event.ServiceEventNS, result.registerServiceEventHandler, result.unregisterServiceEventHandler) result.RegisterEventTypeFunctions(event.SessionEventNS, result.registerSessionEventHandler, result.unregisterSessionEventHandler) + result.RegisterEventTypeFunctions(event.SdkEventNS, result.registerSdkEventHandler, result.unregisterSdkEventHandler) + result.RegisterEventTypeFunctions(event.TerminatorEventNS, result.registerTerminatorEventHandler, result.unregisterTerminatorEventHandler) + result.RegisterEventTypeFunctions(event.UsageEventNS, result.registerUsageEventHandler, result.unregisterUsageEventHandler) result.RegisterFormatterFactory("json", event.FormatterFactoryF(func(sink event.FormattedEventSink) io.Closer { return NewJsonFormatter(16, sink) @@ -298,7 +307,7 @@ func (self *Dispatcher) ProcessSubscriptions(handler interface{}, subscriptions logger.WithField("type", sub.Type).Info("Processing subscriptions for event type") if registrar, ok := eventTypes[sub.Type]; ok { - if err := registrar.Register(handler, sub.Options); err != nil { + if err := registrar.Register(sub.Type, handler, sub.Options); err != nil { return err } logger.WithField("type", sub.Type).Info("Registration of event handler succeeded") diff --git a/controller/events/dispatcher_api_session.go b/controller/events/dispatcher_api_session.go index 6bcb33564..f1eb76720 100644 --- a/controller/events/dispatcher_api_session.go +++ b/controller/events/dispatcher_api_session.go @@ -86,13 +86,20 @@ func (self *Dispatcher) apiSessionDeleted(apiSession *db.ApiSession) { self.AcceptApiSessionEvent(evt) } -func (self *Dispatcher) registerApiSessionEventHandler(val interface{}, config map[string]interface{}) error { +func (self *Dispatcher) registerApiSessionEventHandler(eventType string, val interface{}, config map[string]interface{}) error { handler, ok := val.(event.ApiSessionEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/ApiSessionEventHandler interface.", reflect.TypeOf(val)) } + if eventType != event.ApiSessionEventNS { + handler = &apiSessionEventOldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + var includeList []string if includeVar, ok := config["include"]; ok { if includeStr, ok := includeVar.(string); ok { @@ -135,9 +142,9 @@ type apiSessionEventAdapter struct { includeList []string } -func (adapter *apiSessionEventAdapter) AcceptApiSessionEvent(event *event.ApiSessionEvent) { - if stringz.Contains(adapter.includeList, event.EventType) { - adapter.wrapped.AcceptApiSessionEvent(event) +func (adapter *apiSessionEventAdapter) AcceptApiSessionEvent(evt *event.ApiSessionEvent) { + if stringz.Contains(adapter.includeList, evt.EventType) { + adapter.wrapped.AcceptApiSessionEvent(evt) } } @@ -150,3 +157,24 @@ func (self *apiSessionEventAdapter) IsWrapping(value event.ApiSessionEventHandle } return false } + +type apiSessionEventOldNsAdapter struct { + namespace string + wrapped event.ApiSessionEventHandler +} + +func (self *apiSessionEventOldNsAdapter) AcceptApiSessionEvent(evt *event.ApiSessionEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptApiSessionEvent(&nsEvent) +} + +func (self *apiSessionEventOldNsAdapter) IsWrapping(value event.ApiSessionEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.ApiSessionEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} diff --git a/controller/events/dispatcher_circuit.go b/controller/events/dispatcher_circuit.go index e8416568d..42e71e33f 100644 --- a/controller/events/dispatcher_circuit.go +++ b/controller/events/dispatcher_circuit.go @@ -47,13 +47,20 @@ func (self *Dispatcher) AcceptCircuitEvent(event *event.CircuitEvent) { }() } -func (self *Dispatcher) registerCircuitEventHandler(val interface{}, config map[string]interface{}) error { +func (self *Dispatcher) registerCircuitEventHandler(eventType string, val interface{}, config map[string]interface{}) error { handler, ok := val.(event.CircuitEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/CircuitEventHandler interface.", reflect.TypeOf(val)) } + if eventType != event.CircuitEventNS { + handler = &circuitEventOldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + var includeList []string if includeVar, ok := config["include"]; ok { if includeStr, ok := includeVar.(string); ok { @@ -115,8 +122,29 @@ func (self *filteredCircuitEventHandler) IsWrapping(value event.CircuitEventHand return false } -func (self *filteredCircuitEventHandler) AcceptCircuitEvent(event *event.CircuitEvent) { - if _, found := self.accepted[event.EventType]; found { - self.wrapped.AcceptCircuitEvent(event) +func (self *filteredCircuitEventHandler) AcceptCircuitEvent(evt *event.CircuitEvent) { + if _, found := self.accepted[evt.EventType]; found { + self.wrapped.AcceptCircuitEvent(evt) + } +} + +type circuitEventOldNsAdapter struct { + namespace string + wrapped event.CircuitEventHandler +} + +func (self *circuitEventOldNsAdapter) AcceptCircuitEvent(evt *event.CircuitEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptCircuitEvent(&nsEvent) +} + +func (self *circuitEventOldNsAdapter) IsWrapping(value event.CircuitEventHandler) bool { + if self.wrapped == value { + return true } + if w, ok := self.wrapped.(event.CircuitEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false } diff --git a/controller/events/dispatcher_cluster.go b/controller/events/dispatcher_cluster.go index 1c299546a..2b1948826 100644 --- a/controller/events/dispatcher_cluster.go +++ b/controller/events/dispatcher_cluster.go @@ -39,7 +39,7 @@ func (self *Dispatcher) AcceptClusterEvent(event *event.ClusterEvent) { }() } -func (self *Dispatcher) registerClusterEventHandler(val interface{}, _ map[string]interface{}) error { +func (self *Dispatcher) registerClusterEventHandler(_ string, val interface{}, _ map[string]interface{}) error { handler, ok := val.(event.ClusterEventHandler) if !ok { diff --git a/controller/events/dispatcher_connect.go b/controller/events/dispatcher_connect.go index 54519a3e3..82b37295d 100644 --- a/controller/events/dispatcher_connect.go +++ b/controller/events/dispatcher_connect.go @@ -45,7 +45,7 @@ func (self *Dispatcher) AcceptConnectEvent(evt *event.ConnectEvent) { } } -func (self *Dispatcher) registerConnectEventHandler(val interface{}, _ map[string]interface{}) error { +func (self *Dispatcher) registerConnectEventHandler(_ string, val interface{}, _ map[string]interface{}) error { handler, ok := val.(event.ConnectEventHandler) if !ok { diff --git a/controller/events/dispatcher_entity_change.go b/controller/events/dispatcher_entity_change.go index 32b661208..6b3825694 100644 --- a/controller/events/dispatcher_entity_change.go +++ b/controller/events/dispatcher_entity_change.go @@ -61,7 +61,7 @@ func (self *Dispatcher) AcceptEntityChangeEvent(event *event.EntityChangeEvent) } } -func (self *Dispatcher) registerEntityChangeEventHandler(val interface{}, options map[string]interface{}) error { +func (self *Dispatcher) registerEntityChangeEventHandler(_ string, val interface{}, options map[string]interface{}) error { handler, ok := val.(event.EntityChangeEventHandler) if !ok { @@ -232,7 +232,7 @@ func (self *entityChangeEventDispatcher) ProcessPreCommit(state boltz.UntypedEnt isParentEvent := state.IsParentEvent() evt := &event.EntityChangeEvent{ - Namespace: event.EntityChangeEventsNs, + Namespace: event.EntityChangeEventNS, EventId: state.GetEventId(), EventType: changeType, EventSrcId: self.dispatcher.ctrlId, @@ -265,7 +265,7 @@ func (self *entityChangeEventDispatcher) ProcessPreCommit(state boltz.UntypedEnt func (self *entityChangeEventDispatcher) emitRecoveryEvent(eventId string, entityType string) { evt := &event.EntityChangeEvent{ - Namespace: event.EntityChangeEventsNs, + Namespace: event.EntityChangeEventNS, EventType: event.EntityChangeTypeCommitted, EventSrcId: self.dispatcher.ctrlId, EventId: eventId, @@ -279,7 +279,7 @@ func (self *entityChangeEventDispatcher) emitRecoveryEvent(eventId string, entit func (self *entityChangeEventDispatcher) ProcessPostCommit(state boltz.UntypedEntityChangeState) { isParentEvent := state.IsParentEvent() evt := &event.EntityChangeEvent{ - Namespace: event.EntityChangeEventsNs, + Namespace: event.EntityChangeEventNS, EventId: state.GetEventId(), EventType: event.EntityChangeTypeCommitted, EventSrcId: self.dispatcher.ctrlId, diff --git a/controller/events/dispatcher_entity_counts.go b/controller/events/dispatcher_entity_counts.go index b7a08cfa5..e0eb64c62 100644 --- a/controller/events/dispatcher_entity_counts.go +++ b/controller/events/dispatcher_entity_counts.go @@ -34,11 +34,15 @@ func (self *Dispatcher) AddEntityCountEventHandler(handler event.EntityCountEven } func (self *Dispatcher) RemoveEntityCountEventHandler(handler event.EntityCountEventHandler) { - for _, state := range self.entityCountEventHandlers.Value() { - if state.handler == handler { - self.entityCountEventHandlers.Delete(state) + self.entityCountEventHandlers.DeleteIf(func(val *entityCountState) bool { + if val.handler == handler { + return true } - } + if w, ok := val.handler.(event.EntityCountEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) } func (self *Dispatcher) initEntityEvents() { @@ -52,15 +56,15 @@ func (self *Dispatcher) generateEntityEvents() { for { select { case t := <-ticker.C: - var event *event.EntityCountEvent + var evt *event.EntityCountEvent leader := self.network.Dispatcher.IsLeaderOrLeaderless() for _, state := range self.entityCountEventHandlers.Value() { if !state.onlyLeaderEvents || leader { if t.After(state.nextRun) { - if event == nil { - event = self.generateEntityCountEvent() + if evt == nil { + evt = self.generateEntityCountEvent() } - state.handler.AcceptEntityCountEvent(event) + state.handler.AcceptEntityCountEvent(evt) state.nextRun = state.nextRun.Add(state.interval) } } @@ -72,7 +76,7 @@ func (self *Dispatcher) generateEntityEvents() { } func (self *Dispatcher) generateEntityCountEvent() *event.EntityCountEvent { - event := &event.EntityCountEvent{ + evt := &event.EntityCountEvent{ Namespace: event.EntityCountEventNS, EventSrcId: self.ctrlId, Timestamp: time.Now(), @@ -80,21 +84,28 @@ func (self *Dispatcher) generateEntityCountEvent() *event.EntityCountEvent { data, err := self.stores.GetEntityCounts(self.network.GetDb()) if err != nil { - event.Error = err.Error() + evt.Error = err.Error() } else { - event.Counts = data + evt.Counts = data } - return event + return evt } -func (self *Dispatcher) registerEntityCountEventHandler(val interface{}, config map[string]interface{}) error { +func (self *Dispatcher) registerEntityCountEventHandler(eventType string, val interface{}, config map[string]interface{}) error { handler, ok := val.(event.EntityCountEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/events/EntityCountEventHandler interface.", reflect.TypeOf(val)) } + if eventType != event.EntityCountEventNS { + handler = &entityCountEventOldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + interval := time.Minute * 5 if val, ok := config["interval"]; ok { @@ -137,3 +148,24 @@ type entityCountState struct { interval time.Duration nextRun time.Time } + +type entityCountEventOldNsAdapter struct { + namespace string + wrapped event.EntityCountEventHandler +} + +func (self *entityCountEventOldNsAdapter) AcceptEntityCountEvent(evt *event.EntityCountEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptEntityCountEvent(&nsEvent) +} + +func (self *entityCountEventOldNsAdapter) IsWrapping(value event.EntityCountEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.EntityCountEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} diff --git a/controller/events/dispatcher_link.go b/controller/events/dispatcher_link.go index 6196af598..f5b11ca88 100644 --- a/controller/events/dispatcher_link.go +++ b/controller/events/dispatcher_link.go @@ -27,7 +27,15 @@ func (self *Dispatcher) AddLinkEventHandler(handler event.LinkEventHandler) { } func (self *Dispatcher) RemoveLinkEventHandler(handler event.LinkEventHandler) { - self.linkEventHandlers.Delete(handler) + self.linkEventHandlers.DeleteIf(func(val event.LinkEventHandler) bool { + if val == handler { + return true + } + if w, ok := val.(event.LinkEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) } func (self *Dispatcher) AcceptLinkEvent(event *event.LinkEvent) { @@ -38,14 +46,20 @@ func (self *Dispatcher) AcceptLinkEvent(event *event.LinkEvent) { }() } -func (self *Dispatcher) registerLinkEventHandler(val interface{}, _ map[string]interface{}) error { +func (self *Dispatcher) registerLinkEventHandler(eventType string, val interface{}, _ map[string]interface{}) error { handler, ok := val.(event.LinkEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/LinkEventHandler interface.", reflect.TypeOf(val)) } - self.linkEventHandlers.Append(handler) + if eventType != event.LinkEventNS { + handler = &linkEventOldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + self.AddLinkEventHandler(handler) return nil } @@ -55,3 +69,24 @@ func (self *Dispatcher) unregisterLinkEventHandler(val interface{}) { self.RemoveLinkEventHandler(handler) } } + +type linkEventOldNsAdapter struct { + namespace string + wrapped event.LinkEventHandler +} + +func (self *linkEventOldNsAdapter) AcceptLinkEvent(evt *event.LinkEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptLinkEvent(&nsEvent) +} + +func (self *linkEventOldNsAdapter) IsWrapping(value event.LinkEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.LinkEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} diff --git a/controller/events/dispatcher_metrics.go b/controller/events/dispatcher_metrics.go index b0dc16402..cc0290713 100644 --- a/controller/events/dispatcher_metrics.go +++ b/controller/events/dispatcher_metrics.go @@ -84,7 +84,7 @@ func (self *Dispatcher) relayMessagesToEventsUnfiltered(msg *metrics_pb.MetricsM } } -func (self *Dispatcher) registerMetricsEventHandler(val interface{}, config map[string]interface{}) error { +func (self *Dispatcher) registerMetricsEventHandler(_ string, val interface{}, config map[string]interface{}) error { handler, ok := val.(event.MetricsEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/MetricsEventHandler interface.", reflect.TypeOf(val)) @@ -134,7 +134,7 @@ func (self *Dispatcher) unregisterMetricsEventHandler(val interface{}) { func (self *Dispatcher) newMetricEvent(msg *metrics_pb.MetricsMessage, metricType string, name string, id string) *event.MetricsEvent { result := &event.MetricsEvent{ - Namespace: event.MetricsEventsNs, + Namespace: event.MetricsEventNS, EventSrcId: self.ctrlId, MetricType: metricType, SourceAppId: msg.SourceId, diff --git a/controller/events/dispatcher_router.go b/controller/events/dispatcher_router.go index 5f2b0991e..1ea3cb4d4 100644 --- a/controller/events/dispatcher_router.go +++ b/controller/events/dispatcher_router.go @@ -30,7 +30,15 @@ func (self *Dispatcher) AddRouterEventHandler(handler event.RouterEventHandler) } func (self *Dispatcher) RemoveRouterEventHandler(handler event.RouterEventHandler) { - self.routerEventHandlers.Delete(handler) + self.routerEventHandlers.DeleteIf(func(val event.RouterEventHandler) bool { + if val == handler { + return true + } + if w, ok := val.(event.RouterEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) } func (self *Dispatcher) AcceptRouterEvent(event *event.RouterEvent) { @@ -48,13 +56,20 @@ func (self *Dispatcher) initRouterEvents(n *network.Network) { n.AddRouterPresenceHandler(routerEvtAdapter) } -func (self *Dispatcher) registerRouterEventHandler(val interface{}, _ map[string]interface{}) error { +func (self *Dispatcher) registerRouterEventHandler(eventType string, val interface{}, _ map[string]interface{}) error { handler, ok := val.(event.RouterEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/RouterEventHandler interface.", reflect.TypeOf(val)) } + if eventType != event.RouterEventNS { + handler = &routerEventOldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + self.AddRouterEventHandler(handler) return nil @@ -66,6 +81,27 @@ func (self *Dispatcher) unregisterRouterEventHandler(val interface{}) { } } +type routerEventOldNsAdapter struct { + namespace string + wrapped event.RouterEventHandler +} + +func (self *routerEventOldNsAdapter) AcceptRouterEvent(evt *event.RouterEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptRouterEvent(&nsEvent) +} + +func (self *routerEventOldNsAdapter) IsWrapping(value event.RouterEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.RouterEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} + // routerEventAdapter converts network router presence events to event.RouterEvent type routerEventAdapter struct { *Dispatcher @@ -81,7 +117,7 @@ func (self *routerEventAdapter) RouterDisconnected(r *model.Router) { func (self *routerEventAdapter) routerChange(eventType event.RouterEventType, r *model.Router, online bool) { evt := &event.RouterEvent{ - Namespace: event.RouterEventsNs, + Namespace: event.RouterEventNS, EventSrcId: self.ctrlId, EventType: eventType, Timestamp: time.Now(), diff --git a/controller/events/dispatcher_sdk.go b/controller/events/dispatcher_sdk.go index 70221fe00..44b557fda 100644 --- a/controller/events/dispatcher_sdk.go +++ b/controller/events/dispatcher_sdk.go @@ -45,7 +45,7 @@ func (self *Dispatcher) AcceptSdkEvent(evt *event.SdkEvent) { } } -func (self *Dispatcher) registerSdkEventHandler(val interface{}, _ map[string]interface{}) error { +func (self *Dispatcher) registerSdkEventHandler(_ string, val interface{}, _ map[string]interface{}) error { handler, ok := val.(event.SdkEventHandler) if !ok { diff --git a/controller/events/dispatcher_service.go b/controller/events/dispatcher_service.go index fd019017d..4bee67ad0 100644 --- a/controller/events/dispatcher_service.go +++ b/controller/events/dispatcher_service.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" "reflect" "strings" + "time" ) func (self *Dispatcher) AddServiceEventHandler(handler event.ServiceEventHandler) { @@ -30,7 +31,15 @@ func (self *Dispatcher) AddServiceEventHandler(handler event.ServiceEventHandler } func (self *Dispatcher) RemoveServiceEventHandler(handler event.ServiceEventHandler) { - self.serviceEventHandlers.Delete(handler) + self.serviceEventHandlers.DeleteIf(func(val event.ServiceEventHandler) bool { + if val == handler { + return true + } + if w, ok := val.(event.ServiceEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) } func (self *Dispatcher) AcceptServiceEvent(event *event.ServiceEvent) { @@ -41,13 +50,20 @@ func (self *Dispatcher) AcceptServiceEvent(event *event.ServiceEvent) { }() } -func (self *Dispatcher) registerServiceEventHandler(val interface{}, _ map[string]interface{}) error { +func (self *Dispatcher) registerServiceEventHandler(eventType string, val interface{}, _ map[string]interface{}) error { handler, ok := val.(event.ServiceEventHandler) if !ok { - return errors.Errorf("type %v doesn't implement github.com/openziti/edge/event/ServiceEventHandler interface.", reflect.TypeOf(val)) + return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/ServiceEventHandler interface.", reflect.TypeOf(val)) } + if eventType == "services" { + handler = &serviceEventOldNsAdapter{ + namespace: "service.events", + wrapped: handler, + } + } self.AddServiceEventHandler(handler) + return nil } @@ -63,6 +79,27 @@ func (self *Dispatcher) initServiceEvents(n *network.Network) { }) } +type serviceEventOldNsAdapter struct { + namespace string + wrapped event.ServiceEventHandler +} + +func (self *serviceEventOldNsAdapter) AcceptServiceEvent(evt *event.ServiceEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptServiceEvent(&nsEvent) +} + +func (self *serviceEventOldNsAdapter) IsWrapping(value event.ServiceEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.ServiceEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} + // serviceEventAdapter converts service interval counters into service events type serviceEventAdapter struct { *Dispatcher @@ -79,10 +116,11 @@ func (self *serviceEventAdapter) AcceptMetrics(message *metrics_pb.MetricsMessag terminatorId = ids[1] } evt := &event.ServiceEvent{ - Namespace: "service.events", + Namespace: event.ServiceEventNS, + EventSrcId: self.ctrlId, + Timestamp: time.Now(), Version: 2, EventType: name, - EventSrcId: self.ctrlId, ServiceId: serviceId, TerminatorId: terminatorId, Count: count, diff --git a/controller/events/dispatcher_session.go b/controller/events/dispatcher_session.go index af438c142..606f8e362 100644 --- a/controller/events/dispatcher_session.go +++ b/controller/events/dispatcher_session.go @@ -77,6 +77,7 @@ func (self *Dispatcher) sessionDeleted(session *db.Session) { Timestamp: time.Now(), Token: session.Token, ApiSessionId: session.ApiSessionId, + IdentityId: session.IdentityId, ServiceId: session.ServiceId, } @@ -85,13 +86,20 @@ func (self *Dispatcher) sessionDeleted(session *db.Session) { } } -func (self *Dispatcher) registerSessionEventHandler(val interface{}, config map[string]interface{}) error { +func (self *Dispatcher) registerSessionEventHandler(eventType string, val interface{}, config map[string]interface{}) error { handler, ok := val.(event.SessionEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/edge/events/SessionEventHandler interface.", reflect.TypeOf(val)) } + if eventType != event.SessionEventNS { + handler = &sessionEventOldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + var includeList []string if includeVar, ok := config["include"]; ok { if includeStr, ok := includeVar.(string); ok { @@ -134,9 +142,9 @@ type sessionEventAdapter struct { includeList []string } -func (adapter *sessionEventAdapter) AcceptSessionEvent(event *event.SessionEvent) { - if stringz.Contains(adapter.includeList, event.EventType) { - adapter.wrapped.AcceptSessionEvent(event) +func (adapter *sessionEventAdapter) AcceptSessionEvent(evt *event.SessionEvent) { + if stringz.Contains(adapter.includeList, evt.EventType) { + adapter.wrapped.AcceptSessionEvent(evt) } } @@ -149,3 +157,24 @@ func (self *sessionEventAdapter) IsWrapping(value event.SessionEventHandler) boo } return false } + +type sessionEventOldNsAdapter struct { + namespace string + wrapped event.SessionEventHandler +} + +func (self *sessionEventOldNsAdapter) AcceptSessionEvent(evt *event.SessionEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptSessionEvent(&nsEvent) +} + +func (self *sessionEventOldNsAdapter) IsWrapping(value event.SessionEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.SessionEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} diff --git a/controller/events/dispatcher_terminator.go b/controller/events/dispatcher_terminator.go index d16e32eab..2124bd25a 100644 --- a/controller/events/dispatcher_terminator.go +++ b/controller/events/dispatcher_terminator.go @@ -55,13 +55,20 @@ func (self *Dispatcher) AcceptTerminatorEvent(event *event.TerminatorEvent) { }() } -func (self *Dispatcher) registerTerminatorEventHandler(val interface{}, options map[string]interface{}) error { +func (self *Dispatcher) registerTerminatorEventHandler(eventType string, val interface{}, options map[string]interface{}) error { handler, ok := val.(event.TerminatorEventHandler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/TerminatorEventHandler interface.", reflect.TypeOf(val)) } + if eventType != event.TerminatorEventNS { + handler = &terminatorEventOldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + propagateAlways := false if val, found := options["propagateAlways"]; found { if b, ok := val.(bool); ok { @@ -101,6 +108,27 @@ func (self *Dispatcher) initTerminatorEvents(n *network.Network) { n.AddRouterPresenceHandler(terminatorEvtAdapter) } +type terminatorEventOldNsAdapter struct { + namespace string + wrapped event.TerminatorEventHandler +} + +func (self *terminatorEventOldNsAdapter) AcceptTerminatorEvent(evt *event.TerminatorEvent) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptTerminatorEvent(&nsEvent) +} + +func (self *terminatorEventOldNsAdapter) IsWrapping(value event.TerminatorEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.TerminatorEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} + type terminatorEventFilter struct { event.TerminatorEventHandler } @@ -202,7 +230,7 @@ func (self *terminatorEventAdapter) createTerminatorEvent(eventType event.Termin } evt := &event.TerminatorEvent{ - Namespace: event.TerminatorEventsNs, + Namespace: event.TerminatorEventNS, EventType: eventType, EventSrcId: self.Dispatcher.ctrlId, Timestamp: time.Now(), diff --git a/controller/events/dispatcher_usage.go b/controller/events/dispatcher_usage.go index 08eec8739..f3c29f356 100644 --- a/controller/events/dispatcher_usage.go +++ b/controller/events/dispatcher_usage.go @@ -22,6 +22,7 @@ import ( "github.com/openziti/ziti/controller/event" "github.com/pkg/errors" "reflect" + "time" ) func (self *Dispatcher) AddUsageEventHandler(handler event.UsageEventHandler) { @@ -29,7 +30,15 @@ func (self *Dispatcher) AddUsageEventHandler(handler event.UsageEventHandler) { } func (self *Dispatcher) RemoveUsageEventHandler(handler event.UsageEventHandler) { - self.usageEventHandlers.Delete(handler) + self.usageEventHandlers.DeleteIf(func(val event.UsageEventHandler) bool { + if val == handler { + return true + } + if w, ok := val.(event.UsageEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) } func (self *Dispatcher) AddUsageEventV3Handler(handler event.UsageEventV3Handler) { @@ -48,7 +57,7 @@ func (self *Dispatcher) RemoveUsageEventV3Handler(handler event.UsageEventV3Hand }) } -func (self *Dispatcher) AcceptUsageEvent(event *event.UsageEvent) { +func (self *Dispatcher) AcceptUsageEvent(event *event.UsageEventV2) { go func() { for _, handler := range self.usageEventHandlers.Value() { handler.AcceptUsageEvent(event) @@ -64,7 +73,7 @@ func (self *Dispatcher) AcceptUsageEventV3(event *event.UsageEventV3) { }() } -func (self *Dispatcher) registerUsageEventHandler(val interface{}, config map[string]interface{}) error { +func (self *Dispatcher) registerUsageEventHandler(eventType string, val interface{}, config map[string]interface{}) error { version := 2 if configVal, found := config["version"]; found { @@ -83,13 +92,26 @@ func (self *Dispatcher) registerUsageEventHandler(val interface{}, config map[st if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/UsageEventHandler interface.", reflect.TypeOf(val)) } + if eventType != event.UsageEventNS { + handler = &usageEventV2OldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } self.AddUsageEventHandler(handler) - } else if version == 3 { + } else { handler, ok := val.(event.UsageEventV3Handler) if !ok { return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/UsageEventV3Handler interface.", reflect.TypeOf(val)) } + if eventType != event.UsageEventNS { + handler = &usageEventV3OldNsAdapter{ + namespace: eventType, + wrapped: handler, + } + } + if includeListVal, found := config["include"]; found { includes := map[string]struct{}{} if list, ok := includeListVal.([]interface{}); ok { @@ -115,9 +137,8 @@ func (self *Dispatcher) registerUsageEventHandler(val interface{}, config map[st } self.AddUsageEventV3Handler(handler) - } else { - return errors.Errorf("unsupported usage version: %v", version) } + return nil } @@ -150,10 +171,11 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa for name, interval := range message.IntervalCounters { for _, bucket := range interval.Buckets { for circuitId, usage := range bucket.Values { - evt := &event.UsageEvent{ - Namespace: event.UsageEventsNs, - Version: event.UsageEventsVersion, + evt := &event.UsageEventV2{ + Namespace: event.UsageEventNS, EventSrcId: self.dispatcher.ctrlId, + Timestamp: time.Now(), + Version: event.UsageEventsVersion, EventType: name, SourceId: message.SourceId, CircuitId: circuitId, @@ -168,11 +190,12 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa for _, interval := range message.UsageCounters { for circuitId, bucket := range interval.Buckets { for usageType, usage := range bucket.Values { - evt := &event.UsageEvent{ - Namespace: event.UsageEventsNs, + evt := &event.UsageEventV2{ + Namespace: event.UsageEventNS, + EventSrcId: self.dispatcher.ctrlId, + Timestamp: time.Now(), Version: event.UsageEventsVersion, EventType: "usage." + usageType, - EventSrcId: self.dispatcher.ctrlId, SourceId: message.SourceId, CircuitId: circuitId, Usage: usage, @@ -191,9 +214,10 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa for _, bucket := range interval.Buckets { for circuitId, usage := range bucket.Values { evt := &event.UsageEventV3{ - Namespace: event.UsageEventsNs, - Version: 3, + Namespace: event.UsageEventNS, EventSrcId: self.dispatcher.ctrlId, + Timestamp: time.Now(), + Version: 3, SourceId: message.SourceId, CircuitId: circuitId, Usage: map[string]uint64{ @@ -210,10 +234,11 @@ func (self *usageEventAdapter) AcceptMetricsMsg(message *metrics_pb.MetricsMessa for _, interval := range message.UsageCounters { for circuitId, bucket := range interval.Buckets { evt := &event.UsageEventV3{ - Namespace: event.UsageEventsNs, + Namespace: event.UsageEventNS, + EventSrcId: self.dispatcher.ctrlId, + Timestamp: time.Now(), Version: 3, SourceId: message.SourceId, - EventSrcId: self.dispatcher.ctrlId, CircuitId: circuitId, Usage: bucket.Values, IntervalStartUTC: interval.IntervalStartUTC, @@ -263,3 +288,45 @@ func (self *filteredUsageV3EventHandler) AcceptUsageEventV3(event *event.UsageEv newEvent.Usage = usage self.wrapped.AcceptUsageEventV3(&newEvent) } + +type usageEventV2OldNsAdapter struct { + namespace string + wrapped event.UsageEventHandler +} + +func (self *usageEventV2OldNsAdapter) AcceptUsageEvent(event *event.UsageEventV2) { + nsEvent := *event + nsEvent.Namespace = self.namespace + self.wrapped.AcceptUsageEvent(&nsEvent) +} + +func (self *usageEventV2OldNsAdapter) IsWrapping(value event.UsageEventHandler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.UsageEventHandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} + +type usageEventV3OldNsAdapter struct { + namespace string + wrapped event.UsageEventV3Handler +} + +func (self *usageEventV3OldNsAdapter) AcceptUsageEventV3(evt *event.UsageEventV3) { + nsEvent := *evt + nsEvent.Namespace = self.namespace + self.wrapped.AcceptUsageEventV3(&nsEvent) +} + +func (self *usageEventV3OldNsAdapter) IsWrapping(value event.UsageEventV3Handler) bool { + if self.wrapped == value { + return true + } + if w, ok := self.wrapped.(event.UsageEventV3HandlerWrapper); ok { + return w.IsWrapping(value) + } + return false +} diff --git a/controller/events/formatter.go b/controller/events/formatter.go index 06318def1..0119ec557 100644 --- a/controller/events/formatter.go +++ b/controller/events/formatter.go @@ -155,7 +155,7 @@ func (event *JsonTerminatorEvent) Format() ([]byte, error) { return MarshalJson(event) } -type JsonUsageEvent event.UsageEvent +type JsonUsageEvent event.UsageEventV2 func (event *JsonUsageEvent) GetEventType() string { return "usage" @@ -285,7 +285,7 @@ func (formatter *JsonFormatter) AcceptRouterEvent(evt *event.RouterEvent) { formatter.AcceptLoggingEvent((*JsonRouterEvent)(evt)) } -func (formatter *JsonFormatter) AcceptUsageEvent(evt *event.UsageEvent) { +func (formatter *JsonFormatter) AcceptUsageEvent(evt *event.UsageEventV2) { formatter.AcceptLoggingEvent((*JsonUsageEvent)(evt)) } diff --git a/controller/handler_mgmt/stream_events.go b/controller/handler_mgmt/stream_events.go index dfc0c0f2b..b6ce706e0 100644 --- a/controller/handler_mgmt/stream_events.go +++ b/controller/handler_mgmt/stream_events.go @@ -21,10 +21,10 @@ import ( "fmt" "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v3" - "github.com/openziti/ziti/controller/event" - "github.com/openziti/ziti/controller/network" "github.com/openziti/ziti/common/handler_common" "github.com/openziti/ziti/common/pb/mgmt_pb" + "github.com/openziti/ziti/controller/event" + "github.com/openziti/ziti/controller/network" "io" ) diff --git a/controller/model/identity_manager.go b/controller/model/identity_manager.go index 31c9044d3..01bda8111 100644 --- a/controller/model/identity_manager.go +++ b/controller/model/identity_manager.go @@ -1108,7 +1108,7 @@ func (self *ConnectionTracker) SendSdkOnlineStatusChangeEvent(identityId string, } self.eventDispatcher.AcceptSdkEvent(&event.SdkEvent{ - Namespace: event.SdkEventsNs, + Namespace: event.SdkEventNS, EventType: eventType, Timestamp: time.Now(), IdentityId: identityId, diff --git a/controller/network/assembly.go b/controller/network/assembly.go index 7529080ba..203ae5bdf 100644 --- a/controller/network/assembly.go +++ b/controller/network/assembly.go @@ -75,7 +75,7 @@ func (network *Network) assemble() { func (network *Network) NotifyLinkEvent(link *model.Link, eventType event.LinkEventType) { linkEvent := &event.LinkEvent{ - Namespace: event.LinkEventsNs, + Namespace: event.LinkEventNS, EventType: eventType, EventSrcId: network.GetAppId(), Timestamp: time.Now(), @@ -91,7 +91,7 @@ func (network *Network) NotifyLinkEvent(link *model.Link, eventType event.LinkEv func (network *Network) NotifyLinkConnected(link *model.Link, msg *ctrl_pb.LinkConnected) { linkEvent := &event.LinkEvent{ - Namespace: event.LinkEventsNs, + Namespace: event.LinkEventNS, EventType: event.LinkConnected, EventSrcId: network.GetAppId(), Timestamp: time.Now(), @@ -116,7 +116,7 @@ func (network *Network) NotifyLinkConnected(link *model.Link, msg *ctrl_pb.LinkC func (network *Network) NotifyLinkIdEvent(linkId string, eventType event.LinkEventType) { linkEvent := &event.LinkEvent{ - Namespace: event.LinkEventsNs, + Namespace: event.LinkEventNS, EventType: eventType, EventSrcId: network.GetAppId(), Timestamp: time.Now(), diff --git a/controller/network/circuit_lifecycle.go b/controller/network/circuit_lifecycle.go index de3e7b7f9..3eac49052 100644 --- a/controller/network/circuit_lifecycle.go +++ b/controller/network/circuit_lifecycle.go @@ -55,7 +55,7 @@ func (network *Network) CircuitEvent(eventType event.CircuitEventType, circuit * } circuitEvent := &event.CircuitEvent{ - Namespace: event.CircuitEventsNs, + Namespace: event.CircuitEventNS, Version: event.CircuitEventsVersion, EventType: eventType, EventSrcId: network.GetAppId(), @@ -156,7 +156,7 @@ func (network *Network) CircuitFailedEvent( elapsed := time.Since(startTime) circuitEvent := &event.CircuitEvent{ - Namespace: event.CircuitEventsNs, + Namespace: event.CircuitEventNS, Version: event.CircuitEventsVersion, EventType: event.CircuitFailed, EventSrcId: network.GetAppId(), diff --git a/etc/ctrl.with.edge.yml b/etc/ctrl.with.edge.yml index fb35ff743..564d153f8 100644 --- a/etc/ctrl.with.edge.yml +++ b/etc/ctrl.with.edge.yml @@ -79,31 +79,33 @@ ctrl: # connections. The value of newListener must be resolvable both via DNS and validate via certificates #newListener: tls:localhost:6262 -#events: -# jsonLogger: -# subscriptions: +events: + jsonLogger: + subscriptions: +# - type: apiSession +# - type: circuit # - type: connect # - type: sdk # - type: entityChange # include: # - services # - identities -# - type: fabric.circuits -# - type: fabric.links -# - type: fabric.routers -# - type: fabric.terminators +# - type: entityCount +# - type: link # - type: metrics # sourceFilter: .* # metricFilter: .* -# - type: edge.sessions -# - type: edge.apiSessions -# - type: fabric.usage +# - type: router +# - type: session +# - type: services +# - type: terminator +# - type: usage # version: 3 +# - type: usage +# version: 2 # include: # - ingress.rx # - egress.rx -# - type: services -# - type: edge.entityCounts # interval: 5s handler: type: file diff --git a/tests/.gitignore b/tests/.gitignore index 74b967cfb..846fb49d8 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -1 +1,2 @@ *.json.gzip +*.proto.gzip diff --git a/tests/authenticate.go b/tests/authenticate.go index b01be30f0..0e5a1bf89 100644 --- a/tests/authenticate.go +++ b/tests/authenticate.go @@ -1136,7 +1136,7 @@ func (request *authenticatedRequests) newTerminatorWatcher() *terminatorWatcher req := &subscriptionRequest{ Subscriptions: []*event.Subscription{ - {Type: event.TerminatorEventsNs}, + {Type: event.TerminatorEventNS}, }, Callback: watcher.HandleMessage, } diff --git a/tests/events_test.go b/tests/events_test.go index a9b45e75a..178b3ca96 100644 --- a/tests/events_test.go +++ b/tests/events_test.go @@ -39,7 +39,7 @@ func (self *eventsCollector) acceptEvent(event interface{}) { fmt.Printf("\nNEXT EVENT: %v: %v %+v\n", reflect.TypeOf(event), event, event) } -func (self *eventsCollector) AcceptUsageEvent(event *event.UsageEvent) { +func (self *eventsCollector) AcceptUsageEvent(event *event.UsageEventV2) { self.acceptEvent(event) } @@ -114,24 +114,24 @@ func Test_EventsTest(t *testing.T) { // TODO: Figure out how to make this test faster. Was using ctx.router.GetMetricsRegistry().Flush(), but it's not ideal ctx.Req.NoError(err) - evt := ec.PopNextEvent(ctx, "edge.sessions.created", time.Second) + evt := ec.PopNextEvent(ctx, "sessions.created", time.Second) edgeSession, ok := evt.(*event.SessionEvent) ctx.Req.True(ok) - ctx.Req.Equal("edge.sessions", edgeSession.Namespace) + ctx.Req.Equal("session", edgeSession.Namespace) ctx.Req.Equal("created", edgeSession.EventType) ctx.Req.Equal(hostIdentity.Id, edgeSession.IdentityId) evt = ec.PopNextEvent(ctx, "edge.sessions.created", time.Second) edgeSession, ok = evt.(*event.SessionEvent) ctx.Req.True(ok) - ctx.Req.Equal("edge.sessions", edgeSession.Namespace) + ctx.Req.Equal("session", edgeSession.Namespace) ctx.Req.Equal("created", edgeSession.EventType) ctx.Req.Equal(clientIdentity.Id, edgeSession.IdentityId) - evt = ec.PopNextEvent(ctx, "fabric.circuits.created", time.Second) + evt = ec.PopNextEvent(ctx, "circuits.created", time.Second) circuitEvent, ok := evt.(*event.CircuitEvent) ctx.Req.True(ok) - ctx.Req.Equal("fabric.circuits", circuitEvent.Namespace) + ctx.Req.Equal("circuit", circuitEvent.Namespace) ctx.Req.Equal("created", string(circuitEvent.EventType)) ctx.Req.Equal(service.Id, circuitEvent.ServiceId) ctx.Req.Equal(edgeSession.Id, circuitEvent.ClientId) @@ -139,9 +139,9 @@ func Test_EventsTest(t *testing.T) { timeout := time.Minute * 2 for i := 0; i < 3; i++ { evt = ec.PopNextEvent(ctx, fmt.Sprintf("usage or circuits deleted %v", i+1), timeout) - if usage, ok := evt.(*event.UsageEvent); ok { + if usage, ok := evt.(*event.UsageEventV2); ok { timeout = time.Second * 10 - ctx.Req.Equal("fabric.usage", usage.Namespace) + ctx.Req.Equal("usage", usage.Namespace) ctx.Req.Equal(uint32(2), usage.Version) ctx.Req.Equal(circuitEvent.CircuitId, usage.CircuitId) expected := []string{"usage.ingress.rx", "usage.egress.tx"} @@ -149,7 +149,7 @@ func Test_EventsTest(t *testing.T) { ctx.Req.Equal(ctx.edgeRouterEntity.id, usage.SourceId) ctx.Req.Equal(uint64(26), usage.Usage) } else if circuitEvent, ok := evt.(*event.CircuitEvent); ok { - ctx.Req.Equal("fabric.circuits", circuitEvent.Namespace) + ctx.Req.Equal("circuit", circuitEvent.Namespace) ctx.Req.Equal("deleted", string(circuitEvent.EventType)) ctx.Req.Equal(edgeSession.Id, circuitEvent.ClientId) } else { diff --git a/ziti/cmd/fabric/stream_events.go b/ziti/cmd/fabric/stream_events.go index 900c6063b..8a4694128 100644 --- a/ziti/cmd/fabric/stream_events.go +++ b/ziti/cmd/fabric/stream_events.go @@ -35,6 +35,8 @@ type streamEventsAction struct { all bool apiSessions bool circuits bool + cluster bool + connect bool entityChange bool entityCounts bool links bool @@ -42,6 +44,7 @@ type streamEventsAction struct { routers bool services bool sessions bool + sdk bool terminators bool usage bool @@ -70,11 +73,14 @@ func NewStreamEventsCmd(p common.OptionsProvider) *cobra.Command { streamEventsCmd.Flags().BoolVar(&action.all, "all", false, "Include all events") streamEventsCmd.Flags().BoolVar(&action.apiSessions, "api-sessions", false, "Include api-session events") streamEventsCmd.Flags().BoolVar(&action.circuits, "circuits", false, "Include circuit events") + streamEventsCmd.Flags().BoolVar(&action.cluster, "cluster", false, "Include cluster events") + streamEventsCmd.Flags().BoolVar(&action.connect, "connect", false, "Include connect events") streamEventsCmd.Flags().BoolVar(&action.entityChange, "entity-change", false, "Include entity change events") streamEventsCmd.Flags().BoolVar(&action.entityCounts, "entity-counts", false, "Include entity count events") streamEventsCmd.Flags().BoolVar(&action.links, "links", false, "Include link events") streamEventsCmd.Flags().BoolVar(&action.metrics, "metrics", false, "Include metrics events") streamEventsCmd.Flags().BoolVar(&action.routers, "routers", false, "Include router events") + streamEventsCmd.Flags().BoolVar(&action.sdk, "sdk", false, "Include sdk events") streamEventsCmd.Flags().BoolVar(&action.services, "services", false, "Include service events") streamEventsCmd.Flags().BoolVar(&action.sessions, "sessions", false, "Include session events") streamEventsCmd.Flags().BoolVar(&action.terminators, "terminators", false, "Include terminators events") @@ -91,26 +97,38 @@ func (self *streamEventsAction) buildSubscriptions(cmd *cobra.Command) []*event. if self.apiSessions || (self.all && !cmd.Flags().Changed("api-sessions")) { subscriptions = append(subscriptions, &event.Subscription{ - Type: "edge.apiSessions", + Type: event.ApiSessionEventNS, }) } if self.circuits || (self.all && !cmd.Flags().Changed("circuits")) { subscriptions = append(subscriptions, &event.Subscription{ - Type: "fabric.circuits", + Type: event.CircuitEventNS, + }) + } + + if self.cluster || (self.all && !cmd.Flags().Changed("cluster")) { + subscriptions = append(subscriptions, &event.Subscription{ + Type: event.ClusterEventNS, + }) + } + + if self.connect || (self.all && !cmd.Flags().Changed("connect")) { + subscriptions = append(subscriptions, &event.Subscription{ + Type: event.ConnectEventNS, }) } if self.entityChange || (self.all && !cmd.Flags().Changed("entity-change")) { subscription := &event.Subscription{ - Type: "entityChange", + Type: event.EntityChangeEventNS, } subscriptions = append(subscriptions, subscription) } if self.entityCounts || (self.all && !cmd.Flags().Changed("entity-counts")) { subscription := &event.Subscription{ - Type: "edge.entityCounts", + Type: event.EntityCountEventNS, } if cmd.Flags().Changed("entity-counts-interval") { subscription.Options = map[string]interface{}{ @@ -122,13 +140,13 @@ func (self *streamEventsAction) buildSubscriptions(cmd *cobra.Command) []*event. if self.links || (self.all && !cmd.Flags().Changed("links")) { subscriptions = append(subscriptions, &event.Subscription{ - Type: "fabric.links", + Type: event.LinkEventNS, }) } if self.metrics || (self.all && !cmd.Flags().Changed("metrics")) { subscription := &event.Subscription{ - Type: "metrics", + Type: event.MetricsEventNS, Options: map[string]interface{}{}, } @@ -145,31 +163,37 @@ func (self *streamEventsAction) buildSubscriptions(cmd *cobra.Command) []*event. if self.routers || (self.all && !cmd.Flags().Changed("routers")) { subscriptions = append(subscriptions, &event.Subscription{ - Type: "fabric.routers", + Type: event.RouterEventNS, + }) + } + + if self.sdk || (self.all && !cmd.Flags().Changed("sdk")) { + subscriptions = append(subscriptions, &event.Subscription{ + Type: event.SdkEventNS, }) } - if self.sessions || (self.all && !cmd.Flags().Changed("services")) { + if self.services || (self.all && !cmd.Flags().Changed("services")) { subscriptions = append(subscriptions, &event.Subscription{ - Type: "services", + Type: event.ServiceEventNS, }) } if self.sessions || (self.all && !cmd.Flags().Changed("sessions")) { subscriptions = append(subscriptions, &event.Subscription{ - Type: "edge.sessions", + Type: event.SessionEventNS, }) } if self.terminators || (self.all && !cmd.Flags().Changed("terminators")) { subscriptions = append(subscriptions, &event.Subscription{ - Type: "fabric.terminators", + Type: event.TerminatorEventNS, }) } if self.usage || (self.all && !cmd.Flags().Changed("usage")) { subscription := &event.Subscription{ - Type: "fabric.usage", + Type: event.UsageEventNS, Options: map[string]interface{}{ "version": self.usageVersion, }, diff --git a/ziti/eventdoc/main.go b/ziti/eventdoc/main.go new file mode 100644 index 000000000..5d63d2d23 --- /dev/null +++ b/ziti/eventdoc/main.go @@ -0,0 +1,676 @@ +//go:build all + +package main + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "github.com/openziti/foundation/v2/stringz" + "go/ast" + "go/parser" + "go/token" + "os" + "regexp" + "sort" + "strconv" + "strings" +) + +type TypeDef struct { + baseName string + name string + doc string + fieldNames []string + fields map[string]*typeField + namespace string + isEventType bool + extraTypes map[string]struct{} + isVersioned bool + version uint +} + +func (self *TypeDef) GetTitle() string { + return strings.ReplaceAll(self.name, "Event", "") +} + +func (self *TypeDef) FormatDoc() { + leftBraceRegex, err := regexp.Compile(`^\s+\{`) + if err != nil { + panic(err) + } + rightBraceRegex, err := regexp.Compile(`^\s+[}\]]`) + if err != nil { + panic(err) + } + + result := &bytes.Buffer{} + scanner := bufio.NewScanner(strings.NewReader(self.doc)) + + indent := 0 + + detailSummary := "" + + for scanner.Scan() { + line := strings.TrimRight(scanner.Text(), " ") + if indent > 0 && rightBraceRegex.MatchString(line) { + indent -= 4 + self.writeIndented(result, indent, line) + if indent == 0 { + result.WriteString("```\n\n") + } + } else if indent > 0 { + self.writeIndented(result, indent, line) + if strings.HasSuffix(line, "{") || strings.HasSuffix(line, "[") { + indent += 4 + } + } else if leftBraceRegex.MatchString(line) { + if indent == 0 { + result.WriteString("
\n") + result.WriteString("") + if detailSummary != "" { + result.WriteString(detailSummary) + detailSummary = "" + } else { + result.WriteString("Example") + } + result.WriteString("\n") + } + result.WriteString("```text\n") + self.writeIndented(result, indent, line) + indent += 4 + } else if strings.Contains(line, "Example: ") { + detailSummary = strings.TrimSpace(line) + } else { + result.WriteString(line) + result.WriteString("\n") + } + } + self.doc = result.String() + + for _, field := range self.fields { + field.doc = strings.ReplaceAll(field.doc, "\n", " ") + } +} + +func (self *TypeDef) writeIndented(buf *bytes.Buffer, indent int, line string) { + line = strings.TrimSpace(line) + for range indent { + buf.WriteString(" ") + } + buf.WriteString(line) + buf.WriteString("\n") +} + +func (self *TypeDef) getExtraTypes(v *visitor, m map[string]struct{}) []string { + for name := range self.extraTypes { + if _, ok := m[name]; !ok { + m[name] = struct{}{} + extraType := v.extraTypes[name] + extraType.getExtraTypes(v, m) + } + } + + var result []string + for name := range m { + result = append(result, name) + } + sort.Strings(result) + return result +} + +func (self *TypeDef) GetDoc(v *visitor) string { + buf := &bytes.Buffer{} + buf.WriteString("## ") + buf.WriteString(self.GetTitle()) + buf.WriteString("\n\n**Namespace:** `") + buf.WriteString(self.namespace) + buf.WriteString("` \n\n") + buf.WriteString(self.doc) + buf.WriteString("\n\n*******************\n\n**Fields**\n\n") + + self.AppendFields(buf) + + extraTypes := self.getExtraTypes(v, map[string]struct{}{}) + for _, name := range extraTypes { + extraType := v.extraTypes[name] + buf.WriteString("### ") + buf.WriteString(name) + buf.WriteString("\n\n") + buf.WriteString(extraType.doc) + buf.WriteString("\n\n*******************\n\n**Fields**\n\n") + + extraType.AppendFields(buf) + } + + return buf.String() +} + +//func (self *TypeDef) AppendFields(buf *bytes.Buffer) { +// for _, name := range self.fieldNames { +// field := self.fields[name] +// buf.WriteString(fmt.Sprintf("**`%s`**\n\n", field.jsName)) +// buf.WriteString("The " + field.doc) +// buf.WriteString("\n\n* Type: ") +// buf.WriteString(field.jsType) +// buf.WriteString("\n\n") +// } +// buf.WriteString("\n") +//} + +func (self *TypeDef) AppendFields(buf *bytes.Buffer) { + buf.WriteString("| Field | Description | Type |\n") + buf.WriteString("| ----- | ----------- | ---- |\n") + for _, name := range self.fieldNames { + field := self.fields[name] + buf.WriteString("| **") + buf.WriteString(field.jsName) + buf.WriteString("** |") + buf.WriteString(field.doc) + buf.WriteString("|") + buf.WriteString(field.jsType) + buf.WriteString("|\n") + } + buf.WriteString("\n") +} + +func (self *TypeDef) HasErrors() bool { + hasErrors := false + if self.isEventType { + if self.namespace == "" { + _, _ = fmt.Fprintf(os.Stderr, "%s has no namespace\n", self.name) + hasErrors = true + } + if _, ok := self.fields["Timestamp"]; !ok { + _, _ = fmt.Fprintf(os.Stderr, "%s has no timestamp field\n", self.name) + } + } + + if self.doc == "" { + _, _ = fmt.Fprintf(os.Stderr, "%s has no doc\n", self.name) + hasErrors = true + } + + for _, field := range self.fields { + if len(field.doc) < 10 { + _, _ = fmt.Fprintf(os.Stderr, "%s.%s has no doc\n", self.name, field.name) + hasErrors = true + } + } + + return hasErrors +} + +type typeField struct { + name string + jsType string + jsName string + doc string +} + +type visitor struct { + extraTypes map[string]*TypeDef + eventTypes map[string]*TypeDef + namespaces map[string]string + typeMappings map[string]string + currentDoc string + currentType *TypeDef +} + +func (self *visitor) postProcess() { + for _, eventDesc := range self.eventTypes { + for _, fieldDesc := range eventDesc.fields { + if fieldDesc.name == "Namespace" { + ns := self.namespaces[eventDesc.baseName] + eventDesc.namespace = ns + if fieldDesc.doc == "" { + fieldDesc.doc = fmt.Sprintf("The event group. The namespace for %ss is %s", eventDesc.name, ns) + } + } else if fieldDesc.name == "Timestamp" { + if fieldDesc.doc == "" { + fieldDesc.doc = "The datetime that the event was generated" + } + } else if fieldDesc.name == "EventSrcId" { + if fieldDesc.doc == "" { + fieldDesc.doc = "The identifier of the controller which emitted the event" + } + } + fieldDesc.doc = strings.ReplaceAll(fieldDesc.doc, fieldDesc.name, fieldDesc.jsName) + fieldDesc.doc = strings.TrimSuffix(fieldDesc.doc, "\n") + } + } + + for _, t := range self.eventTypes { + t.FormatDoc() + } + + for _, t := range self.extraTypes { + t.FormatDoc() + } +} + +func (self *visitor) GetTypeMappingsAndAliases(node ast.Node) bool { + switch n := node.(type) { + case *ast.ValueSpec: + self.VisitValueDecl(n) + case *ast.TypeSpec: + name := n.Name.String() + if ident, ok := n.Type.(*ast.Ident); ok { + self.typeMappings[name] = ident.Name + } + } + return true +} + +func (self *visitor) VisitEventTypes(node ast.Node) bool { + switch n := node.(type) { + + case *ast.GenDecl: + self.VisitGenDecl(n) + case *ast.TypeSpec: + name := n.Name.String() + baseName, isEvent, version := self.getTypeName(n) + if !isEvent { + return false + } + if strct, ok := n.Type.(*ast.StructType); ok { + typeDef := self.getTypeDef(n, strct) + typeDef.baseName = baseName + if version != nil { + typeDef.isVersioned = true + typeDef.version = *version + } + typeDef.isEventType = true + self.eventTypes[name] = typeDef + } + return false + } + return true +} + +func (self *visitor) getTypeName(node *ast.TypeSpec) (string, bool, *uint) { + name := node.Name.String() + re := regexp.MustCompile(`(\w+Event)(V(\d+))?`) + parts := re.FindStringSubmatch(name) + if len(parts) == 0 { + return name, false, nil + } + + //for x, part := range parts { + // _, _ = fmt.Fprintf(os.Stderr, "%d:%s\n", x, part) + //} + + var version *uint + + if len(parts) == 4 && parts[3] != "" { + versionNumber, err := strconv.Atoi(parts[3]) + if err != nil { + panic(err) + } + v := uint(versionNumber) + version = &v + } + + return parts[1], true, version +} + +func (self *visitor) VisitOtherTypesFirstPass(node ast.Node) bool { + switch n := node.(type) { + + case *ast.TypeSpec: + name := n.Name.String() + _, isEvent, _ := self.getTypeName(n) + if isEvent { + return false + } + if _, ok := n.Type.(*ast.StructType); ok { + self.extraTypes[name] = nil + } + } + return true +} + +func (self *visitor) VisitOtherTypes(node ast.Node) bool { + switch n := node.(type) { + + case *ast.GenDecl: + self.VisitGenDecl(n) + case *ast.TypeSpec: + name := n.Name.String() + _, isEvent, _ := self.getTypeName(n) + if isEvent { + return false + } + if strct, ok := n.Type.(*ast.StructType); ok { + typeDef := self.getTypeDef(n, strct) + self.extraTypes[name] = typeDef + } + } + return true +} + +func (self *visitor) getTypeDef(n *ast.TypeSpec, strct *ast.StructType) *TypeDef { + name := n.Name.String() + + typeDef := &TypeDef{ + name: name, + fields: map[string]*typeField{}, + extraTypes: map[string]struct{}{}, + } + self.currentType = typeDef + + typeDef.doc = self.currentDoc + + // iterate over and append field names, types, and documentation. + for _, field := range strct.Fields.List { + if len(field.Names) == 0 { + // embedded field + continue + } + fieldName := field.Names[0].String() + + jsonName := fieldName + if field.Tag != nil { + tags, err := self.parseTags(field.Tag.Value) + if err != nil { + fmt.Printf("error parsing tag: %v\n", field.Tag.Value) + panic(err) + } + for _, tag := range tags { + if tag.key == "json" { + jsonName = tag.values[0] + } + } + } + + if jsonName == "-" { + continue + } + + fieldType := self.getType(field.Type) + if strings.Contains(fieldType, "unhandled") { + fmt.Printf("unhandled field type %T (%s) for %s.%s\n", field.Type, fieldType, name, fieldName) + } + + var fieldDoc string + if fd := field.Doc; fd != nil { + fieldDoc = fd.Text() + } + + typeDef.fieldNames = append(typeDef.fieldNames, fieldName) + typeDef.fields[fieldName] = &typeField{ + name: fieldName, + jsType: fieldType, + jsName: jsonName, + doc: fieldDoc, + } + } + + self.currentType = nil + + return typeDef +} + +func (self *visitor) getType(goFieldType ast.Expr) string { + if identType, ok := goFieldType.(*ast.Ident); ok { + return self.getJsTypeName(identType.Name) + } + + if selType, ok := goFieldType.(*ast.SelectorExpr); ok { + return self.getJsTypeName(selType.X.(*ast.Ident).Name + "." + selType.Sel.Name) + } + + if starType, ok := goFieldType.(*ast.StarExpr); ok { + return self.getType(starType.X) + } + + if arrayType, ok := goFieldType.(*ast.ArrayType); ok { + return "list of " + self.getType(arrayType.Elt) + } + + if _, ok := goFieldType.(*ast.InterfaceType); ok { + return "object" + } + + if mapType, ok := goFieldType.(*ast.MapType); ok { + return "map of " + self.getType(mapType.Key) + " -> " + self.getType(mapType.Value) + } + + fmt.Printf("unhandled type: %T\n", goFieldType) + return "unhandled" +} + +func (self *visitor) getJsTypeName(name string) string { + if alias, ok := self.typeMappings[name]; ok { + name = alias + } + + if _, ok := self.extraTypes[name]; ok { + self.currentType.extraTypes[name] = struct{}{} + return fmt.Sprintf("[%s](#%s)", name, strings.ToLower(name)) + } + + if name == "string" { + return "string" + } + if name == "bool" { + return "boolean" + } + + if stringz.Contains([]string{"uint32", "int32", "uint64", "int64", "uint16", "int16", "int"}, name) { + return fmt.Sprintf("number (%s)", name) + } + + if name == "time.Time" { + return "timestamp" + } + + if name == "time.Duration" { + return "duration" + } + + if name == "any" { + return "object" + } + + return fmt.Sprintf("unhandled: %s", name) +} + +func (self *visitor) VisitGenDecl(n *ast.GenDecl) { + if n.Doc != nil { + self.currentDoc = n.Doc.Text() + } else { + self.currentDoc = "" + } +} + +func (self *visitor) VisitValueDecl(node *ast.ValueSpec) { + self.ExtractNamespace(node) +} + +func (self *visitor) ExtractNamespace(node *ast.ValueSpec) { + for _, nameIdent := range node.Names { + name := nameIdent.String() + if strings.HasSuffix(name, "EventNS") { + if len(node.Values) == 1 { + v := node.Values[0] + if l, ok := v.(*ast.BasicLit); ok { + ns := strings.ReplaceAll(l.Value, `"`, "") + eventType := strings.TrimSuffix(name, "NS") + self.namespaces[eventType] = ns + } + } + } + } +} + +func (self *visitor) parseTags(s string) ([]*Tag, error) { + var result []*Tag + s = strings.ReplaceAll(s, "`", "") + tagStrings := strings.Split(s, " ") + for _, tagString := range tagStrings { + if strings.TrimSpace(tagString) == "" { + continue + } + parts := strings.Split(tagString, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid tag: %s", s) + } + key := strings.TrimSpace(parts[0]) + valuesString := strings.ReplaceAll(parts[1], `"`, "") + values := strings.Split(valuesString, ",") + result = append(result, &Tag{ + key: key, + values: values, + }) + } + return result, nil +} + +func (self *visitor) validate() bool { + var hasErrors bool + extraTypes := make(map[string]struct{}) + for _, typeDef := range self.eventTypes { + if typeDef.HasErrors() { + hasErrors = true + } + for k := range typeDef.extraTypes { + extraTypes[k] = struct{}{} + } + } + + for k := range extraTypes { + typeDef := self.extraTypes[k] + if typeDef.HasErrors() { + hasErrors = true + } + } + + return hasErrors +} + +func main() { + if len(os.Args) < 2 { + panic(errors.New("GOFILE environment variable not set and no filename passed in")) + } + fileName := os.Args[1] + + fileSet := token.NewFileSet() + pkgMap, err := parser.ParseDir(fileSet, fileName, nil, parser.ParseComments) + if err != nil { + panic(err) + } + + v := &visitor{ + eventTypes: map[string]*TypeDef{}, + extraTypes: map[string]*TypeDef{}, + namespaces: map[string]string{}, + typeMappings: map[string]string{}, + } + + for _, pkg := range pkgMap { + ast.Inspect(pkg, v.GetTypeMappingsAndAliases) + } + + for _, pkg := range pkgMap { + ast.Inspect(pkg, v.VisitOtherTypesFirstPass) + } + + for _, pkg := range pkgMap { + ast.Inspect(pkg, v.VisitOtherTypes) + } + + for _, pkg := range pkgMap { + ast.Inspect(pkg, v.VisitEventTypes) + } + + v.postProcess() + v.validate() + + var eventTypes []string + for k := range v.eventTypes { + eventTypes = append(eventTypes, k) + } + sort.Strings(eventTypes) + + fmt.Printf(`--- +title: Events +--- + +# Events + +## Introduction +The controller can emit many kinds of events, useful for monitoring, management and integration with other systems. +They can be enabled in the controller configuration. + +### Common Fields + +All events have the following fields: + +| Type | Description | Type | +|------|-------------|----------| +| **namespace** | The name indicating the overall event type | string | +| **timestamp** | The date/time when the event was generated | timestamp | +| **event_src_id** | The id of the controller which emitted the event | string | + +### Time Related Types + +| Type | Description | Examples | +|------|-------------|----------| +| **timestamp** | RFC3339 formatted timestamp string | "2024-10-02T12:17:39.501821249-04:00" | +| **duration** | Number representing a duration in nanoseconds | 104100 | + +## Event Types + +`) + + for _, eventType := range eventTypes { + def := v.eventTypes[eventType] + title := def.GetTitle() + fmt.Printf("* [%s](#%s)\n", title, strings.ToLower(title)) + } + fmt.Printf("\n\n") + + fmt.Printf(` +## Event Configuration + +For a complete event configuration reference, please refer to the +[controller event configuration](configuration/controller#events). + +**Note**: Many namespaces changed in OpenZiti v1.4.0. Old namespaces are noted below. + +Example Configuration +`) + fmt.Printf("\n```\n") + fmt.Printf(`events: + jsonLogger: + subscriptions: +`) + ns := map[string]struct{}{} + + for _, eventType := range v.eventTypes { + ns[eventType.namespace] = struct{}{} + } + + for _, eventType := range eventTypes { + def := v.eventTypes[eventType] + if _, ok := ns[def.namespace]; ok { + fmt.Printf(" - %s\n", def.namespace) + delete(ns, def.namespace) + } + } + + fmt.Printf("```\n\n") + + for _, eventType := range eventTypes { + def := v.eventTypes[eventType] + fmt.Print(def.GetDoc(v)) + fmt.Println() + } +} + +type Tag struct { + key string + values []string +} diff --git a/zititest/zitilab/runlevel/5_operation/circuit_metrics.go b/zititest/zitilab/runlevel/5_operation/circuit_metrics.go index bab5bc387..b832121f1 100644 --- a/zititest/zitilab/runlevel/5_operation/circuit_metrics.go +++ b/zititest/zitilab/runlevel/5_operation/circuit_metrics.go @@ -50,7 +50,7 @@ func (self *circuitMetrics) Execute(run model.Run) error { streamEventsRequest := map[string]interface{}{ "format": "json", - "subscriptions": []*event.Subscription{{Type: event.CircuitEventsNs}}, + "subscriptions": []*event.Subscription{{Type: event.CircuitEventNS}}, } msgBytes, err := json.Marshal(streamEventsRequest) diff --git a/zititest/zitilab/runlevel/5_operation/model_metrics.go b/zititest/zitilab/runlevel/5_operation/model_metrics.go index 632d9df9d..ada0c0ecf 100644 --- a/zititest/zitilab/runlevel/5_operation/model_metrics.go +++ b/zititest/zitilab/runlevel/5_operation/model_metrics.go @@ -63,7 +63,7 @@ func (self *modelMetrics) Execute(run model.Run) error { streamEventsRequest := map[string]interface{}{ "format": "json", - "subscriptions": []*event.Subscription{{Type: event.MetricsEventsNs}}, + "subscriptions": []*event.Subscription{{Type: event.MetricsEventNS}}, } msgBytes, err := json.Marshal(streamEventsRequest)