Browse Source

declare exchange before binding

pull/8/head
Miguel Bautista 5 years ago
parent
commit
b23ceb0d79
2 changed files with 70 additions and 14 deletions
  1. +67
    -13
      consume.go
  2. +3
    -1
      examples/consumer/main.go

+ 67
- 13
consume.go View File

@ -1,6 +1,7 @@
package rabbitmq package rabbitmq
import ( import (
"fmt"
"time" "time"
"github.com/streadway/amqp" "github.com/streadway/amqp"
@ -107,11 +108,27 @@ type ConsumeOptions struct {
ConsumerArgs Table ConsumerArgs Table
} }
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
if options.BindingExchange == nil {
options.BindingExchange = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
}
}
return options.BindingExchange
}
// BindingExchangeOptions are used when binding to an exchange. // BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it. // it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct { type BindingExchangeOptions struct {
Name string Name string
Type string
Kind string
Durable bool Durable bool
AutoDelete bool AutoDelete bool
Internal bool Internal bool
@ -157,18 +174,52 @@ func WithConsumeOptionsQuorum(options *ConsumeOptions) {
options.QueueArgs["x-queue-type"] = "quorum" options.QueueArgs["x-queue-type"] = "quorum"
} }
// WithConsumeOptionsBindingExchange returns a function that sets the exchange the queue will be bound to
func WithConsumeOptionsBindingExchange(name, kind string, durable, autoDelete, internal, noWait bool, args Table) func(*ConsumeOptions) {
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) { return func(options *ConsumeOptions) {
options.BindingExchange = &BindingExchangeOptions{
Name: name,
Type: kind,
Durable: durable,
AutoDelete: autoDelete,
Internal: internal,
NoWait: noWait,
ExchangeArgs: args,
}
getBindingExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable(durable bool) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Durable = durable
}
}
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete(autoDelete bool) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = autoDelete
}
}
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal(internal bool) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Internal = internal
}
}
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait(noWait bool) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).NoWait = noWait
}
}
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
} }
} }
@ -322,9 +373,12 @@ func (consumer Consumer) startGoroutines(
if consumeOptions.BindingExchange != nil { if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
}
err = consumer.chManager.channel.ExchangeDeclare( err = consumer.chManager.channel.ExchangeDeclare(
exchange.Name, exchange.Name,
exchange.Type,
exchange.Kind,
exchange.Durable, exchange.Durable,
exchange.AutoDelete, exchange.AutoDelete,
exchange.Internal, exchange.Internal,


+ 3
- 1
examples/consumer/main.go View File

@ -25,7 +25,9 @@ func main() {
rabbitmq.WithConsumeOptionsConcurrency(10), rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable, rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum, rabbitmq.WithConsumeOptionsQuorum,
rabbitmq.WithConsumeOptionsBindingExchange("events", "topic", true, false, false, true, nil),
rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
rabbitmq.WithConsumeOptionsBindingExchangeDurable(true),
) )
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)


Loading…
Cancel
Save