Browse Source

improve the handling of queue, exchange and bindings declaration, fixes #43

pull/80/head
Christoph Haas 4 years ago
parent
commit
3d51e9d75a
11 changed files with 633 additions and 222 deletions
  1. +53
    -7
      README.md
  2. +9
    -58
      consume.go
  3. +18
    -151
      consume_options.go
  4. +1
    -0
      consume_test.go
  5. +373
    -0
      declare.go
  6. +0
    -6
      examples/consumer/main.go
  7. +1
    -0
      examples/consumer_with_declare/.gitignore
  8. +74
    -0
      examples/consumer_with_declare/main.go
  9. +1
    -0
      examples/publisher_with_declare/.gitignore
  10. +87
    -0
      examples/publisher_with_declare/main.go
  11. +16
    -0
      publish.go

+ 53
- 7
README.md View File

@ -48,7 +48,6 @@ err = consumer.StartConsuming(
return rabbitmq.Ack
},
"my_queue",
[]string{"routing_key1", "routing_key2"}
)
if err != nil {
log.Fatal(err)
@ -74,13 +73,7 @@ err = consumer.StartConsuming(
return rabbitmq.Ack
},
"my_queue",
[]string{"routing_key", "routing_key_2"},
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum,
rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
rabbitmq.WithConsumeOptionsBindingExchangeDurable,
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {
@ -137,6 +130,59 @@ go func() {
}()
```
## 🚀 Quick Start Queue, Exchange and Binding Declaration
### Consumer
```go
consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost", rabbitmq.Config{})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumeDeclareOptions(
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareQueueQuorum,
rabbitmq.WithDeclareExchangeName("events"),
rabbitmq.WithDeclareExchangeKind("topic"),
rabbitmq.WithDeclareExchangeDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}),
),
)
if err != nil {
log.Fatal(err)
}
```
### Publisher
```go
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherDeclareOptions(
rabbitmq.WithDeclareQueueName("my_queue"),
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareQueueQuorum,
rabbitmq.WithDeclareExchangeName("events"),
rabbitmq.WithDeclareExchangeKind("topic"),
rabbitmq.WithDeclareExchangeDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}),
),
)
if err != nil {
log.Fatal(err)
}
defer publisher.Close()
```
## Other usage examples
See the [examples](examples) directory for more ideas.


+ 9
- 58
consume.go View File

@ -85,16 +85,15 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
}
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster
func (consumer Consumer) StartConsuming(
handler Handler,
queue string,
routingKeys []string,
optionFuncs ...func(*ConsumeOptions),
) error {
defaultOptions := getDefaultConsumeOptions()
defaultOptions := getDefaultConsumeOptions(queue)
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
@ -102,8 +101,6 @@ func (consumer Consumer) StartConsuming(
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
@ -115,8 +112,6 @@ func (consumer Consumer) StartConsuming(
consumer.logger.Infof("successful recovery from: %v", err)
err = consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
@ -139,61 +134,17 @@ func (consumer Consumer) Close() error {
// that will consume from the queue
func (consumer Consumer) startGoroutines(
handler Handler,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) error {
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
if consumeOptions.QueueDeclare {
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDurable,
consumeOptions.QueueAutoDelete,
consumeOptions.QueueExclusive,
consumeOptions.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueArgs),
)
if err != nil {
return err
}
err := handleDeclare(consumer.chManager, consumeOptions.DeclareOptions)
if err != nil {
return fmt.Errorf("declare failed: %w", err)
}
if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
}
if exchange.Declare {
err := consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
}
}
for _, routingKey := range routingKeys {
err := consumer.chManager.channel.QueueBind(
queue,
routingKey,
exchange.Name,
consumeOptions.BindingNoWait,
tableToAMQPTable(consumeOptions.BindingArgs),
)
if err != nil {
return err
}
}
}
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
err := consumer.chManager.channel.Qos(
err = consumer.chManager.channel.Qos(
consumeOptions.QOSPrefetch,
0,
consumeOptions.QOSGlobal,
@ -203,7 +154,7 @@ func (consumer Consumer) startGoroutines(
}
msgs, err := consumer.chManager.channel.Consume(
queue,
consumeOptions.QueueName,
consumeOptions.ConsumerName,
consumeOptions.ConsumerAutoAck,
consumeOptions.ConsumerExclusive,


+ 18
- 151
consume_options.go View File

@ -1,17 +1,9 @@
package rabbitmq
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
// getDefaultConsumeOptions describes the options that will be used when a value isn't provided
func getDefaultConsumeOptions(queue string) ConsumeOptions {
return ConsumeOptions{
QueueDurable: false,
QueueAutoDelete: false,
QueueExclusive: false,
QueueNoWait: false,
QueueDeclare: true,
QueueArgs: nil,
BindingExchange: nil,
BindingNoWait: false,
BindingArgs: nil,
QueueName: queue,
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
@ -26,15 +18,8 @@ func getDefaultConsumeOptions() ConsumeOptions {
// ConsumeOptions are used to describe how a new consumer will be created.
type ConsumeOptions struct {
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueDeclare bool
QueueArgs Table
BindingExchange *BindingExchangeOptions
BindingNoWait bool
BindingArgs Table
DeclareOptions
QueueName string
Concurrency int
QOSPrefetch int
QOSGlobal bool
@ -46,135 +31,24 @@ type ConsumeOptions struct {
ConsumerArgs Table
}
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
if options.BindingExchange == nil {
options.BindingExchange = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
Declare: true,
}
}
return options.BindingExchange
}
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
Declare bool
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
options.QueueDurable = true
}
// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more conusmers on it
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
options.QueueAutoDelete = true
}
// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
options.QueueExclusive = true
}
// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueNoWait = true
}
// WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means
// the queue will be assumed to be declared on the server, and won't be
// declared at all.
func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) {
options.QueueDeclare = false
}
// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumeOptionsQuorum(options *ConsumeOptions) {
if options.QueueArgs == nil {
options.QueueArgs = Table{}
}
options.QueueArgs["x-queue-type"] = "quorum"
}
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
// WithConsumeDeclareOptions allows to set declare options that can be used to set up queue, exchange or bindings
// before the consumer process starts.
func WithConsumeDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Durable = true
}
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Internal = true
}
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).NoWait = true
}
for _, declareOption := range declareOptionsFuncs {
// If a queue was set to declare, ensure that the queue name is set.
if options.DeclareOptions.Queue != nil {
if options.DeclareOptions.Queue.Name == "" {
options.DeclareOptions.Queue.Name = options.QueueName
}
}
declareOption(&options.DeclareOptions)
}
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
}
}
// WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Declare = false
}
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {
options.BindingNoWait = true
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
@ -230,10 +104,3 @@ func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.ConsumerNoWait = true
}
// WithConsumeOptionsQueueArgs returns a function that sets the queue arguments
func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QueueArgs = args
}
}

+ 1
- 0
consume_test.go View File

@ -0,0 +1 @@
package rabbitmq

+ 373
- 0
declare.go View File

@ -0,0 +1,373 @@
package rabbitmq
import "fmt"
// DeclareOptions are used to describe how a new queues, exchanges the routing setup should look like.
type DeclareOptions struct {
Queue *QueueOptions
Exchange *ExchangeOptions
Bindings []Binding
}
// QueueOptions are used to configure a queue.
// If the Passive flag is set the client will only check if the queue exists on the server
// and that the settings match, no creation attempt will be made.
type QueueOptions struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Passive bool // if false, a missing queue will be created on the server
Args Table
}
// ExchangeOptions are used to configure an exchange.
// If the Passive flag is set the client will only check if the exchange exists on the server
// and that the settings match, no creation attempt will be made.
type ExchangeOptions struct {
Name string
Kind string // possible values: empty string for default exchange or direct, topic, fanout
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Passive bool // if false, a missing exchange will be created on the server
Args Table
}
// BindingOption are used to configure a queue bindings.
type BindingOption struct {
NoWait bool
Args Table
}
// Binding describes a queue binding to a specific exchange.
type Binding struct {
BindingOption
QueueName string
ExchangeName string
RoutingKey string
}
// SetBindings trys to generate bindings for the given routing keys and the queue and exchange options.
// If either Queue or Exchange properties are empty or no queue name is specified, no bindings will be set.
func (o *DeclareOptions) SetBindings(routingKeys []string, opt BindingOption) {
if o.Queue == nil || o.Exchange == nil {
return // nothing to set...
}
if o.Queue.Name == "" {
return // nothing to set...
}
for _, routingKey := range routingKeys {
o.Bindings = append(o.Bindings, Binding{
QueueName: o.Queue.Name,
ExchangeName: o.Exchange.Name,
RoutingKey: routingKey,
BindingOption: opt,
})
}
}
// handleDeclare handles the queue, exchange and binding declare process on the server.
// If there are no options set, no actions will be executed.
func handleDeclare(chManager *channelManager, options DeclareOptions) error {
chManager.channelMux.RLock()
defer chManager.channelMux.RUnlock()
// bind queue
if options.Queue != nil {
queue := options.Queue
if queue.Name == "" {
return fmt.Errorf("missing queue name")
}
if queue.Passive {
_, err := chManager.channel.QueueDeclarePassive(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
tableToAMQPTable(queue.Args),
)
if err != nil {
return err
}
} else {
_, err := chManager.channel.QueueDeclare(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
tableToAMQPTable(queue.Args),
)
if err != nil {
return err
}
}
}
// bind exchange
if options.Exchange != nil {
exchange := options.Exchange
if exchange.Name == "" {
return fmt.Errorf("missing exchange name")
}
if exchange.Passive {
err := chManager.channel.ExchangeDeclarePassive(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.Args),
)
if err != nil {
return err
}
} else {
err := chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.Args),
)
if err != nil {
return err
}
}
}
// handle binding of queues to exchange
for _, binding := range options.Bindings {
err := chManager.channel.QueueBind(
binding.QueueName, // name of the queue
binding.RoutingKey, // bindingKey
binding.ExchangeName, // sourceExchange
binding.NoWait, // noWait
tableToAMQPTable(binding.Args), // arguments
)
if err != nil {
return err
}
}
return nil
}
// getExchangeOptionsOrSetDefault returns pointer to current ExchangeOptions options.
// If no exchange options are set yet, new options with default values will be defined.
func getExchangeOptionsOrSetDefault(options *DeclareOptions) *ExchangeOptions {
if options.Exchange == nil {
options.Exchange = &ExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Args: nil,
Passive: false,
}
}
return options.Exchange
}
// getQueueOptionsOrSetDefault returns pointer to current QueueOptions options.
// If no queue options are set yet, new options with default values will be defined.
func getQueueOptionsOrSetDefault(options *DeclareOptions) *QueueOptions {
if options.Queue == nil {
options.Queue = &QueueOptions{
Name: "",
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: nil,
}
}
return options.Queue
}
// region general-options
// WithDeclareQueue sets the queue that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if the queue already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, a missing queue will be created on the server.
func WithDeclareQueue(settings *QueueOptions) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Queue = settings
}
}
// WithDeclareExchange sets the exchange that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if the exchange already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, a missing exchange will be created on the server.
func WithDeclareExchange(settings *ExchangeOptions) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Exchange = settings
}
}
// WithDeclareBindings sets the bindings that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if one of the bindings already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, missing bindings will be created on the server.
func WithDeclareBindings(bindings []Binding) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Bindings = bindings
}
}
// WithDeclareBindingsForRoutingKeys sets the bindings that should be declared prior to other RabbitMQ
// actions are being executed.
// This function must be called after the queue and exchange declaration settings have been set,
// otherwise this function has no effect.
func WithDeclareBindingsForRoutingKeys(routingKeys []string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.SetBindings(routingKeys, BindingOption{})
}
}
// endregion general-options
// region single-options
// WithDeclareQueueName returns a function that sets the queue name.
func WithDeclareQueueName(name string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Name = name
}
}
// WithDeclareQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges.
func WithDeclareQueueDurable(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Durable = true
}
// WithDeclareQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more consumers on it.
func WithDeclareQueueAutoDelete(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).AutoDelete = true
}
// WithDeclareQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithDeclareQueueExclusive(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Exclusive = true
}
// WithDeclareQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A channel
// exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithDeclareQueueNoWait(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).NoWait = true
}
// WithDeclareQueueNoDeclare sets the queue to no declare, which means
// the queue will be assumed to be declared on the server, and thus only will be validated.
func WithDeclareQueueNoDeclare(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Passive = true
}
// WithDeclareQueueArgs returns a function that sets the queue arguments.
func WithDeclareQueueArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Args = args
}
}
// WithDeclareQueueQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability.
func WithDeclareQueueQuorum(options *DeclareOptions) {
queue := getQueueOptionsOrSetDefault(options)
if queue.Args == nil {
queue.Args = Table{}
}
queue.Args["x-queue-type"] = "quorum"
}
// WithDeclareExchangeName returns a function that sets the exchange name.
func WithDeclareExchangeName(name string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithDeclareExchangeKind returns a function that sets the binding exchange kind/type.
func WithDeclareExchangeKind(kind string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithDeclareExchangeDurable returns a function that sets the binding exchange durable flag.
func WithDeclareExchangeDurable(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Durable = true
}
// WithDeclareExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag.
func WithDeclareExchangeAutoDelete(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithDeclareExchangeInternal returns a function that sets the binding exchange internal flag.
func WithDeclareExchangeInternal(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Internal = true
}
// WithDeclareExchangeNoWait returns a function that sets the binding exchange noWait flag.
func WithDeclareExchangeNoWait(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).NoWait = true
}
// WithDeclareExchangeArgs returns a function that sets the binding exchange arguments
// that are specific to the server's implementation of the exchange.
func WithDeclareExchangeArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Args = args
}
}
// WithDeclareExchangeNoDeclare returns a function that skips the declaration of the
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithDeclareExchangeNoDeclare(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Passive = true
}
// WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
// This function must be called after bindings have been defined, otherwise it has no effect.
func WithDeclareBindingNoWait(options *ConsumeOptions) {
for i := range options.Bindings {
options.Bindings[i].NoWait = true
}
}
// WithDeclareBindingArgs sets the arguments of the bindings to args.
// This function must be called after bindings have been defined, otherwise it has no effect.
func WithDeclareBindingArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
for i := range options.Bindings {
options.Bindings[i].Args = args
}
}
}
// endregion single-options

+ 0
- 6
examples/consumer/main.go View File

@ -34,13 +34,7 @@ func main() {
return rabbitmq.Ack
},
"my_queue",
[]string{"routing_key", "routing_key_2"},
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum,
rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
rabbitmq.WithConsumeOptionsBindingExchangeDurable,
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {


+ 1
- 0
examples/consumer_with_declare/.gitignore View File

@ -0,0 +1 @@
consumer_with_declare

+ 74
- 0
examples/consumer_with_declare/main.go View File

@ -0,0 +1,74 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
var consumerName = "example_with_declare"
func main() {
consumer, err := rabbitmq.NewConsumer(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := consumer.Close()
if err != nil {
log.Fatal(err)
}
}()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumeDeclareOptions(
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareQueueQuorum,
rabbitmq.WithDeclareExchangeName("events"),
rabbitmq.WithDeclareExchangeKind("topic"),
rabbitmq.WithDeclareExchangeDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}), // implicit bindings
rabbitmq.WithDeclareBindings([]rabbitmq.Binding{ // custom bindings
{
QueueName: "my_queue",
ExchangeName: "events",
RoutingKey: "a_custom_key",
},
}),
),
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {
log.Fatal(err)
}
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
<-done
fmt.Println("stopping consumer")
}

+ 1
- 0
examples/publisher_with_declare/.gitignore View File

@ -0,0 +1 @@
publisher_with_declare

+ 87
- 0
examples/publisher_with_declare/main.go View File

@ -0,0 +1,87 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherDeclareOptions(
rabbitmq.WithDeclareQueueName("my_queue"),
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareQueueQuorum,
rabbitmq.WithDeclareExchangeName("events"),
rabbitmq.WithDeclareExchangeKind("topic"),
rabbitmq.WithDeclareExchangeDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}),
),
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := publisher.Close()
if err != nil {
log.Fatal(err)
}
}()
returns := publisher.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
}
}()
confirmations := publisher.NotifyPublish()
go func() {
for c := range confirmations {
log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
}
}()
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)
}
case <-done:
fmt.Println("stopping publisher")
return
}
}
}

+ 16
- 0
publish.go View File

@ -56,10 +56,21 @@ type Publisher struct {
// PublisherOptions are used to describe a publisher's configuration.
// Logger is a custom logging interface.
type PublisherOptions struct {
DeclareOptions
Logger Logger
ReconnectInterval time.Duration
}
// WithPublisherDeclareOptions allows to set declare options that can be used to set up queue, exchange or bindings
// before the publisher process starts.
func WithPublisherDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*PublisherOptions) {
return func(options *PublisherOptions) {
for _, declareOption := range declareOptionsFuncs {
declareOption(&options.DeclareOptions)
}
}
}
// WithPublisherOptionsReconnectInterval sets the interval at which the publisher will
// attempt to reconnect to the rabbit server
func WithPublisherOptionsReconnectInterval(reconnectInterval time.Duration) func(options *PublisherOptions) {
@ -112,6 +123,11 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
notifyPublishChan: nil,
}
err = handleDeclare(chManager, options.DeclareOptions)
if err != nil {
return nil, fmt.Errorf("declare failed: %w", err)
}
go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler()


Loading…
Cancel
Save