You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

237 lines
5.8 KiB

package rabbitmq
import (
"time"
"github.com/streadway/amqp"
)
// Consumer allows you to create and connect to queues for data consumption.
// Use NewConsumer to obtain one; the zero value has a nil channel manager.
type Consumer struct {
// chManager owns the AMQP channel that queues are declared, bound and
// consumed on, together with the lock guarding it and the cancel/close
// notification stream that triggers restarts.
chManager *channelManager
// logger receives the consumer's diagnostic messages; it is configured
// from ConsumerOptions.Logging.
logger logger
}
// ConsumerOptions are used to describe a consumer's configuration.
// It is filled in by the option functions passed to NewConsumer.
type ConsumerOptions struct {
// Logging set to true will enable the consumer to print to stdout.
// NewConsumer hands this value to both the channel manager and the
// consumer's own logger.
Logging bool
}
// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
//
// It embeds amqp.Delivery, so that type's fields and methods are promoted and
// can be used directly on a Delivery value.
type Delivery struct {
amqp.Delivery
}
// NewConsumer builds a Consumer that talks to the RabbitMQ server at url.
//
// Each function in optionFuncs is applied, in order, to a zero-valued
// ConsumerOptions before the channel manager is created. If the channel
// manager cannot be created, the zero Consumer and the underlying error are
// returned.
func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
	var opts ConsumerOptions
	for _, apply := range optionFuncs {
		apply(&opts)
	}

	manager, err := newChannelManager(url, opts.Logging)
	if err != nil {
		return Consumer{}, err
	}

	return Consumer{
		chManager: manager,
		logger:    logger{logging: opts.Logging},
	}, nil
}
// getDefaultConsumeOptions describes the options that will be used when a
// value isn't provided.
//
// The only non-zero default is Concurrency (a single consuming goroutine).
// Every other field keeps its zero value: all boolean flags are false,
// BindingExchange and ConsumerName are empty, QOSPrefetch is 0 and the
// argument tables are nil.
func getDefaultConsumeOptions() ConsumeOptions {
	return ConsumeOptions{Concurrency: 1}
}
// ConsumeOptions are used to describe how a new consumer will be created.
//
// When consuming starts, the Queue* fields are passed to the channel's
// QueueDeclare call, the Binding* fields to QueueBind (once per routing key),
// the QOS* fields to Qos and the Consumer* fields to Consume. The Table
// values are converted with tableToAMQPTable before being sent.
type ConsumeOptions struct {
// Queue declaration arguments.
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueArgs Table
// BindingExchange is the exchange each routing key is bound on.
BindingExchange string
BindingNoWait bool
BindingArgs Table
// Concurrency is the number of goroutines that read deliveries and invoke
// the handler. StartConsuming replaces values below 1 with the default.
Concurrency int
// QOSPrefetch is passed to Qos as the prefetch count; the prefetch size
// argument is always sent as 0.
QOSPrefetch int
QOSGlobal bool
// ConsumerName is passed to Consume as the consumer identifier.
ConsumerName string
// ConsumerAutoAck is passed to Consume as the auto-ack flag. When it is true
// the handler's return value is ignored and this package sends neither an
// Ack nor a Nack for the message.
ConsumerAutoAck bool
ConsumerExclusive bool
ConsumerNoWait bool
ConsumerNoLocal bool
ConsumerArgs Table
}
// StartConsuming declares the queue, binds it to the routing key(s) and starts
// n goroutines, where n is ConsumeOptions.Concurrency (values below 1 fall
// back to the default). The goroutines share a single delivery stream for the
// given queue, and the provided handler is called once for each message. If
// the provided queue doesn't exist, it will be created on the cluster.
//
// Unless ConsumerAutoAck is set, a message is acked when handler returns true
// and nacked with requeue when it returns false.
//
// The options start from getDefaultConsumeOptions and each function in
// optionFuncs is applied to them in order.
//
// An error is returned only when the initial setup fails. After a successful
// start, a background goroutine listens on the channel manager's cancel/close
// notifications and, each time one arrives, restarts the consumer goroutines
// with backoff using the same handler, queue, routing keys and options.
func (consumer Consumer) StartConsuming(
	handler func(d Delivery) bool,
	queue string,
	routingKeys []string,
	optionFuncs ...func(*ConsumeOptions),
) error {
	// Seed with the defaults so option funcs only override what they set,
	// instead of starting from an empty struct and ignoring the defaults.
	options := getDefaultConsumeOptions()
	for _, optionFunc := range optionFuncs {
		optionFunc(&options)
	}
	if options.Concurrency < 1 {
		options.Concurrency = getDefaultConsumeOptions().Concurrency
	}

	if err := consumer.startGoroutines(handler, queue, routingKeys, options); err != nil {
		return err
	}

	go func() {
		// Runs until the notification channel is closed by its owner.
		for closeErr := range consumer.chManager.notifyCancelOrClose {
			consumer.logger.Printf("consume cancel/close handler triggered. err: %v", closeErr)
			consumer.startGoroutinesWithRetries(handler, queue, routingKeys, options)
		}
	}()
	return nil
}
// startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff.
//
// It sleeps before every attempt, starting at one second and doubling after
// each failure up to a ceiling of one minute, and only returns once
// startGoroutines has succeeded. The ceiling keeps retries from drifting
// hours apart during a long outage and keeps the duration from overflowing.
func (consumer Consumer) startGoroutinesWithRetries(
	handler func(d Delivery) bool,
	queue string,
	routingKeys []string,
	consumeOptions ConsumeOptions,
) {
	const maxBackoff = time.Minute

	backoffTime := time.Second
	for {
		// backoffTime is a time.Duration, so %s already includes the unit.
		consumer.logger.Printf("waiting %s to attempt to start consumer goroutines", backoffTime)
		time.Sleep(backoffTime)

		err := consumer.startGoroutines(handler, queue, routingKeys, consumeOptions)
		if err == nil {
			return
		}
		consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err)

		backoffTime *= 2
		if backoffTime > maxBackoff {
			backoffTime = maxBackoff
		}
	}
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue.
//
// The channel manager's read lock is held while the queue is declared and
// bound, QoS is applied and the delivery stream is opened; it is released
// when this function returns. The first error from any of those steps is
// returned and no goroutines are started in that case.
//
// On success, consumeOptions.Concurrency goroutines are started, all reading
// from the same delivery stream. Each message is passed to handler. With
// ConsumerAutoAck set the handler's result is ignored; otherwise the message
// is acked when handler returns true and nacked with requeue when it returns
// false. A failed ack or nack is logged and the goroutine keeps consuming.
// The goroutines exit when the delivery stream is closed.
func (consumer Consumer) startGoroutines(
	handler func(d Delivery) bool,
	queue string,
	routingKeys []string,
	consumeOptions ConsumeOptions,
) error {
	consumer.chManager.channelMux.RLock()
	defer consumer.chManager.channelMux.RUnlock()

	_, err := consumer.chManager.channel.QueueDeclare(
		queue,
		consumeOptions.QueueDurable,
		consumeOptions.QueueAutoDelete,
		consumeOptions.QueueExclusive,
		consumeOptions.QueueNoWait,
		tableToAMQPTable(consumeOptions.QueueArgs),
	)
	if err != nil {
		return err
	}

	for _, routingKey := range routingKeys {
		err = consumer.chManager.channel.QueueBind(
			queue,
			routingKey,
			consumeOptions.BindingExchange,
			consumeOptions.BindingNoWait,
			tableToAMQPTable(consumeOptions.BindingArgs),
		)
		if err != nil {
			return err
		}
	}

	err = consumer.chManager.channel.Qos(
		consumeOptions.QOSPrefetch,
		0, // prefetch size is not configurable through ConsumeOptions
		consumeOptions.QOSGlobal,
	)
	if err != nil {
		return err
	}

	msgs, err := consumer.chManager.channel.Consume(
		queue,
		consumeOptions.ConsumerName,
		consumeOptions.ConsumerAutoAck,
		consumeOptions.ConsumerExclusive,
		consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ
		consumeOptions.ConsumerNoWait,
		tableToAMQPTable(consumeOptions.ConsumerArgs),
	)
	if err != nil {
		return err
	}

	for i := 0; i < consumeOptions.Concurrency; i++ {
		go func() {
			for msg := range msgs {
				handled := handler(Delivery{msg})
				if consumeOptions.ConsumerAutoAck {
					// The broker already considers the message acknowledged.
					continue
				}
				if handled {
					if ackErr := msg.Ack(false); ackErr != nil {
						consumer.logger.Printf("couldn't ack message. err: %v", ackErr)
					}
					continue
				}
				// Requeue so the message can be delivered again.
				if nackErr := msg.Nack(false, true); nackErr != nil {
					consumer.logger.Printf("couldn't nack message. err: %v", nackErr)
				}
			}
			consumer.logger.Println("rabbit consumer goroutine closed")
		}()
	}
	consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)
	return nil
}