From 04b8c6a9e8a0e0ee85c34c9ec52fad1e97843cd2 Mon Sep 17 00:00:00 2001 From: Miguel Bautista Date: Mon, 10 May 2021 20:11:21 -0400 Subject: [PATCH 1/4] declare exchange before binding --- .gitignore | 1 + consume.go | 45 +++++++++++++++++++++++++++++++++------ examples/consumer/main.go | 2 +- 3 files changed, 41 insertions(+), 7 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9f11b75 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea/ diff --git a/consume.go b/consume.go index 2013509..277032a 100644 --- a/consume.go +++ b/consume.go @@ -71,7 +71,7 @@ func getDefaultConsumeOptions() ConsumeOptions { QueueExclusive: false, QueueNoWait: false, QueueArgs: nil, - BindingExchange: "", + BindingExchange: nil, BindingNoWait: false, BindingArgs: nil, Concurrency: 1, @@ -93,7 +93,7 @@ type ConsumeOptions struct { QueueExclusive bool QueueNoWait bool QueueArgs Table - BindingExchange string + BindingExchange *BindingExchangeOptions BindingNoWait bool BindingArgs Table Concurrency int @@ -107,6 +107,18 @@ type ConsumeOptions struct { ConsumerArgs Table } +// BindingExchangeOptions are used when binding to an exchange. +// it will verify the exchange is created before binding to it. +type BindingExchangeOptions struct { + Name string + Type string + Durable bool + AutoDelete bool + Internal bool + NoWait bool + ExchangeArgs Table +} + // WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't // be destroyed when the server restarts. It must only be bound to durable exchanges func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { @@ -146,9 +158,17 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) { } // WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to -func WithConsumeOptionsBindingExchange(exchange string) func(*ConsumeOptions) { +func WithConsumeOptionsBindingExchange(name, kind string, durable, autoDelete, internal, noWait bool, args Table) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.BindingExchange = exchange + options.BindingExchange = &BindingExchangeOptions{ + Name: name, + Type: kind, + Durable: durable, + AutoDelete: autoDelete, + Internal: internal, + NoWait: noWait, + ExchangeArgs: args, + } } } @@ -300,12 +320,25 @@ func (consumer Consumer) startGoroutines( return err } - if consumeOptions.BindingExchange != "" { + if consumeOptions.BindingExchange != nil { + exchange := consumeOptions.BindingExchange + err = consumer.chManager.channel.ExchangeDeclare( + exchange.Name, + exchange.Type, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.ExchangeArgs), + ) + if err != nil { + return err + } for _, routingKey := range routingKeys { err = consumer.chManager.channel.QueueBind( queue, routingKey, - consumeOptions.BindingExchange, + exchange.Name, consumeOptions.BindingNoWait, tableToAMQPTable(consumeOptions.BindingArgs), ) diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 355dc29..63f9aff 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -25,7 +25,7 @@ func main() { rabbitmq.WithConsumeOptionsConcurrency(10), rabbitmq.WithConsumeOptionsQueueDurable, rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchange("events"), + rabbitmq.WithConsumeOptionsBindingExchange("events", "topic", true, false, false, true, nil), ) if err != nil { log.Fatal(err) From b23ceb0d7924c2ab2d10349dd93195ad9fd4f727 Mon Sep 17 00:00:00 2001 From: Miguel Bautista Date: Tue, 11 May 2021 18:44:50 -0400 Subject: [PATCH 2/4] declare exchange before binding --- consume.go | 80 ++++++++++++++++++++++++++++++++------- examples/consumer/main.go | 4 +- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/consume.go b/consume.go index 277032a..a0fc1dd 100644 --- a/consume.go +++ b/consume.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "fmt" "time" "github.com/streadway/amqp" @@ -107,11 +108,27 @@ type ConsumeOptions struct { ConsumerArgs Table } +// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. +func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { + if options.BindingExchange == nil { + options.BindingExchange = &BindingExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + ExchangeArgs: nil, + } + } + return options.BindingExchange +} + // BindingExchangeOptions are used when binding to an exchange. // it will verify the exchange is created before binding to it. type BindingExchangeOptions struct { Name string - Type string + Kind string Durable bool AutoDelete bool Internal bool @@ -157,18 +174,52 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) { options.QueueArgs["x-queue-type"] = "quorum" } -// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to -func WithConsumeOptionsBindingExchange(name, kind string, durable, autoDelete, internal, noWait bool, args Table) func(*ConsumeOptions) { +// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to +func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - options.BindingExchange = &BindingExchangeOptions{ - Name: name, - Type: kind, - Durable: durable, - AutoDelete: autoDelete, - Internal: internal, - NoWait: noWait, - ExchangeArgs: args, - } + getBindingExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type +func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag +func WithConsumeOptionsBindingExchangeDurable(durable bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Durable = durable + } +} + +// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag +func WithConsumeOptionsBindingExchangeAutoDelete(autoDelete bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).AutoDelete = autoDelete + } +} + +// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag +func WithConsumeOptionsBindingExchangeInternal(internal bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Internal = internal + } +} + +// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag +func WithConsumeOptionsBindingExchangeNoWait(noWait bool) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).NoWait = noWait + } +} + +// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange +func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { + return func(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args } } @@ -322,9 +373,12 @@ func (consumer Consumer) startGoroutines( if consumeOptions.BindingExchange != nil { exchange := consumeOptions.BindingExchange + if exchange.Name == "" { + return fmt.Errorf("binding to exchange but name not specified") + } err = consumer.chManager.channel.ExchangeDeclare( exchange.Name, - exchange.Type, + exchange.Kind, exchange.Durable, exchange.AutoDelete, exchange.Internal, diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 63f9aff..4569a3d 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -25,7 +25,9 @@ func main() { rabbitmq.WithConsumeOptionsConcurrency(10), rabbitmq.WithConsumeOptionsQueueDurable, rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchange("events", "topic", true, false, false, true, nil), + rabbitmq.WithConsumeOptionsBindingExchangeName("events"), + rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), + rabbitmq.WithConsumeOptionsBindingExchangeDurable(true), ) if err != nil { log.Fatal(err) From 3e6f1b7c78bedd80c9a7e39a60bf3caa244e8f9e Mon Sep 17 00:00:00 2001 From: Miguel Bautista Date: Tue, 11 May 2021 18:47:50 -0400 Subject: [PATCH 3/4] declare exchange before binding --- consume.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/consume.go b/consume.go index a0fc1dd..f23f18c 100644 --- a/consume.go +++ b/consume.go @@ -189,30 +189,30 @@ func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { } // WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag -func WithConsumeOptionsBindingExchangeDurable(durable bool) func(*ConsumeOptions) { +func WithConsumeOptionsBindingExchangeDurable() func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Durable = durable + getBindingExchangeOptionsOrSetDefault(options).Durable = true } } // WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag -func WithConsumeOptionsBindingExchangeAutoDelete(autoDelete bool) func(*ConsumeOptions) { +func WithConsumeOptionsBindingExchangeAutoDelete() func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).AutoDelete = autoDelete + getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true } } // WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag -func WithConsumeOptionsBindingExchangeInternal(internal bool) func(*ConsumeOptions) { +func WithConsumeOptionsBindingExchangeInternal() func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Internal = internal + getBindingExchangeOptionsOrSetDefault(options).Internal = true } } // WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag -func WithConsumeOptionsBindingExchangeNoWait(noWait bool) func(*ConsumeOptions) { +func WithConsumeOptionsBindingExchangeNoWait() func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).NoWait = noWait + getBindingExchangeOptionsOrSetDefault(options).NoWait = true } } From ba7b786e6346d73ef3c49d75d716ebed38891514 Mon Sep 17 00:00:00 2001 From: Miguel Bautista Date: Tue, 11 May 2021 18:52:21 -0400 Subject: [PATCH 4/4] declare exchange before binding --- consume.go | 24 ++++++++---------------- examples/consumer/main.go | 2 +- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/consume.go b/consume.go index f23f18c..1c22504 100644 --- a/consume.go +++ b/consume.go @@ -189,31 +189,23 @@ func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { } // WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag -func WithConsumeOptionsBindingExchangeDurable() func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Durable = true - } +func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Durable = true } // WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag -func WithConsumeOptionsBindingExchangeAutoDelete() func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true - } +func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true } // WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag -func WithConsumeOptionsBindingExchangeInternal() func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Internal = true - } +func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).Internal = true } // WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag -func WithConsumeOptionsBindingExchangeNoWait() func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).NoWait = true - } +func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { + getBindingExchangeOptionsOrSetDefault(options).NoWait = true } // WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 4569a3d..93c906d 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -27,7 +27,7 @@ func main() { rabbitmq.WithConsumeOptionsQuorum, rabbitmq.WithConsumeOptionsBindingExchangeName("events"), rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), - rabbitmq.WithConsumeOptionsBindingExchangeDurable(true), + rabbitmq.WithConsumeOptionsBindingExchangeDurable, ) if err != nil { log.Fatal(err)