package rabbitmq import ( "time" "github.com/streadway/amqp" ) // Consumer allows you to create and connect to queues for data consumption. type Consumer struct { chManager *channelManager logger logger } // ConsumerOptions are used to describe a consumer's configuration. // Logging set to true will enable the consumer to print to stdout type ConsumerOptions struct { Logging bool } // Delivery captures the fields for a previously delivered message resident in // a queue to be delivered by the server to a consumer from Channel.Consume or // Channel.Get. type Delivery struct { amqp.Delivery } // NewConsumer returns a new Consumer connected to the given rabbitmq server func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { options := &ConsumerOptions{} for _, optionFunc := range optionFuncs { optionFunc(options) } chManager, err := newChannelManager(url, options.Logging) if err != nil { return Consumer{}, err } consumer := Consumer{ chManager: chManager, logger: logger{logging: options.Logging}, } return consumer, nil } // WithConsumerOptionsLogging sets logging to true on the consumer options func WithConsumerOptionsLogging(options *ConsumerOptions) { options.Logging = true } // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided func getDefaultConsumeOptions() ConsumeOptions { return ConsumeOptions{ QueueDurable: false, QueueAutoDelete: false, QueueExclusive: false, QueueNoWait: false, QueueArgs: nil, BindingExchange: "", BindingNoWait: false, BindingArgs: nil, Concurrency: 1, QOSPrefetch: 0, QOSGlobal: false, ConsumerName: "", ConsumerAutoAck: false, ConsumerExclusive: false, ConsumerNoWait: false, ConsumerNoLocal: false, ConsumerArgs: nil, } } // ConsumeOptions are used to describe how a new consumer will be created. 
type ConsumeOptions struct { QueueDurable bool QueueAutoDelete bool QueueExclusive bool QueueNoWait bool QueueArgs Table BindingExchange string BindingNoWait bool BindingArgs Table Concurrency int QOSPrefetch int QOSGlobal bool ConsumerName string ConsumerAutoAck bool ConsumerExclusive bool ConsumerNoWait bool ConsumerNoLocal bool ConsumerArgs Table } // WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't // be destroyed when the server restarts. It must only be bound to durable exchanges func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { options.QueueDurable = true } // WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will // be deleted when there are no more conusmers on it func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { options.QueueAutoDelete = true } // WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means // it's are only accessible by the connection that declares it and // will be deleted when the connection closes. Channels on other connections // will receive an error when attempting to declare, bind, consume, purge or // delete a queue with the same name. func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { options.QueueExclusive = true } // WithConsumeOptionsQueueNoWait sets the queue to nowait, which means // the queue will assume to be declared on the server. A // channel exception will arrive if the conditions are met for existing queues // or attempting to modify an existing queue from a different connection. 
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
	options.QueueNoWait = true
}

// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability.
// It does so by setting the "x-queue-type" queue argument to "quorum",
// creating the argument table first when none has been set yet.
func WithConsumeOptionsQuorum(options *ConsumeOptions) {
	args := options.QueueArgs
	if args == nil {
		args = Table{}
		options.QueueArgs = args
	}
	args["x-queue-type"] = "quorum"
}

// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to
func WithConsumeOptionsBindingExchange(exchange string) func(*ConsumeOptions) {
	return func(o *ConsumeOptions) {
		o.BindingExchange = exchange
	}
}

// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {
	options.BindingNoWait = true
}

// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
	return func(o *ConsumeOptions) {
		o.Concurrency = concurrency
	}
}

// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
	return func(o *ConsumeOptions) {
		o.QOSPrefetch = prefetchCount
	}
}

// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
	options.QOSGlobal = true
}

// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
	return func(o *ConsumeOptions) {
		o.ConsumerName = consumerName
	}
}

// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means
// the server will ensure that this is the sole consumer
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
	options.ConsumerExclusive = true
}

// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
	options.ConsumerNoWait = true
}

// StartConsuming starts n goroutines where n="ConsumeOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s).
// The provided handler is called once for each message.
If the provided queue doesn't exist, it // will be created on the cluster func (consumer Consumer) StartConsuming( handler func(d Delivery) bool, queue string, routingKeys []string, optionFuncs ...func(*ConsumeOptions), ) error { defaultOptions := getDefaultConsumeOptions() options := &ConsumeOptions{} for _, optionFunc := range optionFuncs { optionFunc(options) } if options.Concurrency < 1 { options.Concurrency = defaultOptions.Concurrency } err := consumer.startGoroutines( handler, queue, routingKeys, *options, ) if err != nil { return err } go func() { for err := range consumer.chManager.notifyCancelOrClose { consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err) consumer.startGoroutinesWithRetries( handler, queue, routingKeys, *options, ) } }() return nil } // startGoroutinesWithRetries attempts to start consuming on a channel // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( handler func(d Delivery) bool, queue string, routingKeys []string, consumeOptions ConsumeOptions, ) { backoffTime := time.Second for { consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime) time.Sleep(backoffTime) backoffTime *= 2 err := consumer.startGoroutines( handler, queue, routingKeys, consumeOptions, ) if err != nil { consumer.logger.Printf("couldn't start consumer goroutines. 
err: %v", err) continue } break } } // startGoroutines declares the queue if it doesn't exist, // binds the queue to the routing key(s), and starts the goroutines // that will consume from the queue func (consumer Consumer) startGoroutines( handler func(d Delivery) bool, queue string, routingKeys []string, consumeOptions ConsumeOptions, ) error { consumer.chManager.channelMux.RLock() defer consumer.chManager.channelMux.RUnlock() _, err := consumer.chManager.channel.QueueDeclare( queue, consumeOptions.QueueDurable, consumeOptions.QueueAutoDelete, consumeOptions.QueueExclusive, consumeOptions.QueueNoWait, tableToAMQPTable(consumeOptions.QueueArgs), ) if err != nil { return err } if consumeOptions.BindingExchange != "" { for _, routingKey := range routingKeys { err = consumer.chManager.channel.QueueBind( queue, routingKey, consumeOptions.BindingExchange, consumeOptions.BindingNoWait, tableToAMQPTable(consumeOptions.BindingArgs), ) if err != nil { return err } } } err = consumer.chManager.channel.Qos( consumeOptions.QOSPrefetch, 0, consumeOptions.QOSGlobal, ) if err != nil { return err } msgs, err := consumer.chManager.channel.Consume( queue, consumeOptions.ConsumerName, consumeOptions.ConsumerAutoAck, consumeOptions.ConsumerExclusive, consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ consumeOptions.ConsumerNoWait, tableToAMQPTable(consumeOptions.ConsumerArgs), ) if err != nil { return err } for i := 0; i < consumeOptions.Concurrency; i++ { go func() { for msg := range msgs { if consumeOptions.ConsumerAutoAck { handler(Delivery{msg}) continue } if handler(Delivery{msg}) { msg.Ack(false) } else { msg.Nack(false, true) } } consumer.logger.Println("rabbit consumer goroutine closed") }() } consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) return nil }