diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f11b75 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea/ diff --git a/consume.go b/consume.go index 2013509..1c22504 100644 --- a/consume.go +++ b/consume.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "fmt" "time" "github.com/streadway/amqp" @@ -71,7 +72,7 @@ func getDefaultConsumeOptions() ConsumeOptions { QueueExclusive: false, QueueNoWait: false, QueueArgs: nil, - BindingExchange: "", + BindingExchange: nil, BindingNoWait: false, BindingArgs: nil, Concurrency: 1, @@ -93,7 +94,7 @@ type ConsumeOptions struct { QueueExclusive bool QueueNoWait bool QueueArgs Table - BindingExchange string + BindingExchange *BindingExchangeOptions BindingNoWait bool BindingArgs Table Concurrency int @@ -107,6 +108,34 @@ type ConsumeOptions struct { ConsumerArgs Table } +// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. +func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { + if options.BindingExchange == nil { + options.BindingExchange = &BindingExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + ExchangeArgs: nil, + } + } + return options.BindingExchange +} + +// BindingExchangeOptions are used when binding to an exchange. +// it will verify the exchange is created before binding to it. +type BindingExchangeOptions struct { + Name string + Kind string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + ExchangeArgs Table +} + // WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't // be destroyed when the server restarts. It must only be bound to durable exchanges func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { @@ -145,10 +174,44 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) { options.QueueArgs["x-queue-type"] = "quorum" } -// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to -func WithConsumeOptionsBindingExchange(exchange string) func(*ConsumeOptions) { +// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to +func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type +func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag +func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Durable = true +} + +// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag +func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true +} + +// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag +func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Internal = true +} + +// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag +func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).NoWait = true +} + +// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange +func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.BindingExchange = exchange + getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args } } @@ -300,12 +363,28 @@ func (consumer Consumer) startGoroutines( return err } - if consumeOptions.BindingExchange != "" { + if consumeOptions.BindingExchange != nil { + exchange := consumeOptions.BindingExchange + if exchange.Name == "" { + return fmt.Errorf("binding to exchange but name not specified") + } + err = consumer.chManager.channel.ExchangeDeclare( + exchange.Name, + exchange.Kind, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.ExchangeArgs), + ) + if err != nil { + return err + } for _, routingKey := range routingKeys { err = consumer.chManager.channel.QueueBind( queue, routingKey, - consumeOptions.BindingExchange, + exchange.Name, consumeOptions.BindingNoWait, tableToAMQPTable(consumeOptions.BindingArgs), ) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 355dc29..93c906d 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -25,7 +25,9 @@ func main() { rabbitmq.WithConsumeOptionsConcurrency(10), rabbitmq.WithConsumeOptionsQueueDurable, rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchange("events"), + rabbitmq.WithConsumeOptionsBindingExchangeName("events"), + rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), + rabbitmq.WithConsumeOptionsBindingExchangeDurable, ) if err != nil { log.Fatal(err)