Browse Source

StopConsuming

pull/20/head v0.6.0
wagslane 5 years ago
parent
commit
98402bf376
2 changed files with 264 additions and 248 deletions
  1. +19
    -10
      channel.go
  2. +245
    -238
      consume.go

+ 19
- 10
channel.go View File

@ -12,12 +12,13 @@ type channelManager struct {
logger Logger logger Logger
url string url string
channel *amqp.Channel channel *amqp.Channel
connection *amqp.Connection
channelMux *sync.RWMutex channelMux *sync.RWMutex
notifyCancelOrClose chan error notifyCancelOrClose chan error
} }
func newChannelManager(url string, log Logger) (*channelManager, error) { func newChannelManager(url string, log Logger) (*channelManager, error) {
ch, err := getNewChannel(url)
conn, ch, err := getNewChannel(url)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -25,6 +26,7 @@ func newChannelManager(url string, log Logger) (*channelManager, error) {
chManager := channelManager{ chManager := channelManager{
logger: log, logger: log,
url: url, url: url,
connection: conn,
channel: ch, channel: ch,
channelMux: &sync.RWMutex{}, channelMux: &sync.RWMutex{},
notifyCancelOrClose: make(chan error), notifyCancelOrClose: make(chan error),
@ -33,16 +35,16 @@ func newChannelManager(url string, log Logger) (*channelManager, error) {
return &chManager, nil return &chManager, nil
} }
func getNewChannel(url string) (*amqp.Channel, error) {
func getNewChannel(url string) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.Dial(url) amqpConn, err := amqp.Dial(url)
if err != nil { if err != nil {
return nil, err
return nil, nil, err
} }
ch, err := amqpConn.Channel() ch, err := amqpConn.Channel()
if err != nil { if err != nil {
return nil, err
return nil, nil, err
} }
return ch, err
return amqpConn, ch, err
} }
// startNotifyCancelOrClosed listens on the channel's cancelled and closed // startNotifyCancelOrClosed listens on the channel's cancelled and closed
@ -56,10 +58,13 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan) notifyCancelChan = chManager.channel.NotifyCancel(notifyCancelChan)
select { select {
case err := <-notifyCloseChan: case err := <-notifyCloseChan:
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
// If the connection close is triggered by the Server, a reconnection takes place
if err.Server {
chManager.logger.Printf("attempting to reconnect to amqp server after close")
chManager.reconnectWithBackoff()
chManager.logger.Printf("successfully reconnected to amqp server after close")
chManager.notifyCancelOrClose <- err
}
case err := <-notifyCancelChan: case err := <-notifyCancelChan:
chManager.logger.Printf("attempting to reconnect to amqp server after cancel") chManager.logger.Printf("attempting to reconnect to amqp server after cancel")
chManager.reconnectWithBackoff() chManager.reconnectWithBackoff()
@ -101,11 +106,15 @@ func (chManager *channelManager) reconnectWithBackoff() {
func (chManager *channelManager) reconnect() error { func (chManager *channelManager) reconnect() error {
chManager.channelMux.Lock() chManager.channelMux.Lock()
defer chManager.channelMux.Unlock() defer chManager.channelMux.Unlock()
newChannel, err := getNewChannel(chManager.url)
newConn, newChannel, err := getNewChannel(chManager.url)
if err != nil { if err != nil {
return err return err
} }
chManager.channel.Close() chManager.channel.Close()
chManager.connection.Close()
chManager.connection = newConn
chManager.channel = newChannel chManager.channel = newChannel
go chManager.startNotifyCancelOrClosed() go chManager.startNotifyCancelOrClosed()
return nil return nil


+ 245
- 238
consume.go View File

@ -1,238 +1,245 @@
package rabbitmq
import (
"fmt"
"time"
"github.com/streadway/amqp"
)
// Consumer allows you to create and connect to queues for data consumption.
type Consumer struct {
chManager *channelManager
logger Logger
}
// ConsumerOptions are used to describe a consumer's configuration.
// Logging set to true will enable the consumer to print to stdout
// Logger specifies a custom Logger interface implementation overruling Logging.
type ConsumerOptions struct {
Logging bool
Logger Logger
}
// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
amqp.Delivery
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Logger == nil {
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, options.Logger)
if err != nil {
return Consumer{}, err
}
consumer := Consumer{
chManager: chManager,
logger: options.Logger,
}
return consumer, nil
}
// WithConsumerOptionsLogging sets a logger to log to stdout
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true
options.Logger = &stdLogger{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logging = true
options.Logger = log
}
}
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster
func (consumer Consumer) StartConsuming(
handler func(d Delivery) bool,
queue string,
routingKeys []string,
optionFuncs ...func(*ConsumeOptions),
) error {
defaultOptions := getDefaultConsumeOptions()
options := &ConsumeOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Concurrency < 1 {
options.Concurrency = defaultOptions.Concurrency
}
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
return err
}
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err)
consumer.startGoroutinesWithRetries(
handler,
queue,
routingKeys,
*options,
)
}
}()
return nil
}
// startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries(
handler func(d Delivery) bool,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) {
backoffTime := time.Second
for {
consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
consumeOptions,
)
if err != nil {
consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err)
continue
}
break
}
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue
func (consumer Consumer) startGoroutines(
handler func(d Delivery) bool,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) error {
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDurable,
consumeOptions.QueueAutoDelete,
consumeOptions.QueueExclusive,
consumeOptions.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueArgs),
)
if err != nil {
return err
}
if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
}
err = consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
}
for _, routingKey := range routingKeys {
err = consumer.chManager.channel.QueueBind(
queue,
routingKey,
exchange.Name,
consumeOptions.BindingNoWait,
tableToAMQPTable(consumeOptions.BindingArgs),
)
if err != nil {
return err
}
}
}
err = consumer.chManager.channel.Qos(
consumeOptions.QOSPrefetch,
0,
consumeOptions.QOSGlobal,
)
if err != nil {
return err
}
msgs, err := consumer.chManager.channel.Consume(
queue,
consumeOptions.ConsumerName,
consumeOptions.ConsumerAutoAck,
consumeOptions.ConsumerExclusive,
consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ
consumeOptions.ConsumerNoWait,
tableToAMQPTable(consumeOptions.ConsumerArgs),
)
if err != nil {
return err
}
for i := 0; i < consumeOptions.Concurrency; i++ {
go func() {
for msg := range msgs {
if consumeOptions.ConsumerAutoAck {
handler(Delivery{msg})
continue
}
if handler(Delivery{msg}) {
err := msg.Ack(false)
if err != nil {
consumer.logger.Printf("can't ack message: %v", err)
}
} else {
err := msg.Nack(false, true)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
}
}
}
consumer.logger.Printf("rabbit consumer goroutine closed")
}()
}
consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)
return nil
}
package rabbitmq
import (
"fmt"
"time"
"github.com/streadway/amqp"
)
// Consumer allows you to create and connect to queues for data consumption.
type Consumer struct {
chManager *channelManager
logger Logger
}
// ConsumerOptions are used to describe a consumer's configuration.
// Logging set to true will enable the consumer to print to stdout
// Logger specifies a custom Logger interface implementation overruling Logging.
type ConsumerOptions struct {
Logging bool
Logger Logger
}
// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
amqp.Delivery
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Logger == nil {
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, options.Logger)
if err != nil {
return Consumer{}, err
}
consumer := Consumer{
chManager: chManager,
logger: options.Logger,
}
return consumer, nil
}
// WithConsumerOptionsLogging sets a logger to log to stdout
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true
options.Logger = &stdLogger{}
}
// WithConsumerOptionsLogger sets logging to a custom interface.
// Use WithConsumerOptionsLogging to just log to stdout.
func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
return func(options *ConsumerOptions) {
options.Logging = true
options.Logger = log
}
}
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster
func (consumer Consumer) StartConsuming(
handler func(d Delivery) bool,
queue string,
routingKeys []string,
optionFuncs ...func(*ConsumeOptions),
) error {
defaultOptions := getDefaultConsumeOptions()
options := &ConsumeOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
if options.Concurrency < 1 {
options.Concurrency = defaultOptions.Concurrency
}
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
return err
}
go func() {
for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err)
consumer.startGoroutinesWithRetries(
handler,
queue,
routingKeys,
*options,
)
}
}()
return nil
}
// StopConsuming stops the consumption of messages.
// The consumer should be discarded as it's not safe for re-use
func (consumer Consumer) StopConsuming() {
consumer.chManager.channel.Close()
consumer.chManager.connection.Close()
}
// startGoroutinesWithRetries attempts to start consuming on a channel
// with an exponential backoff
func (consumer Consumer) startGoroutinesWithRetries(
handler func(d Delivery) bool,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) {
backoffTime := time.Second
for {
consumer.logger.Printf("waiting %s seconds to attempt to start consumer goroutines", backoffTime)
time.Sleep(backoffTime)
backoffTime *= 2
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
consumeOptions,
)
if err != nil {
consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err)
continue
}
break
}
}
// startGoroutines declares the queue if it doesn't exist,
// binds the queue to the routing key(s), and starts the goroutines
// that will consume from the queue
func (consumer Consumer) startGoroutines(
handler func(d Delivery) bool,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) error {
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDurable,
consumeOptions.QueueAutoDelete,
consumeOptions.QueueExclusive,
consumeOptions.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueArgs),
)
if err != nil {
return err
}
if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
}
err = consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
}
for _, routingKey := range routingKeys {
err = consumer.chManager.channel.QueueBind(
queue,
routingKey,
exchange.Name,
consumeOptions.BindingNoWait,
tableToAMQPTable(consumeOptions.BindingArgs),
)
if err != nil {
return err
}
}
}
err = consumer.chManager.channel.Qos(
consumeOptions.QOSPrefetch,
0,
consumeOptions.QOSGlobal,
)
if err != nil {
return err
}
msgs, err := consumer.chManager.channel.Consume(
queue,
consumeOptions.ConsumerName,
consumeOptions.ConsumerAutoAck,
consumeOptions.ConsumerExclusive,
consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ
consumeOptions.ConsumerNoWait,
tableToAMQPTable(consumeOptions.ConsumerArgs),
)
if err != nil {
return err
}
for i := 0; i < consumeOptions.Concurrency; i++ {
go func() {
for msg := range msgs {
if consumeOptions.ConsumerAutoAck {
handler(Delivery{msg})
continue
}
if handler(Delivery{msg}) {
err := msg.Ack(false)
if err != nil {
consumer.logger.Printf("can't ack message: %v", err)
}
} else {
err := msg.Nack(false, true)
if err != nil {
consumer.logger.Printf("can't nack message: %v", err)
}
}
}
consumer.logger.Printf("rabbit consumer goroutine closed")
}()
}
consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency)
return nil
}

Loading…
Cancel
Save