diff --git a/consume.go b/consume.go index 277032a..a0fc1dd 100644 --- a/consume.go +++ b/consume.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "fmt" "time" "github.com/streadway/amqp" @@ -107,11 +108,27 @@ type ConsumeOptions struct { ConsumerArgs Table } +// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. +func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { + if options.BindingExchange == nil { + options.BindingExchange = &BindingExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + ExchangeArgs: nil, + } + } + return options.BindingExchange +} + // BindingExchangeOptions are used when binding to an exchange. // it will verify the exchange is created before binding to it. type BindingExchangeOptions struct { Name string - Type string + Kind string Durable bool AutoDelete bool Internal bool @@ -157,18 +174,52 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) { options.QueueArgs["x-queue-type"] = "quorum" } -// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to -func WithConsumeOptionsBindingExchange(name, kind string, durable, autoDelete, internal, noWait bool, args Table) func(*ConsumeOptions) { +// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to +func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.BindingExchange = &BindingExchangeOptions{ - Name: name, - Type: kind, - Durable: durable, - AutoDelete: autoDelete, - Internal: internal, - NoWait: noWait, - ExchangeArgs: args, - } + getBindingExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type +func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag +func WithConsumeOptionsBindingExchangeDurable(durable bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Durable = durable + } +} + +// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag +func WithConsumeOptionsBindingExchangeAutoDelete(autoDelete bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).AutoDelete = autoDelete + } +} + +// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag +func WithConsumeOptionsBindingExchangeInternal(internal bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Internal = internal + } +} + +// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag +func WithConsumeOptionsBindingExchangeNoWait(noWait bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).NoWait = noWait + } +} + +// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange +func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args } } @@ -322,9 +373,12 @@ func (consumer Consumer) startGoroutines( if consumeOptions.BindingExchange != nil { exchange := consumeOptions.BindingExchange + if exchange.Name == "" { + return fmt.Errorf("binding to exchange but name not specified") + } err = consumer.chManager.channel.ExchangeDeclare( exchange.Name, - exchange.Type, + exchange.Kind, exchange.Durable, exchange.AutoDelete, exchange.Internal, diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 63f9aff..4569a3d 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -25,7 +25,9 @@ func main() { rabbitmq.WithConsumeOptionsConcurrency(10), rabbitmq.WithConsumeOptionsQueueDurable, rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchange("events", "topic", true, false, false, true, nil), + rabbitmq.WithConsumeOptionsBindingExchangeName("events"), + rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), + rabbitmq.WithConsumeOptionsBindingExchangeDurable(true), ) if err != nil { log.Fatal(err)