⚡ Quark

Reliable Event-Driven mechanisms for reactive ecosystems written in Go.

Based on reliability mechanisms used by companies such as Uber, which serve over one million requests per hour, Quark offers a fine-tuned set of tools to reduce overall complexity.

In addition, Quark fans out processes per consumer to parallelize and isolate blocking I/O tasks (such as consuming from a queue or topic). Thread safety and graceful shutdown are core parts of Quark.

Furthermore, Quark stores specific data (e.g. event id, correlation id, span context) in message headers in binary format to reduce disk consumption on the infrastructure, and it lets users choose their preferred encoding library (JSON, Apache Avro, etc.).

Quark lets developers take advantage of these mechanisms with its default configuration and a gorilla/mux-like router, so you get the benefits without complex configuration and handling. You can either rely on the global configuration specified in the Broker or set a specific configuration for a specific consumer.

We currently offer Apache Kafka and AWS SNS/SQS implementations, and we will be adding more, such as RabbitMQ and NATS, in the near future.

Installation

go get github.com/neutrinocorp/quark

Note that quark only supports the two most recent minor versions of Go.
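
The Quick Start snippets below assume imports along these lines (the exact set varies per snippet):

import (
  "context"
  "log"
  "os"
  "os/signal"
  "strconv"
  "time"

  "github.com/neutrinocorp/quark"
)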

Quick Start

Before we set up our consumers, we must define our Broker and its required configuration to work as desired.

The following example demonstrates how to set up an Apache Kafka Broker with an error handler (hook).

When using Apache Kafka, the Shopify/sarama package is required, as we rely on its mechanisms.

// Create broker
b := quark.NewKafkaBroker(newSaramaCfg(), "localhost:9092")

b.ErrorHandler = func(ctx context.Context, err error) {
  log.Print(err)
}
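
The newSaramaCfg helper above is not part of Quark; a minimal sketch of such a helper using Shopify/sarama (the version and options shown are assumptions, tune them for your cluster) could look like this:

import "github.com/Shopify/sarama"

func newSaramaCfg() *sarama.Config {
  cfg := sarama.NewConfig()
  cfg.Version = sarama.V2_6_0_0     // assumed broker version; match your cluster
  cfg.Consumer.Return.Errors = true // surface consumer errors (e.g. to the ErrorHandler hook)
  return cfg
}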

Quark is very straightforward, as it is based on the net/http and gorilla/mux packages. This example demonstrates how to listen to an asynchronous topic using the Topic function.

If no pool size is specified, Quark will set up to 5 workers per consumer node.

b.Topic("chat.1").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  log.Print(e.Topic, e.RawValue)
  // publish messages to given topics
  _, _ = w.Write(e.Context, e.RawValue, "chat.2", "chat.3") // returns how many messages were published
  return true // this indicates if the consumer should mark the message or not (Ack or NAck)
})

Quark parallelizes consumer tasks across a pool of workers using goroutines and performs graceful shutdown by default.

The pool size can be defined by the user with a PoolSize attribute.

b.Topic("chat.1").PoolSize(10).HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  log.Print(e.Topic, e.RawValue)
  // publish messages to given topics
  _, _ = w.Write(e.Context, e.RawValue, "chat.2", "chat.3")
  return true
})

Quark is built on reliability mechanisms. To make use of them, configure them on either the Broker or a specific topic.

This method relies on the defaultEventWriter, which comes preconfigured with reliability mechanisms such as message-loop avoidance, among other functionality.

b.Topic("cosmos.payments").MaxRetries(3).RetryBackoff(time.Second*3).HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  // ... something failed in our processing
  if e.Body.Metadata.RedeliveryCount > 3 {
  	// executed if our message has been processed too much, send to Dead Letter Queue
  	w.Header().Set(quark.HeaderMessageRedeliveryCount, 0)
  	_, _ = w.Write(e.Context, e.RawValue, "dlq.cosmos.payment")
	return true
  }
  
  // publish messages to retry queue, message loop will be avoided by defaultEventWritter
  e.Body.Metadata.RedeliveryCount++
  w.Header().Set(quark.HeaderMessageRedeliveryCount, strconv.Itoa(e.Body.Metadata.RedeliveryCount))
  _, _ = w.Write(e.Context, e.RawValue, "retry.cosmos.payment")
  return true
})

To conclude, after setting up all of our consumers, we must start the Broker to bring up all of the specified Consumers.

Don't forget to shut down gracefully, just as you would a net/http server.

// graceful shutdown
stop := make(chan os.Signal, 1) // buffered so signal.Notify does not drop the signal
signal.Notify(stop, os.Interrupt)
go func() {
  if err := b.ListenAndServe(); err != nil && err != quark.ErrBrokerClosed {
    log.Fatal(err)
  }
}()

<-stop

log.Printf("stopping %d nodes and %d workers", b.RunningNodes(), b.RunningWorkers())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

if err := b.Shutdown(ctx); err != nil {
  log.Fatal(err)
}

log.Print(b.RunningNodes(), b.RunningWorkers()) // should be 0,0

Advanced techniques

Grouping consumer nodes

One might want to take advantage of specific features like Apache Kafka's Consumer Groups, as these help to process messages one at a time.

This can be done using the default mux by setting the Group attribute.

By default, Quark uses a Partition Consumer when using Apache Kafka.

b.Topic("chat.1").Group("awesome-group").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  log.Print(e.Topic, e.RawValue)
  // publish messages to given topics
  _, _ = w.Write(e.Context, e.RawValue, "chat.2", "chat.3")
  return true
})

Fanning-in messages/queues into a single consumer

The following example demonstrates how to handle this case using the mux and the Topics function.

When fan-in is configured, the Consumer must be inside a Group.

b.Topics("chat.0", "chat.1").Group("chat-group").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  log.Print(e.Topic, e.RawValue)
  // publish messages to given topics
  _, _ = w.Write(e.Context, e.RawValue, "chat.2", "chat.3")
  return true
})

Custom publisher per-consumer

Say you are listening to topics from Kafka, yet you want to publish the output to AWS SNS instead of Kafka (which is what the global configuration specifies).

The following example demonstrates how to tackle the previous scenario with Quark.

Note that the use of Group is crucial here, since a Partition Consumer is treated as a single unit of processing and would publish the message N times (N being the pool size, since Consumer workers run in parallel).

type AWSPublisher struct{}

func (a AWSPublisher) Publish(ctx context.Context, msgs ...*quark.Message) error {
	for _, msg := range msgs {
		log.Printf("publishing - message: %s", msg.Kind)
	}
	return nil
}

// ...

b.Topic("alex.trades").Group("alex.trades").Publisher(AWSPublisher{}).
  HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
    _, _ = w.Write(e.Context, []byte("alex has traded in a new index fund"),
      "aws.alex.trades", "aws.analytics.trades")
    return true
  })
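
As a rough sketch only (not Quark's or AWS's prescribed wiring), the AWSPublisher above could delegate to the AWS SDK for Go (v1); the NewAWSPublisher constructor, the single target topic ARN, and the JSON body are assumptions for illustration:

import (
  "context"
  "encoding/json"

  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/service/sns"

  "github.com/neutrinocorp/quark"
)

type AWSPublisher struct {
  client   *sns.SNS
  topicARN string // hypothetical single target topic
}

func NewAWSPublisher(topicARN string) AWSPublisher {
  return AWSPublisher{
    client:   sns.New(session.Must(session.NewSession())),
    topicARN: topicARN,
  }
}

func (a AWSPublisher) Publish(ctx context.Context, msgs ...*quark.Message) error {
  for _, msg := range msgs {
    body, err := json.Marshal(msg) // payload encoding is up to you
    if err != nil {
      return err
    }
    // publish each Quark message to the assumed SNS topic
    if _, err := a.client.PublishWithContext(ctx, &sns.PublishInput{
      TopicArn: aws.String(a.topicARN),
      Message:  aws.String(string(body)),
    }); err != nil {
      return err
    }
  }
  return nil
}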

See the documentation, examples and FAQ for more details.

Performance

In event-driven applications, most of the time goes to blocking I/O, such as consuming from a queue or topic, and to message serialization. Quark fans out each consumer into a pool of goroutine workers, so slow consumption is parallelized and isolated from the rest of the system, and the pool size can be tuned per consumer with PoolSize.

Quark also keeps message metadata (event id, correlation id, span context, and so on) in binary headers to reduce the footprint of each message on the infrastructure, while leaving payload encoding to whichever library you prefer (JSON, Apache Avro, etc.).
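
Because Quark leaves payload encoding to you, a handler can decode and re-encode events with any library. A minimal sketch using encoding/json (the PaymentProcessed type and the topic names are made up for illustration):

type PaymentProcessed struct {
  ID     string  `json:"id"`
  Amount float64 `json:"amount"`
}

b.Topic("cosmos.payments").HandleFunc(func(w quark.EventWriter, e *quark.Event) bool {
  var payment PaymentProcessed
  if err := json.Unmarshal(e.RawValue, &payment); err != nil {
    return false // NAck malformed payloads
  }

  out, err := json.Marshal(payment) // re-encode with your preferred library (JSON, Avro, etc.)
  if err != nil {
    return false
  }
  _, _ = w.Write(e.Context, out, "analytics.payments")
  return true
})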

Supported Infrastructure

  • Apache Kafka
  • Amazon Web Services SNS and SQS (separately or in combination)

Maintenance

This library is currently maintained by NeutrinoCorp.

Development Status: Alpha

All APIs are under development; however, no breaking changes will be made in the 1.x series of releases. Users of semver-aware dependency management systems should pin quark to ^1.

Contributing

We encourage and support an active, healthy community of contributors, including you! Details are in the contribution guide and the code of conduct. The quark maintainers keep an eye on issues and pull requests, but you can also report any negative conduct to oss-conduct@neutrinocorp.org. That email list is a private, safe space; even the quark maintainers don't have access, so don't hesitate to hold us to a high standard.


Released under the MIT License.
