You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

264 lines
6.1 KiB

package rabbitmq
import (
"log"
"time"
"github.com/streadway/amqp"
)
// Consumer allows you to create and connect to queues for data consumption.
type Consumer struct {
chManager *channelManager
logger logger
}
// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
amqp.Delivery
}
// ConsumeOptions are used to describe how a new consumer will be created.
type ConsumeOptions struct {
QueueOptions QueueOptions
BindingOptions BindingOptions
QosOptions QosOptions
ConsumerOptions ConsumerOptions
Logging bool
}
// QueueOptions -
type QueueOptions struct {
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Args Table
}
// BindingOptions -
type BindingOptions struct {
Exchange string
NoWait bool
Args Table
}
// QosOptions -
type QosOptions struct {
Concurrency int
Prefetch int
Global bool
}
// ConsumerOptions -
type ConsumerOptions struct {
Name string
AutoAck bool
Exclusive bool
NoWait bool
NoLocal bool
Args Table
}
// GetConsumer returns a new Consumer connected to the given rabbitmq server
func GetConsumer(url string, logging bool) (Consumer, error) {
chManager, err := newChannelManager(url, logging)
if err != nil {
return Consumer{}, err
}
consumer := Consumer{
chManager: chManager,
logger: logger{logging: logging},
}
return consumer, nil
}
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
return ConsumeOptions{
QueueOptions: QueueOptions{
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: nil,
},
BindingOptions: BindingOptions{
Exchange: "",
NoWait: false,
Args: nil,
},
QosOptions: QosOptions{
Concurrency: 1,
Prefetch: 10,
Global: false,
},
ConsumerOptions: ConsumerOptions{
Name: "",
AutoAck: false,
Exclusive: false,
NoWait: false,
NoLocal: false,
Args: nil,
},
}
}
// fillInConsumeDefaults -
func fillInConsumeDefaults(consumeOptions ConsumeOptions) ConsumeOptions {
defaults := getDefaultConsumeOptions()
if consumeOptions.QosOptions.Concurrency < 1 {
consumeOptions.QosOptions.Concurrency = defaults.QosOptions.Concurrency
}
return consumeOptions
}
// StartConsumers starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster
func (consumer Consumer) StartConsumers(
handler func(d Delivery) bool,
consumeOptions *ConsumeOptions,
queue string,
routingKeys ...string,
) error {
defaults := getDefaultConsumeOptions()
finalOptions := ConsumeOptions{}
if consumeOptions == nil {
finalOptions = defaults
} else {
finalOptions = fillInConsumeDefaults(*consumeOptions)
}
err := consumer.startGoroutines(
handler,
finalOptions,
queue,
routingKeys...,
)
if err != nil {
return err
}
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err)
consumer.startGoroutinesWithRetries(
handler,
finalOptions,
queue,
routingKeys...,
)
}
}()
return nil
}
// startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries(
handler func(d Delivery) bool,
consumeOptions ConsumeOptions,
queue string,
routingKeys ...string,
) {
backoffTime := time.Second
for {
consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
err := consumer.startGoroutines(
handler,
consumeOptions,
queue,
routingKeys...,
)
if err != nil {
consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err)
continue
}
break
}
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue
func (consumer Consumer) startGoroutines(
handler func(d Delivery) bool,
consumeOptions ConsumeOptions,
queue string,
routingKeys ...string,
) error {
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueOptions.Durable,
consumeOptions.QueueOptions.AutoDelete,
consumeOptions.QueueOptions.Exclusive,
consumeOptions.QueueOptions.NoWait,
tableToAMQPTable(consumeOptions.QueueOptions.Args),
)
if err != nil {
return err
}
for _, routingKey := range routingKeys {
err = consumer.chManager.channel.QueueBind(
queue,
routingKey,
consumeOptions.BindingOptions.Exchange,
consumeOptions.BindingOptions.NoWait,
tableToAMQPTable(consumeOptions.BindingOptions.Args),
)
if err != nil {
return err
}
}
err = consumer.chManager.channel.Qos(
consumeOptions.QosOptions.Prefetch,
0,
consumeOptions.QosOptions.Global,
)
if err != nil {
return err
}
msgs, err := consumer.chManager.channel.Consume(
queue,
consumeOptions.ConsumerOptions.Name,
consumeOptions.ConsumerOptions.AutoAck,
consumeOptions.ConsumerOptions.Exclusive,
consumeOptions.ConsumerOptions.NoLocal, // no-local is not supported by RabbitMQ
consumeOptions.ConsumerOptions.NoWait,
tableToAMQPTable(consumeOptions.ConsumerOptions.Args),
)
if err != nil {
return err
}
for i := 0; i < consumeOptions.QosOptions.Concurrency; i++ {
go func() {
for msg := range msgs {
if consumeOptions.ConsumerOptions.AutoAck {
handler(Delivery{msg})
continue
}
if handler(Delivery{msg}) {
msg.Ack(false)
} else {
msg.Nack(false, true)
}
}
log.Println("rabbit consumer goroutine closed")
}()
}
log.Printf("Processing messages on %v goroutines", consumeOptions.QosOptions.Concurrency)
return nil
}