Skip to content

Commit

Permalink
Merge branch 'master' into tarak/go1.20-rand-prep
Browse files Browse the repository at this point in the history
  • Loading branch information
tarakby authored Jul 14, 2023
2 parents a2f5f4b + 117fcc0 commit 8e21991
Show file tree
Hide file tree
Showing 21 changed files with 665 additions and 39 deletions.
14 changes: 9 additions & 5 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,11 +1191,15 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
return nil, fmt.Errorf("could not create connection manager: %w", err)
}

meshTracer := tracer.NewGossipSubMeshTracer(
builder.Logger,
networkMetrics,
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: networkMetrics,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

libp2pNode, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
Expand Down
14 changes: 9 additions & 5 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,11 +702,15 @@ func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
pis = append(pis, pi)
}

meshTracer := tracer.NewGossipSubMeshTracer(
builder.Logger,
builder.Metrics.Network,
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
Expand Down
3 changes: 3 additions & 0 deletions config/default-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ network-config:
# The default interval at which the gossipsub score tracer logs the peer scores. This is used for debugging and forensics purposes.
# Note that we purposefully choose this logging interval high enough to avoid spamming the logs.
gossipsub-score-tracer-interval: 1m
# The default RPC sent tracker cache size. The RPC sent tracker is used to track RPC control messages sent from the local node.
# Note: this cache size must be large enough to keep a history of sent messages in a reasonable time window of past history.
gossipsub-rpc-sent-tracker-cache-size: 1_000_000
# Peer scoring is the default value for enabling peer scoring
gossipsub-peer-scoring-enabled: true
# Gossipsub rpc inspectors configs
Expand Down
1 change: 0 additions & 1 deletion engine/Readme.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Notifier

The Notifier implements the following state machine
![Notifier State Machine](/docs/NotifierStateMachine.png)

Expand Down
170 changes: 170 additions & 0 deletions engine/verification/Readme.md

Large diffs are not rendered by default.

Binary file added engine/verification/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 9 additions & 5 deletions follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,15 @@ func (builder *FollowerServiceBuilder) initPublicLibp2pNode(networkKey crypto.Pr
pis = append(pis, pi)
}

meshTracer := tracer.NewGossipSubMeshTracer(
builder.Logger,
builder.Metrics.Network,
builder.IdentityProvider,
builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)
meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: builder.Logger,
Metrics: builder.Metrics.Network,
IDProvider: builder.IdentityProvider,
LoggerInterval: builder.FlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
RpcSentTrackerCacheSize: builder.FlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
Expand Down
9 changes: 9 additions & 0 deletions module/metrics/herocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ func GossipSubRPCInspectorQueueMetricFactory(f HeroCacheMetricsFactory, networkT
return f(namespaceNetwork, r)
}

func GossipSubRPCSentTrackerMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
// we don't use the public prefix for the metrics here for sake of backward compatibility of metric name.
r := ResourceNetworkingRPCSentTrackerCache
if networkType == network.PublicNetwork {
r = PrependPublicPrefix(r)
}
return f(namespaceNetwork, r)
}

func RpcInspectorNotificationQueueMetricFactory(f HeroCacheMetricsFactory, networkType network.NetworkingType) module.HeroCacheMetrics {
r := ResourceNetworkingRpcInspectorNotificationQueue
if networkType == network.PublicNetwork {
Expand Down
1 change: 1 addition & 0 deletions module/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ const (
ResourceNetworkingApplicationLayerSpamReportQueue = "application_layer_spam_report_queue"
ResourceNetworkingRpcClusterPrefixReceivedCache = "rpc_cluster_prefixed_received_cache"
ResourceNetworkingDisallowListCache = "disallow_list_cache"
ResourceNetworkingRPCSentTrackerCache = "gossipsub_rpc_sent_tracker_cache"

ResourceFollowerPendingBlocksCache = "follower_pending_block_cache" // follower engine
ResourceFollowerLoopCertifiedBlocksChannel = "follower_loop_certified_blocks_channel" // follower loop, certified blocks buffered channel
Expand Down
15 changes: 10 additions & 5 deletions network/internal/p2pfixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,16 @@ func CreateNode(t *testing.T, networkKey crypto.PrivateKey, sporkID flow.Identif
idProvider := id.NewFixedIdentityProvider(nodeIds)
defaultFlowConfig, err := config.DefaultConfig()
require.NoError(t, err)
meshTracer := tracer.NewGossipSubMeshTracer(
logger,
metrics.NewNoopCollector(),
idProvider,
defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval)

meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: logger,
Metrics: metrics.NewNoopCollector(),
IDProvider: idProvider,
LoggerInterval: defaultFlowConfig.NetworkConfig.GossipSubConfig.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
RpcSentTrackerCacheSize: defaultFlowConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

builder := p2pbuilder.NewNodeBuilder(
logger,
Expand Down
10 changes: 6 additions & 4 deletions network/netconf/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ const (
gracePeriod = "libp2p-grace-period"
silencePeriod = "libp2p-silence-period"
// gossipsub
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
scoreTracerInterval = "gossipsub-score-tracer-interval"
peerScoring = "gossipsub-peer-scoring-enabled"
localMeshLogInterval = "gossipsub-local-mesh-logging-interval"
rpcSentTrackerCacheSize = "gossipsub-rpc-sent-tracker-cache-size"
scoreTracerInterval = "gossipsub-score-tracer-interval"
// gossipsub validation inspector
gossipSubRPCInspectorNotificationCacheSize = "gossipsub-rpc-inspector-notification-cache-size"
validationInspectorNumberOfWorkers = "gossipsub-rpc-validation-inspector-workers"
Expand All @@ -66,7 +67,7 @@ func AllFlagNames() []string {
return []string{
networkingConnectionPruning, preferredUnicastsProtocols, receivedMessageCacheSize, peerUpdateInterval, unicastMessageTimeout, unicastCreateStreamRetryDelay,
dnsCacheTTL, disallowListNotificationCacheSize, dryRun, lockoutDuration, messageRateLimit, bandwidthRateLimit, bandwidthBurstLimit, memoryLimitRatio,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, scoreTracerInterval,
fileDescriptorsRatio, peerBaseLimitConnsInbound, highWatermark, lowWatermark, gracePeriod, silencePeriod, peerScoring, localMeshLogInterval, rpcSentTrackerCacheSize, scoreTracerInterval,
gossipSubRPCInspectorNotificationCacheSize, validationInspectorNumberOfWorkers, validationInspectorInspectMessageQueueCacheSize, validationInspectorClusterPrefixedTopicsReceivedCacheSize,
validationInspectorClusterPrefixedTopicsReceivedCacheDecay, validationInspectorClusterPrefixHardThreshold, ihaveSyncSampleSizePercentage, ihaveAsyncSampleSizePercentage,
ihaveMaxSampleSize, metricsInspectorNumberOfWorkers, metricsInspectorCacheSize, alspDisabled, alspSpamRecordCacheSize, alspSpamRecordQueueSize, alspHearBeatInterval,
Expand Down Expand Up @@ -107,6 +108,7 @@ func InitializeNetworkFlags(flags *pflag.FlagSet, config *Config) {
flags.Bool(peerScoring, config.GossipSubConfig.PeerScoring, "enabling peer scoring on pubsub network")
flags.Duration(localMeshLogInterval, config.GossipSubConfig.LocalMeshLogInterval, "logging interval for local mesh in gossipsub")
flags.Duration(scoreTracerInterval, config.GossipSubConfig.ScoreTracerInterval, "logging interval for peer score tracer in gossipsub, set to 0 to disable")
flags.Uint32(rpcSentTrackerCacheSize, config.GossipSubConfig.RPCSentTrackerCacheSize, "cache size of the rpc sent tracker used by the gossipsub mesh tracer.")
// gossipsub RPC control message validation limits used for validation configuration and rate limiting
flags.Int(validationInspectorNumberOfWorkers, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.NumberOfWorkers, "number of gossupsub RPC control message validation inspector component workers")
flags.Uint32(validationInspectorInspectMessageQueueCacheSize, config.GossipSubConfig.GossipSubRPCInspectorsConfig.GossipSubRPCValidationInspectorConfigs.CacheSize, "cache size for gossipsub RPC validation inspector events worker pool queue.")
Expand Down
4 changes: 1 addition & 3 deletions network/p2p/inspector/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ type RecordCache struct {

// NewRecordCache creates a new *RecordCache.
// Args:
// - sizeLimit: the maximum number of records that the cache can hold.
// - logger: the logger used by the cache.
// - collector: the metrics collector used by the cache.
// - config: record cache config.
// - recordEntityFactory: a factory function that creates a new spam record.
// Returns:
// - *RecordCache, the created cache.
Expand Down
12 changes: 11 additions & 1 deletion network/p2p/p2pbuilder/libp2pNodeBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
flownet "github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/netconf"
"github.com/onflow/flow-go/network/p2p"
Expand Down Expand Up @@ -494,7 +495,16 @@ func DefaultNodeBuilder(
builder.EnableGossipSubPeerScoring(nil)
}

meshTracer := tracer.NewGossipSubMeshTracer(logger, metricsCfg.Metrics, idProvider, gossipCfg.LocalMeshLogInterval)
meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: logger,
Metrics: metricsCfg.Metrics,
IDProvider: idProvider,
LoggerInterval: gossipCfg.LocalMeshLogInterval,
RpcSentTrackerCacheCollector: metrics.GossipSubRPCSentTrackerMetricFactory(metricsCfg.HeroCacheFactory, flownet.PrivateNetwork),
RpcSentTrackerCacheSize: gossipCfg.RPCSentTrackerCacheSize,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)

builder.SetGossipSubTracer(meshTracer)
builder.SetGossipSubScoreTracerInterval(gossipCfg.ScoreTracerInterval)

Expand Down
2 changes: 2 additions & 0 deletions network/p2p/p2pconf/gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type GossipSubConfig struct {
LocalMeshLogInterval time.Duration `mapstructure:"gossipsub-local-mesh-logging-interval"`
// ScoreTracerInterval is the interval at which the score tracer logs the peer scores.
ScoreTracerInterval time.Duration `mapstructure:"gossipsub-score-tracer-interval"`
// RPCSentTrackerCacheSize cache size of the rpc sent tracker used by the gossipsub mesh tracer.
RPCSentTrackerCacheSize uint32 `mapstructure:"gossipsub-rpc-sent-tracker-cache-size"`
// PeerScoring is whether to enable GossipSub peer scoring.
PeerScoring bool `mapstructure:"gossipsub-peer-scoring-enabled"`
}
40 changes: 31 additions & 9 deletions network/p2p/tracer/gossipSubMeshTracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/tracer/internal"
"github.com/onflow/flow-go/utils/logging"
)

Expand Down Expand Up @@ -43,23 +44,35 @@ type GossipSubMeshTracer struct {
idProvider module.IdentityProvider
loggerInterval time.Duration
metrics module.GossipSubLocalMeshMetrics
rpcSentTracker *internal.RPCSentTracker
}

var _ p2p.PubSubTracer = (*GossipSubMeshTracer)(nil)

func NewGossipSubMeshTracer(
logger zerolog.Logger,
metrics module.GossipSubLocalMeshMetrics,
idProvider module.IdentityProvider,
loggerInterval time.Duration) *GossipSubMeshTracer {
type GossipSubMeshTracerConfig struct {
Logger zerolog.Logger
Metrics module.GossipSubLocalMeshMetrics
IDProvider module.IdentityProvider
LoggerInterval time.Duration
RpcSentTrackerCacheCollector module.HeroCacheMetrics
RpcSentTrackerCacheSize uint32
}

// NewGossipSubMeshTracer creates a new *GossipSubMeshTracer.
// Args:
// - *GossipSubMeshTracerConfig: the mesh tracer config.
// Returns:
// - *GossipSubMeshTracer: new mesh tracer.
func NewGossipSubMeshTracer(config *GossipSubMeshTracerConfig) *GossipSubMeshTracer {
rpcSentTracker := internal.NewRPCSentTracker(config.Logger, config.RpcSentTrackerCacheSize, config.RpcSentTrackerCacheCollector)
g := &GossipSubMeshTracer{
RawTracer: NewGossipSubNoopTracer(),
topicMeshMap: make(map[string]map[peer.ID]struct{}),
idProvider: idProvider,
metrics: metrics,
logger: logger.With().Str("component", "gossip_sub_topology_tracer").Logger(),
loggerInterval: loggerInterval,
idProvider: config.IDProvider,
metrics: config.Metrics,
logger: config.Logger.With().Str("component", "gossipsub_topology_tracer").Logger(),
loggerInterval: config.LoggerInterval,
rpcSentTracker: rpcSentTracker,
}

g.Component = component.NewComponentManagerBuilder().
Expand Down Expand Up @@ -139,6 +152,15 @@ func (t *GossipSubMeshTracer) Prune(p peer.ID, topic string) {
lg.Info().Hex("flow_id", logging.ID(id.NodeID)).Str("role", id.Role.String()).Msg("pruned peer")
}

// SendRPC is called when a RPC is sent. Currently, the GossipSubMeshTracer tracks iHave RPC messages that have been sent.
// This function can be updated to track other control messages in the future as required.
func (t *GossipSubMeshTracer) SendRPC(rpc *pubsub.RPC, _ peer.ID) {
switch {
case len(rpc.GetControl().GetIhave()) > 0:
t.rpcSentTracker.OnIHaveRPCSent(rpc.GetControl().GetIhave())
}
}

// logLoop logs the mesh peers of the local node for each topic at a regular interval.
func (t *GossipSubMeshTracer) logLoop(ctx irrecoverable.SignalerContext) {
ticker := time.NewTicker(t.loggerInterval)
Expand Down
14 changes: 13 additions & 1 deletion network/p2p/tracer/gossipSubMeshTracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/onflow/flow-go/config"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
mockmodule "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/p2p"
Expand All @@ -29,6 +31,8 @@ import (
// One of the nodes is running with an unknown peer id, which the identity provider is mocked to return an error and
// the mesh tracer should log a warning message.
func TestGossipSubMeshTracer(t *testing.T) {
defaultConfig, err := config.DefaultConfig()
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
sporkId := unittest.IdentifierFixture()
Expand Down Expand Up @@ -61,7 +65,15 @@ func TestGossipSubMeshTracer(t *testing.T) {
// we only need one node with a meshTracer to test the meshTracer.
// meshTracer logs at 1 second intervals for sake of testing.
collector := mockmodule.NewGossipSubLocalMeshMetrics(t)
meshTracer := tracer.NewGossipSubMeshTracer(logger, collector, idProvider, 1*time.Second)
meshTracerCfg := &tracer.GossipSubMeshTracerConfig{
Logger: logger,
Metrics: collector,
IDProvider: idProvider,
LoggerInterval: time.Second,
RpcSentTrackerCacheCollector: metrics.NewNoopCollector(),
RpcSentTrackerCacheSize: defaultConfig.NetworkConfig.GossipSubConfig.RPCSentTrackerCacheSize,
}
meshTracer := tracer.NewGossipSubMeshTracer(meshTracerCfg)
tracerNode, tracerId := p2ptest.NodeFixture(
t,
sporkId,
Expand Down
86 changes: 86 additions & 0 deletions network/p2p/tracer/internal/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package internal

import (
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
herocache "github.com/onflow/flow-go/module/mempool/herocache/backdata"
"github.com/onflow/flow-go/module/mempool/herocache/backdata/heropool"
"github.com/onflow/flow-go/module/mempool/stdmap"
p2pmsg "github.com/onflow/flow-go/network/p2p/message"
)

// rpcCtrlMsgSentCacheConfig configuration for the rpc sent cache.
type rpcCtrlMsgSentCacheConfig struct {
logger zerolog.Logger
sizeLimit uint32
collector module.HeroCacheMetrics
}

// rpcSentCache cache that stores rpcSentEntity. These entity's represent RPC control messages sent from the local node.
type rpcSentCache struct {
// c is the underlying cache.
c *stdmap.Backend
}

// newRPCSentCache creates a new *rpcSentCache.
// Args:
// - config: record cache config.
// Returns:
// - *rpcSentCache: the created cache.
// Note that this cache is intended to track control messages sent by the local node,
// it stores a RPCSendEntity using an Id which should uniquely identifies the message being tracked.
func newRPCSentCache(config *rpcCtrlMsgSentCacheConfig) *rpcSentCache {
backData := herocache.NewCache(config.sizeLimit,
herocache.DefaultOversizeFactor,
heropool.LRUEjection,
config.logger.With().Str("mempool", "gossipsub-rpc-control-messages-sent").Logger(),
config.collector)
return &rpcSentCache{
c: stdmap.NewBackend(stdmap.WithBackData(backData)),
}
}

// add initializes the record cached for the given messageEntityID if it does not exist.
// Returns true if the record is initialized, false otherwise (i.e.: the record already exists).
// Args:
// - topic: the topic ID.
// - messageId: the message ID.
// - controlMsgType: the rpc control message type.
// Returns:
// - bool: true if the record is initialized, false otherwise (i.e.: the record already exists).
// Note that if add is called multiple times for the same messageEntityID, the record is initialized only once, and the
// subsequent calls return false and do not change the record (i.e.: the record is not re-initialized).
func (r *rpcSentCache) add(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool {
return r.c.Add(newRPCSentEntity(r.rpcSentEntityID(topic, messageId, controlMsgType), controlMsgType))
}

// has checks if the RPC message has been cached indicating it has been sent.
// Args:
// - topic: the topic ID.
// - messageId: the message ID.
// - controlMsgType: the rpc control message type.
// Returns:
// - bool: true if the RPC has been cache indicating it was sent from the local node.
func (r *rpcSentCache) has(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) bool {
return r.c.Has(r.rpcSentEntityID(topic, messageId, controlMsgType))
}

// size returns the number of records in the cache.
func (r *rpcSentCache) size() uint {
return r.c.Size()
}

// rpcSentEntityID creates an entity ID from the topic, messageID and control message type.
// Args:
// - topic: the topic ID.
// - messageId: the message ID.
// - controlMsgType: the rpc control message type.
// Returns:
// - flow.Identifier: the entity ID.
func (r *rpcSentCache) rpcSentEntityID(topic string, messageId string, controlMsgType p2pmsg.ControlMessageType) flow.Identifier {
return flow.MakeIDFromFingerPrint([]byte(fmt.Sprintf("%s%s%s", topic, messageId, controlMsgType)))
}
Loading

0 comments on commit 8e21991

Please sign in to comment.