package rabbitmq
|
|
|
|
import (
|
|
"time"
|
|
|
|
"github.com/streadway/amqp"
|
|
)
|
|
|
|
// Consumer allows you to create and connect to queues for data consumption.
|
|
type Consumer struct {
|
|
chManager *channelManager
|
|
logger logger
|
|
}
|
|
|
|
// ConsumerOptions are used to describe a consumer's configuration.
|
|
// Logging set to true will enable the consumer to print to stdout
|
|
type ConsumerOptions struct {
|
|
Logging bool
|
|
}
|
|
|
|
// Delivery captures the fields for a previously delivered message resident in
|
|
// a queue to be delivered by the server to a consumer from Channel.Consume or
|
|
// Channel.Get.
|
|
type Delivery struct {
|
|
amqp.Delivery
|
|
}
|
|
|
|
// GetConsumer returns a new Consumer connected to the given rabbitmq server
|
|
func GetConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
|
|
options := &ConsumerOptions{}
|
|
for _, optionFunc := range optionFuncs {
|
|
optionFunc(options)
|
|
}
|
|
|
|
chManager, err := newChannelManager(url, options.Logging)
|
|
if err != nil {
|
|
return Consumer{}, err
|
|
}
|
|
consumer := Consumer{
|
|
chManager: chManager,
|
|
logger: logger{logging: options.Logging},
|
|
}
|
|
return consumer, nil
|
|
}
|
|
|
|
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
|
|
func getDefaultConsumeOptions() ConsumeOptions {
|
|
return ConsumeOptions{
|
|
QueueDurable: false,
|
|
QueueAutoDelete: false,
|
|
QueueExclusive: false,
|
|
QueueNoWait: false,
|
|
QueueArgs: nil,
|
|
BindingExchange: "",
|
|
BindingNoWait: false,
|
|
BindingArgs: nil,
|
|
Concurrency: 1,
|
|
QOSPrefetch: 0,
|
|
QOSGlobal: false,
|
|
ConsumerName: "",
|
|
ConsumerAutoAck: false,
|
|
ConsumerExclusive: false,
|
|
ConsumerNoWait: false,
|
|
ConsumerNoLocal: false,
|
|
ConsumerArgs: nil,
|
|
}
|
|
}
|
|
|
|
// ConsumeOptions are used to describe how a new consumer will be created.
|
|
type ConsumeOptions struct {
|
|
QueueDurable bool
|
|
QueueAutoDelete bool
|
|
QueueExclusive bool
|
|
QueueNoWait bool
|
|
QueueArgs Table
|
|
BindingExchange string
|
|
BindingNoWait bool
|
|
BindingArgs Table
|
|
Concurrency int
|
|
QOSPrefetch int
|
|
QOSGlobal bool
|
|
ConsumerName string
|
|
ConsumerAutoAck bool
|
|
ConsumerExclusive bool
|
|
ConsumerNoWait bool
|
|
ConsumerNoLocal bool
|
|
ConsumerArgs Table
|
|
}
|
|
|
|
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
|
|
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
|
|
// The provided handler is called once for each message. If the provided queue doesn't exist, it
|
|
// will be created on the cluster
|
|
func (consumer Consumer) StartConsuming(
|
|
handler func(d Delivery) bool,
|
|
queue string,
|
|
routingKeys []string,
|
|
optionFuncs ...func(*ConsumeOptions),
|
|
) error {
|
|
defaultOptions := getDefaultConsumeOptions()
|
|
options := &ConsumeOptions{}
|
|
for _, optionFunc := range optionFuncs {
|
|
optionFunc(options)
|
|
}
|
|
if options.Concurrency < 1 {
|
|
options.Concurrency = defaultOptions.Concurrency
|
|
}
|
|
|
|
err := consumer.startGoroutines(
|
|
handler,
|
|
queue,
|
|
routingKeys,
|
|
*options,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
for err := range consumer.chManager.notifyCancelOrClose {
|
|
consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err)
|
|
consumer.startGoroutinesWithRetries(
|
|
handler,
|
|
queue,
|
|
routingKeys,
|
|
*options,
|
|
)
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// startGoroutinesWithRetries attempts to start consuming on a channel
|
|
// with an exponential backoff
|
|
func (consumer Consumer) startGoroutinesWithRetries(
|
|
handler func(d Delivery) bool,
|
|
queue string,
|
|
routingKeys []string,
|
|
consumeOptions ConsumeOptions,
|
|
) {
|
|
backoffTime := time.Second
|
|
for {
|
|
consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime)
|
|
time.Sleep(backoffTime)
|
|
backoffTime *= 2
|
|
err := consumer.startGoroutines(
|
|
handler,
|
|
queue,
|
|
routingKeys,
|
|
consumeOptions,
|
|
)
|
|
if err != nil {
|
|
consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err)
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
// startGoroutines declares the queue if it doesn't exist,
|
|
// binds the queue to the routing key(s), and starts the goroutines
|
|
// that will consume from the queue
|
|
func (consumer Consumer) startGoroutines(
|
|
handler func(d Delivery) bool,
|
|
queue string,
|
|
routingKeys []string,
|
|
consumeOptions ConsumeOptions,
|
|
) error {
|
|
consumer.chManager.channelMux.RLock()
|
|
defer consumer.chManager.channelMux.RUnlock()
|
|
|
|
_, err := consumer.chManager.channel.QueueDeclare(
|
|
queue,
|
|
consumeOptions.QueueDurable,
|
|
consumeOptions.QueueAutoDelete,
|
|
consumeOptions.QueueExclusive,
|
|
consumeOptions.QueueNoWait,
|
|
tableToAMQPTable(consumeOptions.QueueArgs),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, routingKey := range routingKeys {
|
|
err = consumer.chManager.channel.QueueBind(
|
|
queue,
|
|
routingKey,
|
|
consumeOptions.BindingExchange,
|
|
consumeOptions.BindingNoWait,
|
|
tableToAMQPTable(consumeOptions.BindingArgs),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
err = consumer.chManager.channel.Qos(
|
|
consumeOptions.QOSPrefetch,
|
|
0,
|
|
consumeOptions.QOSGlobal,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
msgs, err := consumer.chManager.channel.Consume(
|
|
queue,
|
|
consumeOptions.ConsumerName,
|
|
consumeOptions.ConsumerAutoAck,
|
|
consumeOptions.ConsumerExclusive,
|
|
consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ
|
|
consumeOptions.ConsumerNoWait,
|
|
tableToAMQPTable(consumeOptions.ConsumerArgs),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for i := 0; i < consumeOptions.Concurrency; i++ {
|
|
go func() {
|
|
for msg := range msgs {
|
|
if consumeOptions.ConsumerAutoAck {
|
|
handler(Delivery{msg})
|
|
continue
|
|
}
|
|
if handler(Delivery{msg}) {
|
|
msg.Ack(false)
|
|
} else {
|
|
msg.Nack(false, true)
|
|
}
|
|
}
|
|
consumer.logger.Println("rabbit consumer goroutine closed")
|
|
}()
|
|
}
|
|
consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)
|
|
return nil
|
|
}
|