Christian Häusler edited this page Apr 22, 2019 · 4 revisions
Note

PR #65 requested support for FastCGI. This requires changes to the architecture, and this page serves as the working space for the design process.

Components
Config

Collects configuration from different sources and makes it available to the other components.

The sources, in order of precedence (highest first), will be:

  1. command line options and arguments

  2. environment variables

  3. configuration file

  4. defaults
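
The precedence rule above could be implemented as a chain of lookups in which the first source that defines a key wins. The following is a minimal sketch only: the `source` type and the `fromMap` and `resolve` helpers are hypothetical names, and plain maps stand in for the real command line, environment, configuration file and defaults.

```go
package main

import "fmt"

// source is a hypothetical lookup function. It returns the value for key
// and whether the key is set in this source.
type source func(key string) (string, bool)

// fromMap adapts a map to a source. Real sources would wrap the command
// line, the environment or a parsed configuration file instead.
func fromMap(m map[string]string) source {
	return func(key string) (string, bool) {
		v, ok := m[key]
		return v, ok
	}
}

// resolve returns the value from the first source that defines key.
// Callers pass the sources ordered from highest to lowest precedence.
func resolve(key string, sources ...source) (string, bool) {
	for _, s := range sources {
		if v, ok := s(key); ok {
			return v, true
		}
	}
	return "", false
}

func main() {
	// The keys and values below are made up for illustration.
	flags := fromMap(map[string]string{"queue": "jobs"})
	env := fromMap(map[string]string{"queue": "ignored", "url": "amqp://localhost"})
	file := fromMap(map[string]string{"url": "amqp://ignored", "prefetch": "10"})
	defaults := fromMap(map[string]string{"prefetch": "1", "format": "logfmt"})

	for _, key := range []string{"queue", "url", "prefetch", "format"} {
		v, _ := resolve(key, flags, env, file, defaults)
		fmt.Printf("%s=%s\n", key, v)
	}
}
```

Keeping each source behind the same small function type means the other components never need to know where a value came from.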

Logger

Provides logging services to all other components. It will be an implementation of github.com/bketelsen/logr.

By default, errors will be written to STDERR and informational messages to STDOUT, both in the logfmt format.

Other output locations (files) and formats (JSON) might be configurable.
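
To illustrate the default behaviour, here is a rough sketch of a logger that splits its output between two writers and emits logfmt-style lines. It deliberately does not implement the logr interface; `splitLogger`, `formatLine` and the `level` and `msg` keys are assumptions made for this example.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strconv"
)

// formatLine renders a single logfmt-style line. The message is always
// quoted so that spaces do not break the key=value structure.
func formatLine(level, msg string) string {
	return "level=" + level + " msg=" + strconv.Quote(msg)
}

// splitLogger is a hypothetical logger that sends informational messages
// and errors to different writers.
type splitLogger struct {
	out io.Writer // informational messages
	err io.Writer // errors
}

// Info writes an informational message to the out writer.
func (l splitLogger) Info(msg string) {
	fmt.Fprintln(l.out, formatLine("info", msg))
}

// Error writes an error message to the err writer.
func (l splitLogger) Error(msg string) {
	fmt.Fprintln(l.err, formatLine("error", msg))
}

func main() {
	log := splitLogger{out: os.Stdout, err: os.Stderr}
	log.Info("consumer started")
	log.Error("connection lost")
}
```

Because both targets are plain `io.Writer` values, pointing them at files later would not require changes to the callers.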

Consumer

Subscribes to a queue and passes received messages to a worker implementation. It will also handle message acknowledgement according to the worker's return value.

Worker

A worker takes care of processing a message. There will be several implementations.

// Worker takes care of the heavy lifting to be done for each consumed message.
// The implementation must be safe to be called concurrently.
//
// If the worker fails to process a message, but it is confident future
// processing can be successful, an error shall be returned. If processing will
// not be possible any more, the worker shall panic.
//
// The logger shall not be used for error handling but for informational
// purposes only. When capturing the output of the actual worker scripts, their
// STDOUT should be logged as error.
type Worker func(attr Attributes, payload io.Reader, log logr.Logger) (Acknowledgment, error)

// Attributes is a data transfer object holding the attributes of an AMQP message.
type Attributes interface {
	AppId() string
	ConsumerTag() string
	ContentEncoding() string
	ContentType() string
	CorrelationId() string
	DeliveryMode() uint8
	DeliveryTag() uint64
	Exchange() string
	Expiration() string
	Headers() map[string]interface{}
	MessageId() string
	Priority() uint8
	Redelivered() bool
	ReplyTo() string
	RoutingKey() string
	Timestamp() time.Time
	Type() string
	UserId() string

	// The JSON serialized representation of this DTO.
	JSON() io.Reader
}

// Acknowledgment represents the status of a message processing and indicates to
// the caller of a worker how the message has to be acknowledged.
type Acknowledgment uint8

const (
	// Ack indicates the message has been successfully processed and can be
	// removed from the queue.
	Ack Acknowledgment = iota

	// Reject indicates the message could not be processed, never can be, and
	// therefore needs to be removed from the queue.
	Reject

	// Requeue indicates the message could not be processed but might be in
	// the future and therefore should be redelivered to another consumer.
	Requeue
)
exec

Executes a script, as has been done so far. Passing the payload as a command-line argument will be dropped in favour of pipes.

FastCGI

Delegates to a FastCGI server.
