Browse Source

add options functions

pull/4/head v0.3.0
lane-c-wagner 5 years ago
parent
commit
effbe2c75a
5 changed files with 163 additions and 42 deletions
  1. +8
    -18
      README.md
  2. +104
    -0
      consume.go
  3. +4
    -10
      examples/consumer/main.go
  4. +4
    -10
      examples/publisher/main.go
  5. +43
    -4
      publish.go

+ 8
- 18
README.md View File

@ -53,10 +53,7 @@ if err != nil {
```go
consumer, err := rabbitmq.NewConsumer(
"amqp://user:pass@localhost",
// can pass nothing for no logging
func(opts *rabbitmq.ConsumerOptions) {
opts.Logging = true
},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
@ -69,12 +66,9 @@ err = consumer.StartConsuming(
},
"my_queue",
[]string{"routing_key1", "routing_key2"},
// can pass nothing here for defaults
func(opts *rabbitmq.ConsumeOptions) {
opts.QueueDurable = true
opts.Concurrency = 10
opts.QOSPrefetch = 100
},
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum,
)
if err != nil {
log.Fatal(err)
@ -102,9 +96,7 @@ if err != nil {
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://user:pass@localhost",
// can pass nothing for no logging
func(opts *rabbitmq.PublisherOptions) {
opts.Logging = true
},
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
log.Fatal(err)
@ -113,11 +105,9 @@ err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
// leave blank for defaults
func(opts *rabbitmq.PublishOptions) {
opts.DeliveryMode = rabbitmq.Persistent
opts.Mandatory = true
opts.ContentType = "application/json"
},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
)
if err != nil {
log.Fatal(err)


+ 104
- 0
consume.go View File

@ -43,6 +43,11 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e
return consumer, nil
}
// WithConsumerOptionsLogging sets logging to true on the consumer options
func WithConsumerOptionsLogging(options *ConsumerOptions) {
options.Logging = true
}
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
return ConsumeOptions{
@ -87,6 +92,105 @@ type ConsumeOptions struct {
ConsumerArgs Table
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
options.QueueDurable = true
}
// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more conusmers on it
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
options.QueueAutoDelete = true
}
// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
options.QueueExclusive = true
}
// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueNoWait = true
}
// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumeOptionsQuorum(options *ConsumeOptions) {
if options.QueueArgs == nil {
options.QueueArgs = Table{}
}
options.QueueArgs["x-queue-type"] = "quorum"
}
// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to
func WithConsumeOptionsBindingExchange(exchange string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.BindingExchange = exchange
}
}
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {
options.BindingNoWait = true
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Concurrency = concurrency
}
}
// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QOSPrefetch = prefetchCount
}
}
// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
options.QOSGlobal = true
}
// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ConsumerName = consumerName
}
}
// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means
// the server will ensure that this is the sole consumer
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
options.ConsumerExclusive = true
}
// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.ConsumerNoWait = true
}
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it


+ 4
- 10
examples/consumer/main.go View File

@ -9,10 +9,7 @@ import (
func main() {
consumer, err := rabbitmq.NewConsumer(
"amqp://user:pass@localhost",
// can pass nothing for no logging
func(opts *rabbitmq.ConsumerOptions) {
opts.Logging = true
},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
@ -25,12 +22,9 @@ func main() {
},
"my_queue",
[]string{"routing_key1", "routing_key2"},
// can pass nothing here for defaults
func(opts *rabbitmq.ConsumeOptions) {
opts.QueueDurable = true
opts.Concurrency = 10
opts.QOSPrefetch = 100
},
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum,
)
if err != nil {
log.Fatal(err)


+ 4
- 10
examples/publisher/main.go View File

@ -9,10 +9,7 @@ import (
func main() {
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://user:pass@localhost",
// can pass nothing for no logging
func(opts *rabbitmq.PublisherOptions) {
opts.Logging = true
},
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
log.Fatal(err)
@ -20,12 +17,9 @@ func main() {
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
// leave blank for defaults
func(opts *rabbitmq.PublishOptions) {
opts.DeliveryMode = rabbitmq.Persistent
opts.Mandatory = true
opts.ContentType = "application/json"
},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
)
if err != nil {
log.Fatal(err)


+ 43
- 4
publish.go View File

@ -7,13 +7,13 @@ import (
"github.com/streadway/amqp"
)
// DeliveryMode. Transient means higher throughput but messages will not be
// restored on broker restart. The delivery mode of publishings is unrelated
// to the durability of the queues they reside on. Transient messages will
// DeliveryMode. Transient means higher throughput but messages will not be
// restored on broker restart. The delivery mode of publishings is unrelated
// to the durability of the queues they reside on. Transient messages will
// not be restored to durable queues, persistent messages will be restored to
// durable queues and lost on non-durable queues during server restart.
//
// This remains typed as uint8 to match Publishing.DeliveryMode. Other
// This remains typed as uint8 to match Publishing.DeliveryMode. Other
// delivery modes specific to custom queue implementations are not enumerated
// here.
const (
@ -42,6 +42,40 @@ type PublishOptions struct {
DeliveryMode uint8
}
// WithPublishOptionsExchange returns a function that sets the exchange to publish to
func WithPublishOptionsExchange(exchange string) func(*PublishOptions) {
return func(options *PublishOptions) {
options.Exchange = exchange
}
}
// WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not
// bound to the routing key a message will be sent back on the returns channel for you to handle
func WithPublishOptionsMandatory(options *PublishOptions) {
options.Mandatory = true
}
// WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available
// to immediately handle the new message, a message will be sent back on the returns channel for you to handle
func WithPublishOptionsImmediate(options *PublishOptions) {
options.Immediate = true
}
// WithPublishOptionsContentType returns a function that sets the content type, i.e. "application/json"
func WithPublishOptionsContentType(contentType string) func(*PublishOptions) {
return func(options *PublishOptions) {
options.ContentType = contentType
}
}
// WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will
// not be restored to durable queues, persistent messages will be restored to
// durable queues and lost on non-durable queues during server restart. By default publishings
// are transient
func WithPublishOptionsPersistentDelivery(options *PublishOptions) {
options.DeliveryMode = Persistent
}
// Publisher allows you to publish messages safely across an open connection
type Publisher struct {
chManager *channelManager
@ -60,6 +94,11 @@ type PublisherOptions struct {
Logging bool
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logging = true
}
// NewPublisher returns a new publisher with an open channel to the cluster.
// If you plan to enforce mandatory or immediate publishing, those failures will be reported
// on the channel of Returns that you should setup a listener on.


Loading…
Cancel
Save