diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f11b75 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea/ diff --git a/consume.go b/consume.go index 2013509..277032a 100644 --- a/consume.go +++ b/consume.go @@ -71,7 +71,7 @@ func getDefaultConsumeOptions() ConsumeOptions { QueueExclusive: false, QueueNoWait: false, QueueArgs: nil, - BindingExchange: "", + BindingExchange: nil, BindingNoWait: false, BindingArgs: nil, Concurrency: 1, @@ -93,7 +93,7 @@ type ConsumeOptions struct { QueueExclusive bool QueueNoWait bool QueueArgs Table - BindingExchange string + BindingExchange *BindingExchangeOptions BindingNoWait bool BindingArgs Table Concurrency int @@ -107,6 +107,18 @@ type ConsumeOptions struct { ConsumerArgs Table } +// BindingExchangeOptions are used when binding to an exchange. +// it will verify the exchange is created before binding to it. +type BindingExchangeOptions struct { + Name string + Type string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + ExchangeArgs Table +} + // WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't // be destroyed when the server restarts. It must only be bound to durable exchanges func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { @@ -146,9 +158,17 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) { } // WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to -func WithConsumeOptionsBindingExchange(exchange string) func(*ConsumeOptions) { +func WithConsumeOptionsBindingExchange(name, kind string, durable, autoDelete, internal, noWait bool, args Table) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.BindingExchange = exchange + options.BindingExchange = &BindingExchangeOptions{ + Name: name, + Type: kind, + Durable: durable, + AutoDelete: autoDelete, + Internal: internal, + NoWait: noWait, + ExchangeArgs: args, + } } } @@ -300,12 +320,25 @@ func (consumer Consumer) startGoroutines( return err } - if consumeOptions.BindingExchange != "" { + if consumeOptions.BindingExchange != nil { + exchange := consumeOptions.BindingExchange + err = consumer.chManager.channel.ExchangeDeclare( + exchange.Name, + exchange.Type, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.ExchangeArgs), + ) + if err != nil { + return err + } for _, routingKey := range routingKeys { err = consumer.chManager.channel.QueueBind( queue, routingKey, - consumeOptions.BindingExchange, + exchange.Name, consumeOptions.BindingNoWait, tableToAMQPTable(consumeOptions.BindingArgs), ) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 355dc29..63f9aff 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -25,7 +25,7 @@ func main() { rabbitmq.WithConsumeOptionsConcurrency(10), rabbitmq.WithConsumeOptionsQueueDurable, rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchange("events"), + rabbitmq.WithConsumeOptionsBindingExchange("events", "topic", true, false, false, true, nil), ) if err != nil { log.Fatal(err)