@ -1,6 +1,7 @@
package rabbitmq
import (
"fmt"
"time"
"github.com/streadway/amqp"
@ -71,7 +72,7 @@ func getDefaultConsumeOptions() ConsumeOptions {
QueueExclusive : false ,
QueueNoWait : false ,
QueueArgs : nil ,
BindingExchange : "" ,
BindingExchange : nil ,
BindingNoWait : false ,
BindingArgs : nil ,
Concurrency : 1 ,
@ -93,7 +94,7 @@ type ConsumeOptions struct {
QueueExclusive bool
QueueNoWait bool
QueueArgs Table
BindingExchange string
BindingExchange * BindingExchangeOptions
BindingNoWait bool
BindingArgs Table
Concurrency int
@ -107,6 +108,34 @@ type ConsumeOptions struct {
ConsumerArgs Table
}
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault ( options * ConsumeOptions ) * BindingExchangeOptions {
if options . BindingExchange == nil {
options . BindingExchange = & BindingExchangeOptions {
Name : "" ,
Kind : "direct" ,
Durable : false ,
AutoDelete : false ,
Internal : false ,
NoWait : false ,
ExchangeArgs : nil ,
}
}
return options . BindingExchange
}
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable ( options * ConsumeOptions ) {
@ -145,10 +174,44 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) {
options . QueueArgs [ "x-queue-type" ] = "quorum"
}
// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to
func WithConsumeOptionsBindingExchange ( exchange string ) func ( * ConsumeOptions ) {
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName ( name string ) func ( * ConsumeOptions ) {
return func ( options * ConsumeOptions ) {
getBindingExchangeOptionsOrSetDefault ( options ) . Name = name
}
}
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind ( kind string ) func ( * ConsumeOptions ) {
return func ( options * ConsumeOptions ) {
getBindingExchangeOptionsOrSetDefault ( options ) . Kind = kind
}
}
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable ( options * ConsumeOptions ) {
getBindingExchangeOptionsOrSetDefault ( options ) . Durable = true
}
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete ( options * ConsumeOptions ) {
getBindingExchangeOptionsOrSetDefault ( options ) . AutoDelete = true
}
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal ( options * ConsumeOptions ) {
getBindingExchangeOptionsOrSetDefault ( options ) . Internal = true
}
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait ( options * ConsumeOptions ) {
getBindingExchangeOptionsOrSetDefault ( options ) . NoWait = true
}
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs ( args Table ) func ( * ConsumeOptions ) {
return func ( options * ConsumeOptions ) {
options . BindingExchange = exchange
getBindingExchangeOptionsOrSetDefault ( options ) . ExchangeArgs = args
}
}
@ -300,12 +363,28 @@ func (consumer Consumer) startGoroutines(
return err
}
if consumeOptions . BindingExchange != "" {
if consumeOptions . BindingExchange != nil {
exchange := consumeOptions . BindingExchange
if exchange . Name == "" {
return fmt . Errorf ( "binding to exchange but name not specified" )
}
err = consumer . chManager . channel . ExchangeDeclare (
exchange . Name ,
exchange . Kind ,
exchange . Durable ,
exchange . AutoDelete ,
exchange . Internal ,
exchange . NoWait ,
tableToAMQPTable ( exchange . ExchangeArgs ) ,
)
if err != nil {
return err
}
for _ , routingKey := range routingKeys {
err = consumer . chManager . channel . QueueBind (
queue ,
routingKey ,
consumeOptions . BindingExchange ,
exchange . Nam e,
consumeOptions . BindingNoWait ,
tableToAMQPTable ( consumeOptions . BindingArgs ) ,
)