A comprehensive, production-ready Go library that provides a unified interface for publish-subscribe messaging across multiple cloud providers and messaging services. This library abstracts the complexity of different pub/sub implementations while providing advanced features like batch publishing, health monitoring, and graceful shutdown.
- Universal Interface: Single API that works with multiple pub/sub providers
- Single & Batch Publishing: Publish individual messages or batches for improved performance
- Asynchronous Consumption: Non-blocking message consumption with multiple listeners
- Message Acknowledgment: Reliable message processing with acknowledgment support
- Health Monitoring: Built-in health checks and connectivity monitoring
- Graceful Shutdown: Proper resource cleanup and coordinated shutdown
- β PubNub: Real-time messaging with global distribution
- β Google Cloud Pub/Sub: Scalable, fully-managed messaging service
- π More providers coming soon (AWS SNS/SQS, Azure Service Bus, Apache Kafka)
- Connection Management: Automatic reconnection with configurable backoff policies
- Error Handling: Comprehensive error categorization and monitoring
- Structured Logging: Integration with structured logging libraries
- Metrics Collection: Detailed publishing and consumption metrics
- Concurrent Processing: Optimized for high-throughput scenarios
- Production Ready: Battle-tested patterns for enterprise environments
go get github.com/piyushkumar96/generic-pubsub
package main
import (
"context"
"log"
ae "github.com/piyushkumar96/app-error"
l "github.com/piyushkumar96/generic-logger"
pubsub "github.com/piyushkumar96/generic-pubsub"
)
func main() {
// Create error channel
errorChan := make(chan *ae.AppError, 5)
// Create PubNub client
client := pubsub.NewPubNubClient(&pubsub.PubNubOptions{
UUID: "your-unique-client-id",
PublishKey: "your-publish-key",
SubscribeKey: "your-subscribe-key",
PublishChannel: "my-channel",
SubscriptionList: []string{"my-channel"},
IsSSLSecure: true,
MaxRetry: 3,
}, l.Logger, errorChan)
ctx := context.Background()
// Publish a message
message := []byte(`{"hello": "world"}`)
txnData, err := client.Publish(ctx, message)
if err != nil {
log.Fatalf("Failed to publish: %v", err)
}
log.Printf("Published message ID: %s", txnData.EventID)
}
package main
import (
"context"
"log"
ae "github.com/piyushkumar96/app-error"
l "github.com/piyushkumar96/generic-logger"
pubsub "github.com/piyushkumar96/generic-pubsub"
)
func main() {
// Create error channel
errorChan := make(chan *ae.AppError, 5)
// Create GCP client
client, err := pubsub.NewGCPPubSubClient(context.Background(), &pubsub.GCPPubSubOptions{
ProjectID: "your-gcp-project",
TopicID: "your-topic",
SubscriptionID: "your-subscription",
CredsFilePath: "/path/to/service-account.json",
}, l.Logger, errorChan)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
defer client.Teardown(context.Background())
// Publish a message
message := []byte(`{"hello": "world"}`)
txnData, err := client.Publish(context.Background(), message)
if err != nil {
log.Fatalf("Failed to publish: %v", err)
}
log.Printf("Published message ID: %s", txnData.EventID)
}
We provide detailed examples for different use cases and complexity levels:
- PubNub Example: Complete PubNub implementation with advanced features
- GCP Pub/Sub Example: Enterprise-grade GCP Pub/Sub implementation
Feature | PubNub Example | GCP Pub/Sub Example |
---|---|---|
Single Message Publishing | β | β |
Batch Message Publishing | β (5 messages) | β (8 messages) |
Multiple Listeners | β (2 listeners) | β (2 listeners) |
Message Acknowledgment | β Message Actions | β Native Ack |
Health Monitoring | β Heartbeat | β Topic Existence |
Graceful Shutdown | β Signal Handling | β Coordinated Cleanup |
Error Monitoring | β Categorized | β Production Patterns |
Metrics Collection | β Detailed Stats | β Batch Analytics |
type IPubSub interface {
// Publish sends a single message
Publish(ctx context.Context, msg []byte) (EventTxnData, *ae.AppError)
// PublishBatch sends multiple messages
PublishBatch(ctx context.Context, msgs [][]byte) ([]EventTxnData, *ae.AppError)
// Listen starts consuming messages
Listen(ctx context.Context) *ae.AppError
// ListenWithWait integrates with WaitGroup
ListenWithWait(ctx context.Context, wg *sync.WaitGroup) *ae.AppError
// AcknowledgeMessage confirms message processing
AcknowledgeMessage(ctx context.Context, msgID string) *ae.AppError
// CheckHealth verifies client connectivity
CheckHealth(ctx context.Context) (bool, *ae.AppError)
// Teardown cleans up resources
Teardown(ctx context.Context)
}
Contains comprehensive metadata about published messages:
type EventTxnData struct {
EventID string // Unique message identifier
SequenceNo int // Position in batch (for batch operations)
IsPublished bool // Success status
MessageSizeInBytes int // Message payload size
TimeTakenToPublish time.Duration // Publishing latency
Timestamp int64 // Unix timestamp
Error error // Error if publishing failed
}
Represents received messages with metadata:
type ConsumedMessage struct {
Data []byte // Raw message payload
Meta map[string]interface{} // Provider-specific metadata
}
type PubNubOptions struct {
UUID string // Client identifier
PublishKey string // PubNub publish key
SubscribeKey string // PubNub subscribe key
SecretKey string // Optional secret key
IsSSLSecure bool // Enable SSL/TLS
PublishChannel string // Default publish channel
SubscriptionList []string // Channels to subscribe to
BackoffPolicy PubnubReconnectBackoff // Reconnection strategy
ConnectTimeoutInSec int // Connection timeout
MaxRetry int32 // Max retry attempts
CloseListenersOnExit bool // Auto-close on exit
EnableDebugMode bool // Debug logging
}
type GCPPubSubOptions struct {
ProjectID string // GCP project identifier
TopicID string // Pub/Sub topic name
SubscriptionID string // Subscription name
CredsFilePath string // Service account key path
EnableDebugMode bool // Debug logging
}
- API Keys: Use publish/subscribe keys from PubNub console
- SSL/TLS: Enable
IsSSLSecure
for encrypted connections - Secret Keys: Optional additional security layer
- Service Account: Use JSON key file for authentication
- IAM Permissions: Ensure account has
pubsub.editor
or appropriate roles - Network Security: Configure VPC and firewall rules as needed
The library provides comprehensive error categorization:
PublishEvent
: Single message publishing failurePublishEventBatch
: Batch publishing failureContextCancelled
: Operation cancelledHealthCheck
: Health monitoring failure
EmptySubscriptionList
: No channels configuredAlreadyListening
: Client already in listening statePNCliBadRequest
: Invalid request parametersPNCliAccessDenied
: Authentication failure
GCPPSClientInit
: Client initialization failureGCPPSReceiveEvent
: Message receiving failureGCPPSClientTeardown
: Cleanup failure
- Publishing Metrics: Message size, latency, success rates
- Consumption Metrics: Processing time, acknowledgment rates
- Connection Metrics: Health status, reconnection events
- Error Metrics: Categorized error counts and rates
The library integrates with structured logging libraries:
import l "github.com/piyushkumar96/generic-logger"
// All operations include structured logging
client := pubsub.NewPubNubClient(options, l.Logger, errorChan)
// Always use context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Proper cleanup
defer client.Teardown(ctx)
// Monitor errors in separate goroutine
go func() {
for err := range errorChan {
log.Printf("Pub/Sub Error: %s (Code: %s)", err.GetMsg(), err.GetErrCode())
// Send to monitoring system, implement retry logic, etc.
}
}()
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
select {
case <-sigChan:
log.Println("Shutdown signal received")
client.Teardown(ctx)
}
The library includes comprehensive mocks for testing:
import "github.com/piyushkumar96/generic-pubsub/mocks"
func TestMyService(t *testing.T) {
mockPubSub := mocks.NewMockIPubSub(t)
mockPubSub.EXPECT().Publish(mock.Anything, mock.Anything).Return(
pubsub.EventTxnData{EventID: "test-id", IsPublished: true}, nil)
// Test your service with the mock
}
We welcome contributions! Here's how you can help:
- Fork the Repository
- Create a Feature Branch:
git checkout -b feature/amazing-feature
- Commit Changes:
git commit -m 'Add amazing feature'
- Push to Branch:
git push origin feature/amazing-feature
- Open a Pull Request
- Write Tests: Ensure good test coverage for new features
- Document Code: Add comprehensive comments and documentation
- Follow Conventions: Use existing code style and patterns
- Update Examples: Add examples for new features
- Go: Version 1.21 or higher
- Dependencies: See go.mod for complete list
cloud.google.com/go/pubsub
- GCP Pub/Sub clientgithub.com/pubnub/go/v7
- PubNub Go SDKgithub.com/piyushkumar96/app-error
- Structured error handlinggithub.com/piyushkumar96/generic-logger
- Logging interface
- PubNub: Up to 1000 messages/second per client
- GCP Pub/Sub: Up to 10,000 messages/second per client (batch mode)
- PubNub: ~50-100ms global latency
- GCP Pub/Sub: ~10-50ms within region
- Memory: ~10MB base + ~1KB per queued message
- CPU: Low overhead with goroutine-based concurrency
- Network: Efficient batching reduces connection overhead
- AWS SNS/SQS Support: Amazon messaging services
- Azure Service Bus: Microsoft cloud messaging
- Apache Kafka: Distributed streaming platform
- Redis Streams: Redis-based messaging
- NATS: Cloud-native messaging system
- Circuit Breaker: Fault tolerance patterns
- Rate Limiting: Built-in rate limiting capabilities
- Message Encryption: End-to-end encryption support
- Schema Registry: Message schema validation
- Dead Letter Queues: Failed message handling
This project is licensed under the MIT License - see the LICENSE file for details.
- PubNub Team: For providing excellent real-time messaging infrastructure
- Google Cloud: For the robust and scalable Pub/Sub service
- Go Community: For the amazing ecosystem and tools
- Contributors: Everyone who has contributed to making this library better
- Documentation: Check our comprehensive examples and API reference
- Issues: Report bugs or request features via GitHub Issues
- Discussions: Join community discussions for help and best practices
Made with β€οΈ for the Go community