diff --git a/README.md b/README.md index 78ce52c..5553908 100644 --- a/README.md +++ b/README.md @@ -28,50 +28,62 @@ go get github.com/wagslane/go-rabbitmq ## 🚀 Quick Start Consumer ```go -consumer, err := rabbitmq.GetConsumer("amqp://user:pass@localhost", true) +consumer, err := rabbitmq.GetConsumer( + "amqp://user:pass@localhost", + // can pass nothing for no logging + func(opts *rabbitmq.ConsumerOptions) { + opts.Logging = true + }, +) if err != nil { log.Fatal(err) } -err = consumer.StartConsumers( +err = consumer.StartConsuming( func(d rabbitmq.Delivery) bool { log.Printf("consumed: %v", string(d.Body)) - // true to ACK, false to NACK return true }, - // can pass nil here for defaults - &rabbitmq.ConsumeOptions{ - QueueOptions: rabbitmq.QueueOptions{ - Durable: true, - }, - QosOptions: rabbitmq.QosOptions{ - Concurrency: 10, - Prefetch: 100, - }, - }, "my_queue", - "routing_key1", "routing_key2", + []string{"routing_key1", "routing_key2"}, + // can pass nothing here for defaults + func(opts *rabbitmq.ConsumeOptions) { + opts.QueueDurable = true + opts.Concurrency = 10 + opts.QOSPrefetch = 100 + }, ) if err != nil { log.Fatal(err) } + +// block main thread so consumers run forever +forever := make(chan struct{}) +<-forever ``` ## 🚀 Quick Start Publisher ```go -publisher, returns, err := rabbitmq.GetPublisher("amqp://user:pass@localhost", true) +publisher, returns, err := rabbitmq.GetPublisher( + "amqp://user:pass@localhost", + // can pass nothing for no logging + func(opts *rabbitmq.PublisherOptions) { + opts.Logging = true + }, +) if err != nil { log.Fatal(err) } err = publisher.Publish( []byte("hello, world"), - // leave nil for defaults - &rabbitmq.PublishOptions{ - Exchange: "events", - Mandatory: true, + []string{"routing_key"}, + // leave blank for defaults + func(opts *rabbitmq.PublishOptions) { + opts.DeliveryMode = rabbitmq.Persistent + opts.Mandatory = true + opts.ContentType = "application/json" }, - "routing_key", ) if err != nil { log.Fatal(err) diff --git a/consume.go b/consume.go index b8d25e4..9850b49 100644 --- a/consume.go +++ b/consume.go @@ -1,7 +1,6 @@ package rabbitmq import ( - "log" "time" "github.com/streadway/amqp" @@ -13,6 +12,12 @@ type Consumer struct { logger logger } +// ConsumerOptions are used to describe a consumer's configuration. +// Logging set to true will enable the consumer to print to stdout +type ConsumerOptions struct { + Logging bool +} + // Delivery captures the fields for a previously delivered message resident in // a queue to be delivered by the server to a consumer from Channel.Consume or // Channel.Get. @@ -20,57 +25,20 @@ type Delivery struct { amqp.Delivery } -// ConsumeOptions are used to describe how a new consumer will be created. -type ConsumeOptions struct { - QueueOptions QueueOptions - BindingOptions BindingOptions - QosOptions QosOptions - ConsumerOptions ConsumerOptions - Logging bool -} - -// QueueOptions - -type QueueOptions struct { - Durable bool - AutoDelete bool - Exclusive bool - NoWait bool - Args Table -} - -// BindingOptions - -type BindingOptions struct { - Exchange string - NoWait bool - Args Table -} - -// QosOptions - -type QosOptions struct { - Concurrency int - Prefetch int - Global bool -} - -// ConsumerOptions - -type ConsumerOptions struct { - Name string - AutoAck bool - Exclusive bool - NoWait bool - NoLocal bool - Args Table -} - // GetConsumer returns a new Consumer connected to the given rabbitmq server -func GetConsumer(url string, logging bool) (Consumer, error) { - chManager, err := newChannelManager(url, logging) +func GetConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { + options := &ConsumerOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + chManager, err := newChannelManager(url, options.Logging) if err != nil { return Consumer{}, err } consumer := Consumer{ chManager: chManager, - logger: logger{logging: logging}, + logger: logger{logging: options.Logging}, } return consumer, nil } @@ -78,66 +46,71 @@ func GetConsumer(url string, logging bool) (Consumer, error) { // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided func getDefaultConsumeOptions() ConsumeOptions { return ConsumeOptions{ - QueueOptions: QueueOptions{ - Durable: false, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Args: nil, - }, - BindingOptions: BindingOptions{ - Exchange: "", - NoWait: false, - Args: nil, - }, - QosOptions: QosOptions{ - Concurrency: 1, - Prefetch: 10, - Global: false, - }, - ConsumerOptions: ConsumerOptions{ - Name: "", - AutoAck: false, - Exclusive: false, - NoWait: false, - NoLocal: false, - Args: nil, - }, + QueueDurable: false, + QueueAutoDelete: false, + QueueExclusive: false, + QueueNoWait: false, + QueueArgs: nil, + BindingExchange: "", + BindingNoWait: false, + BindingArgs: nil, + Concurrency: 1, + QOSPrefetch: 0, + QOSGlobal: false, + ConsumerName: "", + ConsumerAutoAck: false, + ConsumerExclusive: false, + ConsumerNoWait: false, + ConsumerNoLocal: false, + ConsumerArgs: nil, } } -// fillInConsumeDefaults - -func fillInConsumeDefaults(consumeOptions ConsumeOptions) ConsumeOptions { - defaults := getDefaultConsumeOptions() - if consumeOptions.QosOptions.Concurrency < 1 { - consumeOptions.QosOptions.Concurrency = defaults.QosOptions.Concurrency - } - return consumeOptions +// ConsumeOptions are used to describe how a new consumer will be created. +type ConsumeOptions struct { + QueueDurable bool + QueueAutoDelete bool + QueueExclusive bool + QueueNoWait bool + QueueArgs Table + BindingExchange string + BindingNoWait bool + BindingArgs Table + Concurrency int + QOSPrefetch int + QOSGlobal bool + ConsumerName string + ConsumerAutoAck bool + ConsumerExclusive bool + ConsumerNoWait bool + ConsumerNoLocal bool + ConsumerArgs Table } -// StartConsumers starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". +// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". // Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). // The provided handler is called once for each message. If the provided queue doesn't exist, it // will be created on the cluster -func (consumer Consumer) StartConsumers( +func (consumer Consumer) StartConsuming( handler func(d Delivery) bool, - consumeOptions *ConsumeOptions, queue string, - routingKeys ...string, + routingKeys []string, + optionFuncs ...func(*ConsumeOptions), ) error { - defaults := getDefaultConsumeOptions() - finalOptions := ConsumeOptions{} - if consumeOptions == nil { - finalOptions = defaults - } else { - finalOptions = fillInConsumeDefaults(*consumeOptions) + defaultOptions := getDefaultConsumeOptions() + options := &ConsumeOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.Concurrency < 1 { + options.Concurrency = defaultOptions.Concurrency } err := consumer.startGoroutines( handler, - finalOptions, queue, - routingKeys..., + routingKeys, + *options, ) if err != nil { return err @@ -148,9 +121,9 @@ func (consumer Consumer) StartConsumers( consumer.logger.Printf("consume cancel/close handler triggered. err: %v", err) consumer.startGoroutinesWithRetries( handler, - finalOptions, queue, - routingKeys..., + routingKeys, + *options, ) } }() @@ -161,9 +134,9 @@ func (consumer Consumer) StartConsumers( // with an exponential backoff func (consumer Consumer) startGoroutinesWithRetries( handler func(d Delivery) bool, - consumeOptions ConsumeOptions, queue string, - routingKeys ...string, + routingKeys []string, + consumeOptions ConsumeOptions, ) { backoffTime := time.Second for { @@ -172,9 +145,9 @@ func (consumer Consumer) startGoroutinesWithRetries( backoffTime *= 2 err := consumer.startGoroutines( handler, - consumeOptions, queue, - routingKeys..., + routingKeys, + consumeOptions, ) if err != nil { consumer.logger.Printf("couldn't start consumer goroutines. err: %v", err) @@ -189,20 +162,20 @@ func (consumer Consumer) startGoroutinesWithRetries( // that will consume from the queue func (consumer Consumer) startGoroutines( handler func(d Delivery) bool, - consumeOptions ConsumeOptions, queue string, - routingKeys ...string, + routingKeys []string, + consumeOptions ConsumeOptions, ) error { consumer.chManager.channelMux.RLock() defer consumer.chManager.channelMux.RUnlock() _, err := consumer.chManager.channel.QueueDeclare( queue, - consumeOptions.QueueOptions.Durable, - consumeOptions.QueueOptions.AutoDelete, - consumeOptions.QueueOptions.Exclusive, - consumeOptions.QueueOptions.NoWait, - tableToAMQPTable(consumeOptions.QueueOptions.Args), + consumeOptions.QueueDurable, + consumeOptions.QueueAutoDelete, + consumeOptions.QueueExclusive, + consumeOptions.QueueNoWait, + tableToAMQPTable(consumeOptions.QueueArgs), ) if err != nil { return err @@ -212,9 +185,9 @@ func (consumer Consumer) startGoroutines( err = consumer.chManager.channel.QueueBind( queue, routingKey, - consumeOptions.BindingOptions.Exchange, - consumeOptions.BindingOptions.NoWait, - tableToAMQPTable(consumeOptions.BindingOptions.Args), + consumeOptions.BindingExchange, + consumeOptions.BindingNoWait, + tableToAMQPTable(consumeOptions.BindingArgs), ) if err != nil { return err @@ -222,9 +195,9 @@ func (consumer Consumer) startGoroutines( } err = consumer.chManager.channel.Qos( - consumeOptions.QosOptions.Prefetch, + consumeOptions.QOSPrefetch, 0, - consumeOptions.QosOptions.Global, + consumeOptions.QOSGlobal, ) if err != nil { return err @@ -232,21 +205,21 @@ func (consumer Consumer) startGoroutines( msgs, err := consumer.chManager.channel.Consume( queue, - consumeOptions.ConsumerOptions.Name, - consumeOptions.ConsumerOptions.AutoAck, - consumeOptions.ConsumerOptions.Exclusive, - consumeOptions.ConsumerOptions.NoLocal, // no-local is not supported by RabbitMQ - consumeOptions.ConsumerOptions.NoWait, - tableToAMQPTable(consumeOptions.ConsumerOptions.Args), + consumeOptions.ConsumerName, + consumeOptions.ConsumerAutoAck, + consumeOptions.ConsumerExclusive, + consumeOptions.ConsumerNoLocal, // no-local is not supported by RabbitMQ + consumeOptions.ConsumerNoWait, + tableToAMQPTable(consumeOptions.ConsumerArgs), ) if err != nil { return err } - for i := 0; i < consumeOptions.QosOptions.Concurrency; i++ { + for i := 0; i < consumeOptions.Concurrency; i++ { go func() { for msg := range msgs { - if consumeOptions.ConsumerOptions.AutoAck { + if consumeOptions.ConsumerAutoAck { handler(Delivery{msg}) continue } @@ -256,9 +229,9 @@ func (consumer Consumer) startGoroutines( msg.Nack(false, true) } } - log.Println("rabbit consumer goroutine closed") + consumer.logger.Println("rabbit consumer goroutine closed") }() } - log.Printf("Processing messages on %v goroutines", consumeOptions.QosOptions.Concurrency) + consumer.logger.Printf("Processing messages on %v goroutines", consumeOptions.Concurrency) return nil } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 719e74a..e4e3858 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -7,29 +7,30 @@ import ( ) func main() { - consumer, err := rabbitmq.GetConsumer("amqp://user:pass@localhost", true) + consumer, err := rabbitmq.GetConsumer( + "amqp://user:pass@localhost", + // can pass nothing for no logging + func(opts *rabbitmq.ConsumerOptions) { + opts.Logging = true + }, + ) if err != nil { log.Fatal(err) } - err = consumer.StartConsumers( + err = consumer.StartConsuming( func(d rabbitmq.Delivery) bool { log.Printf("consumed: %v", string(d.Body)) - // true to ACK, false to NACK return true }, - // can pass nil here for defaults - &rabbitmq.ConsumeOptions{ - QueueOptions: rabbitmq.QueueOptions{ - Durable: true, - }, - QosOptions: rabbitmq.QosOptions{ - Concurrency: 10, - Prefetch: 100, - }, - }, "my_queue", - "routing_key1", "routing_key2", + []string{"routing_key1", "routing_key2"}, + // can pass nothing here for defaults + func(opts *rabbitmq.ConsumeOptions) { + opts.QueueDurable = true + opts.Concurrency = 10 + opts.QOSPrefetch = 100 + }, ) if err != nil { log.Fatal(err) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 47801fa..fe6c0d1 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -7,18 +7,25 @@ import ( ) func main() { - publisher, returns, err := rabbitmq.GetPublisher("amqp://user:pass@localhost", true) + publisher, returns, err := rabbitmq.GetPublisher( + "amqp://user:pass@localhost", + // can pass nothing for no logging + func(opts *rabbitmq.PublisherOptions) { + opts.Logging = true + }, + ) if err != nil { log.Fatal(err) } err = publisher.Publish( []byte("hello, world"), - // leave nil for defaults - &rabbitmq.PublishOptions{ - Exchange: "events", - Mandatory: true, + []string{"routing_key"}, + // leave blank for defaults + func(opts *rabbitmq.PublishOptions) { + opts.DeliveryMode = rabbitmq.Persistent + opts.Mandatory = true + opts.ContentType = "application/json" }, - "routing_key", ) if err != nil { log.Fatal(err) diff --git a/publish.go b/publish.go index a0a3c32..7e0342b 100644 --- a/publish.go +++ b/publish.go @@ -54,13 +54,24 @@ type Publisher struct { logger logger } +// PublisherOptions are used to describe a publisher's configuration. +// Logging set to true will enable the consumer to print to stdout +type PublisherOptions struct { + Logging bool +} + // GetPublisher returns a new publisher with an open channel to the cluster. // If you plan to enforce mandatory or immediate publishing, those failures will be reported // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown -func GetPublisher(url string, logging bool) (Publisher, <-chan Return, error) { - chManager, err := newChannelManager(url, logging) +func GetPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) { + options := &PublisherOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + chManager, err := newChannelManager(url, options.Logging) if err != nil { return Publisher{}, nil, err } @@ -70,6 +81,7 @@ func GetPublisher(url string, logging bool) (Publisher, <-chan Return, error) { notifyFlowChan: make(chan bool), disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, + logger: logger{logging: options.Logging}, } returnAMQPChan := make(chan amqp.Return) @@ -91,31 +103,35 @@ func GetPublisher(url string, logging bool) (Publisher, <-chan Return, error) { } // Publish publishes the provided data to the given routing keys over the connection -func (publisher *Publisher) Publish(data []byte, publishOptions *PublishOptions, routingKeys ...string) error { +func (publisher *Publisher) Publish( + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), +) error { publisher.disablePublishDueToFlowMux.RLock() if publisher.disablePublishDueToFlow { return fmt.Errorf("publishing blocked due to high flow on the server") } publisher.disablePublishDueToFlowMux.RUnlock() - defaults := getDefaultPublishOptions() - finalOptions := PublishOptions{} - if publishOptions == nil { - finalOptions = defaults - } else { - finalOptions = fillInPublishDefaults(*publishOptions) + options := &PublishOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.DeliveryMode == 0 { + options.DeliveryMode = Transient } for _, routingKey := range routingKeys { err := publisher.chManager.channel.Publish( - finalOptions.Exchange, + options.Exchange, routingKey, - finalOptions.Mandatory, - finalOptions.Immediate, + options.Mandatory, + options.Immediate, amqp.Publishing{ - ContentType: finalOptions.ContentType, + ContentType: options.ContentType, Body: data, - DeliveryMode: finalOptions.DeliveryMode, + DeliveryMode: options.DeliveryMode, }) if err != nil { return err @@ -124,17 +140,6 @@ func (publisher *Publisher) Publish(data []byte, publishOptions *PublishOptions, return nil } -// getDefaultPublishOptions - -func getDefaultPublishOptions() PublishOptions { - return PublishOptions{ - Exchange: "", - Mandatory: false, - Immediate: false, - ContentType: "", - DeliveryMode: Transient, - } -} - func (publisher *Publisher) startNotifyFlowHandler() { for ok := range publisher.notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() @@ -148,12 +153,3 @@ func (publisher *Publisher) startNotifyFlowHandler() { publisher.logger.Println("resuming publishing due to flow request from server") } } - -// fillInPublishDefaults completes in any fields we're sure weren't set with their defaults -func fillInPublishDefaults(publishOptions PublishOptions) PublishOptions { - defaults := getDefaultPublishOptions() - if publishOptions.DeliveryMode == 0 { - publishOptions.DeliveryMode = defaults.DeliveryMode - } - return publishOptions -}