Browse Source

Merge pull request #81 from prolicht-dev/simpler_bindings

Simpler bindings / Declare process | Part 1
pull/95/head
Lane Wagner 3 years ago
committed by GitHub
parent
commit
a842287596
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 507 additions and 223 deletions
  1. +32
    -8
      README.md
  2. +9
    -58
      consume.go
  3. +18
    -151
      consume_options.go
  4. +373
    -0
      declare.go
  5. +0
    -6
      examples/consumer/main.go
  6. +1
    -0
      examples/consumer_with_declare/.gitignore
  7. +74
    -0
      examples/consumer_with_declare/main.go

+ 32
- 8
README.md View File

@ -10,7 +10,7 @@ Supported by [Boot.dev](https://boot.dev)
[Streadway's AMQP](https://github.com/rabbitmq/amqp091-go) library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided.
### Goal
### Goal
The goal with `go-rabbitmq` is to provide *most* (but not all) of the nitty-gritty functionality of Streadway's AMQP, but to make it easier to work with via a higher-level API. `go-rabbitmq` is also built specifically for Rabbit, not for the AMQP protocol. In particular we want:
@ -48,7 +48,6 @@ err = consumer.StartConsuming(
return rabbitmq.Ack
},
"my_queue",
[]string{"routing_key1", "routing_key2"}
)
if err != nil {
log.Fatal(err)
@ -74,13 +73,7 @@ err = consumer.StartConsuming(
return rabbitmq.Ack
},
"my_queue",
[]string{"routing_key", "routing_key_2"},
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum,
rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
rabbitmq.WithConsumeOptionsBindingExchangeDurable,
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {
@ -137,6 +130,37 @@ go func() {
}()
```
## 🚀 Quick Start Queue, Exchange and Binding Declaration
### Consumer
```go
consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost", rabbitmq.Config{})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumeDeclareOptions(
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareQueueQuorum,
rabbitmq.WithDeclareExchangeName("events"),
rabbitmq.WithDeclareExchangeKind("topic"),
rabbitmq.WithDeclareExchangeDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}),
),
)
if err != nil {
log.Fatal(err)
}
```
## Other usage examples
See the [examples](examples) directory for more ideas.


+ 9
- 58
consume.go View File

@ -85,16 +85,15 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) {
}
// StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency".
// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s).
// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s).
// The provided handler is called once for each message. If the provided queue doesn't exist, it
// will be created on the cluster
func (consumer Consumer) StartConsuming(
handler Handler,
queue string,
routingKeys []string,
optionFuncs ...func(*ConsumeOptions),
) error {
defaultOptions := getDefaultConsumeOptions()
defaultOptions := getDefaultConsumeOptions(queue)
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
@ -102,8 +101,6 @@ func (consumer Consumer) StartConsuming(
err := consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
@ -115,8 +112,6 @@ func (consumer Consumer) StartConsuming(
consumer.logger.Infof("successful recovery from: %v", err)
err = consumer.startGoroutines(
handler,
queue,
routingKeys,
*options,
)
if err != nil {
@ -139,61 +134,17 @@ func (consumer Consumer) Close() error {
// that will consume from the queue
func (consumer Consumer) startGoroutines(
handler Handler,
queue string,
routingKeys []string,
consumeOptions ConsumeOptions,
) error {
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
if consumeOptions.QueueDeclare {
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDurable,
consumeOptions.QueueAutoDelete,
consumeOptions.QueueExclusive,
consumeOptions.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueArgs),
)
if err != nil {
return err
}
err := handleDeclare(consumer.chManager, consumeOptions.DeclareOptions)
if err != nil {
return fmt.Errorf("declare failed: %w", err)
}
if consumeOptions.BindingExchange != nil {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
}
if exchange.Declare {
err := consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.ExchangeArgs),
)
if err != nil {
return err
}
}
for _, routingKey := range routingKeys {
err := consumer.chManager.channel.QueueBind(
queue,
routingKey,
exchange.Name,
consumeOptions.BindingNoWait,
tableToAMQPTable(consumeOptions.BindingArgs),
)
if err != nil {
return err
}
}
}
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()
err := consumer.chManager.channel.Qos(
err = consumer.chManager.channel.Qos(
consumeOptions.QOSPrefetch,
0,
consumeOptions.QOSGlobal,
@ -203,7 +154,7 @@ func (consumer Consumer) startGoroutines(
}
msgs, err := consumer.chManager.channel.Consume(
queue,
consumeOptions.QueueName,
consumeOptions.ConsumerName,
consumeOptions.ConsumerAutoAck,
consumeOptions.ConsumerExclusive,


+ 18
- 151
consume_options.go View File

@ -1,17 +1,9 @@
package rabbitmq
// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
// getDefaultConsumeOptions describes the options that will be used when a value isn't provided
func getDefaultConsumeOptions(queue string) ConsumeOptions {
return ConsumeOptions{
QueueDurable: false,
QueueAutoDelete: false,
QueueExclusive: false,
QueueNoWait: false,
QueueDeclare: true,
QueueArgs: nil,
BindingExchange: nil,
BindingNoWait: false,
BindingArgs: nil,
QueueName: queue,
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
@ -26,15 +18,8 @@ func getDefaultConsumeOptions() ConsumeOptions {
// ConsumeOptions are used to describe how a new consumer will be created.
type ConsumeOptions struct {
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueDeclare bool
QueueArgs Table
BindingExchange *BindingExchangeOptions
BindingNoWait bool
BindingArgs Table
DeclareOptions
QueueName string
Concurrency int
QOSPrefetch int
QOSGlobal bool
@ -46,135 +31,24 @@ type ConsumeOptions struct {
ConsumerArgs Table
}
// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values.
func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions {
if options.BindingExchange == nil {
options.BindingExchange = &BindingExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
ExchangeArgs: nil,
Declare: true,
}
}
return options.BindingExchange
}
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
Name string
Kind string
Durable bool
AutoDelete bool
Internal bool
NoWait bool
ExchangeArgs Table
Declare bool
}
// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithConsumeOptionsQueueDurable(options *ConsumeOptions) {
options.QueueDurable = true
}
// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more conusmers on it
func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) {
options.QueueAutoDelete = true
}
// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) {
options.QueueExclusive = true
}
// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) {
options.QueueNoWait = true
}
// WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means
// the queue will be assumed to be declared on the server, and won't be
// declared at all.
func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) {
options.QueueDeclare = false
}
// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability
func WithConsumeOptionsQuorum(options *ConsumeOptions) {
if options.QueueArgs == nil {
options.QueueArgs = Table{}
}
options.QueueArgs["x-queue-type"] = "quorum"
}
// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to
func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) {
// WithConsumeDeclareOptions allows to set declare options that can be used to set up queue, exchange or bindings
// before the consumer process starts.
func WithConsumeDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type
func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag
func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Durable = true
}
// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag
func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag
func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Internal = true
}
// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag
func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).NoWait = true
}
for _, declareOption := range declareOptionsFuncs {
// If a queue was set to declare, ensure that the queue name is set.
if options.DeclareOptions.Queue != nil {
if options.DeclareOptions.Queue.Name == "" {
options.DeclareOptions.Queue.Name = options.QueueName
}
}
declareOption(&options.DeclareOptions)
}
// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange
func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args
}
}
// WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) {
getBindingExchangeOptionsOrSetDefault(options).Declare = false
}
// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) {
options.BindingNoWait = true
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
@ -230,10 +104,3 @@ func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.ConsumerNoWait = true
}
// WithConsumeOptionsQueueArgs returns a function that sets the queue arguments
func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QueueArgs = args
}
}

+ 373
- 0
declare.go View File

@ -0,0 +1,373 @@
package rabbitmq
import "fmt"
// DeclareOptions are used to describe how a new queues, exchanges the routing setup should look like.
type DeclareOptions struct {
Queue *QueueOptions
Exchange *ExchangeOptions
Bindings []Binding
}
// QueueOptions are used to configure a queue.
// If the Passive flag is set the client will only check if the queue exists on the server
// and that the settings match, no creation attempt will be made.
type QueueOptions struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
Passive bool // if false, a missing queue will be created on the server
Args Table
}
// ExchangeOptions are used to configure an exchange.
// If the Passive flag is set the client will only check if the exchange exists on the server
// and that the settings match, no creation attempt will be made.
type ExchangeOptions struct {
Name string
Kind string // possible values: empty string for default exchange or direct, topic, fanout
Durable bool
AutoDelete bool
Internal bool
NoWait bool
Passive bool // if false, a missing exchange will be created on the server
Args Table
}
// BindingOption are used to configure a queue bindings.
type BindingOption struct {
NoWait bool
Args Table
}
// Binding describes a queue binding to a specific exchange.
type Binding struct {
BindingOption
QueueName string
ExchangeName string
RoutingKey string
}
// SetBindings trys to generate bindings for the given routing keys and the queue and exchange options.
// If either Queue or Exchange properties are empty or no queue name is specified, no bindings will be set.
func (o *DeclareOptions) SetBindings(routingKeys []string, opt BindingOption) {
if o.Queue == nil || o.Exchange == nil {
return // nothing to set...
}
if o.Queue.Name == "" {
return // nothing to set...
}
for _, routingKey := range routingKeys {
o.Bindings = append(o.Bindings, Binding{
QueueName: o.Queue.Name,
ExchangeName: o.Exchange.Name,
RoutingKey: routingKey,
BindingOption: opt,
})
}
}
// handleDeclare handles the queue, exchange and binding declare process on the server.
// If there are no options set, no actions will be executed.
func handleDeclare(chManager *channelManager, options DeclareOptions) error {
chManager.channelMux.RLock()
defer chManager.channelMux.RUnlock()
// bind queue
if options.Queue != nil {
queue := options.Queue
if queue.Name == "" {
return fmt.Errorf("missing queue name")
}
if queue.Passive {
_, err := chManager.channel.QueueDeclarePassive(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
tableToAMQPTable(queue.Args),
)
if err != nil {
return err
}
} else {
_, err := chManager.channel.QueueDeclare(
queue.Name,
queue.Durable,
queue.AutoDelete,
queue.Exclusive,
queue.NoWait,
tableToAMQPTable(queue.Args),
)
if err != nil {
return err
}
}
}
// bind exchange
if options.Exchange != nil {
exchange := options.Exchange
if exchange.Name == "" {
return fmt.Errorf("missing exchange name")
}
if exchange.Passive {
err := chManager.channel.ExchangeDeclarePassive(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.Args),
)
if err != nil {
return err
}
} else {
err := chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
exchange.AutoDelete,
exchange.Internal,
exchange.NoWait,
tableToAMQPTable(exchange.Args),
)
if err != nil {
return err
}
}
}
// handle binding of queues to exchange
for _, binding := range options.Bindings {
err := chManager.channel.QueueBind(
binding.QueueName, // name of the queue
binding.RoutingKey, // bindingKey
binding.ExchangeName, // sourceExchange
binding.NoWait, // noWait
tableToAMQPTable(binding.Args), // arguments
)
if err != nil {
return err
}
}
return nil
}
// getExchangeOptionsOrSetDefault returns pointer to current ExchangeOptions options.
// If no exchange options are set yet, new options with default values will be defined.
func getExchangeOptionsOrSetDefault(options *DeclareOptions) *ExchangeOptions {
if options.Exchange == nil {
options.Exchange = &ExchangeOptions{
Name: "",
Kind: "direct",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
Args: nil,
Passive: false,
}
}
return options.Exchange
}
// getQueueOptionsOrSetDefault returns pointer to current QueueOptions options.
// If no queue options are set yet, new options with default values will be defined.
func getQueueOptionsOrSetDefault(options *DeclareOptions) *QueueOptions {
if options.Queue == nil {
options.Queue = &QueueOptions{
Name: "",
Durable: false,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Passive: false,
Args: nil,
}
}
return options.Queue
}
// region general-options
// WithDeclareQueue sets the queue that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if the queue already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, a missing queue will be created on the server.
func WithDeclareQueue(settings *QueueOptions) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Queue = settings
}
}
// WithDeclareExchange sets the exchange that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if the exchange already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, a missing exchange will be created on the server.
func WithDeclareExchange(settings *ExchangeOptions) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Exchange = settings
}
}
// WithDeclareBindings sets the bindings that should be declared prior to other RabbitMQ actions are being executed.
// Only the settings will be validated if one of the bindings already exists on the server.
// Matching settings will result in no action, different settings will result in an error.
// If the 'Passive' property is set to false, missing bindings will be created on the server.
func WithDeclareBindings(bindings []Binding) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.Bindings = bindings
}
}
// WithDeclareBindingsForRoutingKeys sets the bindings that should be declared prior to other RabbitMQ
// actions are being executed.
// This function must be called after the queue and exchange declaration settings have been set,
// otherwise this function has no effect.
func WithDeclareBindingsForRoutingKeys(routingKeys []string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
options.SetBindings(routingKeys, BindingOption{})
}
}
// endregion general-options
// region single-options
// WithDeclareQueueName returns a function that sets the queue name.
func WithDeclareQueueName(name string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Name = name
}
}
// WithDeclareQueueDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges.
func WithDeclareQueueDurable(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Durable = true
}
// WithDeclareQueueAutoDelete sets the queue to auto delete, which means it will
// be deleted when there are no more consumers on it.
func WithDeclareQueueAutoDelete(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).AutoDelete = true
}
// WithDeclareQueueExclusive sets the queue to exclusive, which means
// it's are only accessible by the connection that declares it and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting to declare, bind, consume, purge or
// delete a queue with the same name.
func WithDeclareQueueExclusive(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Exclusive = true
}
// WithDeclareQueueNoWait sets the queue to nowait, which means
// the queue will assume to be declared on the server. A channel
// exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
func WithDeclareQueueNoWait(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).NoWait = true
}
// WithDeclareQueueNoDeclare sets the queue to no declare, which means
// the queue will be assumed to be declared on the server, and thus only will be validated.
func WithDeclareQueueNoDeclare(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Passive = true
}
// WithDeclareQueueArgs returns a function that sets the queue arguments.
func WithDeclareQueueArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getQueueOptionsOrSetDefault(options).Args = args
}
}
// WithDeclareQueueQuorum sets the queue a quorum type, which means multiple nodes
// in the cluster will have the messages distributed amongst them for higher reliability.
func WithDeclareQueueQuorum(options *DeclareOptions) {
queue := getQueueOptionsOrSetDefault(options)
if queue.Args == nil {
queue.Args = Table{}
}
queue.Args["x-queue-type"] = "quorum"
}
// WithDeclareExchangeName returns a function that sets the exchange name.
func WithDeclareExchangeName(name string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Name = name
}
}
// WithDeclareExchangeKind returns a function that sets the binding exchange kind/type.
func WithDeclareExchangeKind(kind string) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Kind = kind
}
}
// WithDeclareExchangeDurable returns a function that sets the binding exchange durable flag.
func WithDeclareExchangeDurable(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Durable = true
}
// WithDeclareExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag.
func WithDeclareExchangeAutoDelete(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).AutoDelete = true
}
// WithDeclareExchangeInternal returns a function that sets the binding exchange internal flag.
func WithDeclareExchangeInternal(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Internal = true
}
// WithDeclareExchangeNoWait returns a function that sets the binding exchange noWait flag.
func WithDeclareExchangeNoWait(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).NoWait = true
}
// WithDeclareExchangeArgs returns a function that sets the binding exchange arguments
// that are specific to the server's implementation of the exchange.
func WithDeclareExchangeArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Args = args
}
}
// WithDeclareExchangeNoDeclare returns a function that skips the declaration of the
// binding exchange. Use this setting if the exchange already exists and you don't need to declare
// it on consumer start.
func WithDeclareExchangeNoDeclare(options *DeclareOptions) {
getExchangeOptionsOrSetDefault(options).Passive = true
}
// WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound
// the channel will not be closed with an error.
// This function must be called after bindings have been defined, otherwise it has no effect.
func WithDeclareBindingNoWait(options *DeclareOptions) {
for i := range options.Bindings {
options.Bindings[i].NoWait = true
}
}
// WithDeclareBindingArgs sets the arguments of the bindings to args.
// This function must be called after bindings have been defined, otherwise it has no effect.
func WithDeclareBindingArgs(args Table) func(*DeclareOptions) {
return func(options *DeclareOptions) {
for i := range options.Bindings {
options.Bindings[i].Args = args
}
}
}
// endregion single-options

+ 0
- 6
examples/consumer/main.go View File

@ -34,13 +34,7 @@ func main() {
return rabbitmq.Ack
},
"my_queue",
[]string{"routing_key", "routing_key_2"},
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum,
rabbitmq.WithConsumeOptionsBindingExchangeName("events"),
rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"),
rabbitmq.WithConsumeOptionsBindingExchangeDurable,
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {


+ 1
- 0
examples/consumer_with_declare/.gitignore View File

@ -0,0 +1 @@
consumer_with_declare

+ 74
- 0
examples/consumer_with_declare/main.go View File

@ -0,0 +1,74 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
var consumerName = "example_with_declare"
func main() {
consumer, err := rabbitmq.NewConsumer(
"amqp://guest:guest@localhost", rabbitmq.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer func() {
err := consumer.Close()
if err != nil {
log.Fatal(err)
}
}()
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumeDeclareOptions(
rabbitmq.WithDeclareQueueDurable,
rabbitmq.WithDeclareQueueQuorum,
rabbitmq.WithDeclareExchangeName("events"),
rabbitmq.WithDeclareExchangeKind("topic"),
rabbitmq.WithDeclareExchangeDurable,
rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}), // implicit bindings
rabbitmq.WithDeclareBindings([]rabbitmq.Binding{ // custom bindings
{
QueueName: "my_queue",
ExchangeName: "events",
RoutingKey: "a_custom_key",
},
}),
),
rabbitmq.WithConsumeOptionsConsumerName(consumerName),
)
if err != nil {
log.Fatal(err)
}
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
<-done
fmt.Println("stopping consumer")
}

Loading…
Cancel
Save