diff --git a/README.md b/README.md index 5381e80..1d0d7a7 100644 --- a/README.md +++ b/README.md @@ -53,10 +53,7 @@ if err != nil { ```go consumer, err := rabbitmq.NewConsumer( "amqp://user:pass@localhost", - // can pass nothing for no logging - func(opts *rabbitmq.ConsumerOptions) { - opts.Logging = true - }, + rabbitmq.WithConsumerOptionsLogging, ) if err != nil { log.Fatal(err) @@ -69,12 +66,9 @@ err = consumer.StartConsuming( }, "my_queue", []string{"routing_key1", "routing_key2"}, - // can pass nothing here for defaults - func(opts *rabbitmq.ConsumeOptions) { - opts.QueueDurable = true - opts.Concurrency = 10 - opts.QOSPrefetch = 100 - }, + rabbitmq.WithConsumeOptionsConcurrency(10), + rabbitmq.WithConsumeOptionsQueueDurable, + rabbitmq.WithConsumeOptionsQuorum, ) if err != nil { log.Fatal(err) @@ -102,9 +96,7 @@ if err != nil { publisher, returns, err := rabbitmq.NewPublisher( "amqp://user:pass@localhost", // can pass nothing for no logging - func(opts *rabbitmq.PublisherOptions) { - opts.Logging = true - }, + rabbitmq.WithPublisherOptionsLogging, ) if err != nil { log.Fatal(err) @@ -113,11 +105,9 @@ err = publisher.Publish( []byte("hello, world"), []string{"routing_key"}, // leave blank for defaults - func(opts *rabbitmq.PublishOptions) { - opts.DeliveryMode = rabbitmq.Persistent - opts.Mandatory = true - opts.ContentType = "application/json" - }, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, ) if err != nil { log.Fatal(err) diff --git a/consume.go b/consume.go index db5e48e..f16a386 100644 --- a/consume.go +++ b/consume.go @@ -43,6 +43,11 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e return consumer, nil } +// WithConsumerOptionsLogging sets logging to true on the consumer options +func WithConsumerOptionsLogging(options *ConsumerOptions) { + options.Logging = true +} + // getDefaultConsumeOptions descibes the options that will be used when a value isn't provided func getDefaultConsumeOptions() ConsumeOptions { return ConsumeOptions{ @@ -87,6 +92,105 @@ type ConsumeOptions struct { ConsumerArgs Table } +// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't +// be destroyed when the server restarts. It must only be bound to durable exchanges +func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { + options.QueueDurable = true +} + +// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will +// be deleted when there are no more conusmers on it +func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { + options.QueueAutoDelete = true +} + +// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means +// it's are only accessible by the connection that declares it and +// will be deleted when the connection closes. Channels on other connections +// will receive an error when attempting to declare, bind, consume, purge or +// delete a queue with the same name. +func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { + options.QueueExclusive = true +} + +// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means +// the queue will assume to be declared on the server. A +// channel exception will arrive if the conditions are met for existing queues +// or attempting to modify an existing queue from a different connection. +func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { + options.QueueNoWait = true +} + +// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes +// in the cluster will have the messages distributed amongst them for higher reliability +func WithConsumeOptionsQuorum(options *ConsumeOptions) { + if options.QueueArgs == nil { + options.QueueArgs = Table{} + } + options.QueueArgs["x-queue-type"] = "quorum" +} + +// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to +func WithConsumeOptionsBindingExchange(exchange string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.BindingExchange = exchange + } +} + +// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound +// the channel will not be closed with an error. +func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { + options.BindingNoWait = true +} + +// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that +// many goroutines will be spawned to run the provided handler on messages +func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.Concurrency = concurrency + } +} + +// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that +// many messages will be fetched from the server in advance to help with throughput. +// This doesn't affect the handler, messages are still processed one at a time. +func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.QOSPrefetch = prefetchCount + } +} + +// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means +// these QOS settings apply to ALL existing and future +// consumers on all channels on the same connection +func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) { + options.QOSGlobal = true +} + +// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer +// if unset a random name will be given +func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + options.ConsumerName = consumerName + } +} + +// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means +// the server will ensure that this is the sole consumer +// from this queue. When exclusive is false, the server will fairly distribute +// deliveries across multiple consumers. +func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { + options.ConsumerExclusive = true +} + +// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means +// it does not wait for the server to confirm the request and +// immediately begin deliveries. If it is not possible to consume, a channel +// exception will be raised and the channel will be closed. +func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { + options.ConsumerNoWait = true +} + // StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". // Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). // The provided handler is called once for each message. If the provided queue doesn't exist, it diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 2e83a37..aa240e4 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -9,10 +9,7 @@ import ( func main() { consumer, err := rabbitmq.NewConsumer( "amqp://user:pass@localhost", - // can pass nothing for no logging - func(opts *rabbitmq.ConsumerOptions) { - opts.Logging = true - }, + rabbitmq.WithConsumerOptionsLogging, ) if err != nil { log.Fatal(err) @@ -25,12 +22,9 @@ func main() { }, "my_queue", []string{"routing_key1", "routing_key2"}, - // can pass nothing here for defaults - func(opts *rabbitmq.ConsumeOptions) { - opts.QueueDurable = true - opts.Concurrency = 10 - opts.QOSPrefetch = 100 - }, + rabbitmq.WithConsumeOptionsConcurrency(10), + rabbitmq.WithConsumeOptionsQueueDurable, + rabbitmq.WithConsumeOptionsQuorum, ) if err != nil { log.Fatal(err) diff --git a/examples/publisher/main.go b/examples/publisher/main.go index 48a3ec3..1b3296d 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -9,10 +9,7 @@ import ( func main() { publisher, returns, err := rabbitmq.NewPublisher( "amqp://user:pass@localhost", - // can pass nothing for no logging - func(opts *rabbitmq.PublisherOptions) { - opts.Logging = true - }, + rabbitmq.WithPublisherOptionsLogging, ) if err != nil { log.Fatal(err) @@ -20,12 +17,9 @@ func main() { err = publisher.Publish( []byte("hello, world"), []string{"routing_key"}, - // leave blank for defaults - func(opts *rabbitmq.PublishOptions) { - opts.DeliveryMode = rabbitmq.Persistent - opts.Mandatory = true - opts.ContentType = "application/json" - }, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, ) if err != nil { log.Fatal(err) diff --git a/publish.go b/publish.go index 88ed32c..f390272 100644 --- a/publish.go +++ b/publish.go @@ -7,13 +7,13 @@ import ( "github.com/streadway/amqp" ) -// DeliveryMode. Transient means higher throughput but messages will not be -// restored on broker restart. The delivery mode of publishings is unrelated -// to the durability of the queues they reside on. Transient messages will +// DeliveryMode. Transient means higher throughput but messages will not be +// restored on broker restart. The delivery mode of publishings is unrelated +// to the durability of the queues they reside on. Transient messages will // not be restored to durable queues, persistent messages will be restored to // durable queues and lost on non-durable queues during server restart. // -// This remains typed as uint8 to match Publishing.DeliveryMode. Other +// This remains typed as uint8 to match Publishing.DeliveryMode. Other // delivery modes specific to custom queue implementations are not enumerated // here. const ( @@ -42,6 +42,40 @@ type PublishOptions struct { DeliveryMode uint8 } +// WithPublishOptionsExchange returns a function that sets the exchange to publish to +func WithPublishOptionsExchange(exchange string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Exchange = exchange + } +} + +// WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not +// bound to the routing key a message will be sent back on the returns channel for you to handle +func WithPublishOptionsMandatory(options *PublishOptions) { + options.Mandatory = true +} + +// WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available +// to immediately handle the new message, a message will be sent back on the returns channel for you to handle +func WithPublishOptionsImmediate(options *PublishOptions) { + options.Immediate = true +} + +// WithPublishOptionsContentType returns a function that sets the content type, i.e. "application/json" +func WithPublishOptionsContentType(contentType string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.ContentType = contentType + } +} + +// WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will +// not be restored to durable queues, persistent messages will be restored to +// durable queues and lost on non-durable queues during server restart. By default publishings +// are transient +func WithPublishOptionsPersistentDelivery(options *PublishOptions) { + options.DeliveryMode = Persistent +} + // Publisher allows you to publish messages safely across an open connection type Publisher struct { chManager *channelManager @@ -60,6 +94,11 @@ type PublisherOptions struct { Logging bool } +// WithPublisherOptionsLogging sets logging to true on the consumer options +func WithPublisherOptionsLogging(options *PublisherOptions) { + options.Logging = true +} + // NewPublisher returns a new publisher with an open channel to the cluster. // If you plan to enforce mandatory or immediate publishing, those failures will be reported // on the channel of Returns that you should setup a listener on.