From 62371e8dba5801bbcff64383e964b8979e973e91 Mon Sep 17 00:00:00 2001 From: Christoph Haas Date: Tue, 5 Jul 2022 23:41:41 +0200 Subject: [PATCH 1/2] queue, exchange and binding declaration is now simplified and much clearer to use --- channel.go | 4 +- consume.go | 67 +--- consume_options.go | 169 ++-------- declare.go | 373 ++++++++++++++++++++++ examples/consumer/main.go | 6 - examples/consumer_with_declare/.gitignore | 1 + examples/consumer_with_declare/main.go | 74 +++++ 7 files changed, 477 insertions(+), 217 deletions(-) create mode 100644 declare.go create mode 100644 examples/consumer_with_declare/.gitignore create mode 100644 examples/consumer_with_declare/main.go diff --git a/channel.go b/channel.go index 62122ea..2b8f397 100644 --- a/channel.go +++ b/channel.go @@ -103,8 +103,8 @@ func (chManager *channelManager) reconnect() error { return err } - chManager.channel.Close() - chManager.connection.Close() + _ = chManager.channel.Close() + _ = chManager.connection.Close() chManager.connection = newConn chManager.channel = newChannel diff --git a/consume.go b/consume.go index ede1e96..771edf4 100644 --- a/consume.go +++ b/consume.go @@ -85,16 +85,15 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { } // StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". -// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). +// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s). // The provided handler is called once for each message. If the provided queue doesn't exist, it // will be created on the cluster func (consumer Consumer) StartConsuming( handler Handler, queue string, - routingKeys []string, optionFuncs ...func(*ConsumeOptions), ) error { - defaultOptions := getDefaultConsumeOptions() + defaultOptions := getDefaultConsumeOptions(queue) options := &defaultOptions for _, optionFunc := range optionFuncs { optionFunc(options) @@ -102,8 +101,6 @@ func (consumer Consumer) StartConsuming( err := consumer.startGoroutines( handler, - queue, - routingKeys, *options, ) if err != nil { @@ -115,8 +112,6 @@ func (consumer Consumer) StartConsuming( consumer.logger.Infof("successful recovery from: %v", err) err = consumer.startGoroutines( handler, - queue, - routingKeys, *options, ) if err != nil { @@ -139,61 +134,17 @@ func (consumer Consumer) Close() error { // that will consume from the queue func (consumer Consumer) startGoroutines( handler Handler, - queue string, - routingKeys []string, consumeOptions ConsumeOptions, ) error { - consumer.chManager.channelMux.RLock() - defer consumer.chManager.channelMux.RUnlock() - - if consumeOptions.QueueDeclare { - _, err := consumer.chManager.channel.QueueDeclare( - queue, - consumeOptions.QueueDurable, - consumeOptions.QueueAutoDelete, - consumeOptions.QueueExclusive, - consumeOptions.QueueNoWait, - tableToAMQPTable(consumeOptions.QueueArgs), - ) - if err != nil { - return err - } + err := handleDeclare(consumer.chManager, consumeOptions.DeclareOptions) + if err != nil { + return fmt.Errorf("declare failed: %w", err) } - if consumeOptions.BindingExchange != nil { - exchange := consumeOptions.BindingExchange - if exchange.Name == "" { - return fmt.Errorf("binding to exchange but name not specified") - } - if exchange.Declare { - err := consumer.chManager.channel.ExchangeDeclare( - exchange.Name, - exchange.Kind, - exchange.Durable, - exchange.AutoDelete, - exchange.Internal, - exchange.NoWait, - tableToAMQPTable(exchange.ExchangeArgs), - ) - if err != nil { - return err - } - } - for _, routingKey := range routingKeys { - err := consumer.chManager.channel.QueueBind( - queue, - routingKey, - exchange.Name, - consumeOptions.BindingNoWait, - tableToAMQPTable(consumeOptions.BindingArgs), - ) - if err != nil { - return err - } - } - } + consumer.chManager.channelMux.RLock() + defer consumer.chManager.channelMux.RUnlock() - err := consumer.chManager.channel.Qos( + err = consumer.chManager.channel.Qos( consumeOptions.QOSPrefetch, 0, consumeOptions.QOSGlobal, @@ -203,7 +154,7 @@ func (consumer Consumer) startGoroutines( } msgs, err := consumer.chManager.channel.Consume( - queue, + consumeOptions.QueueName, consumeOptions.ConsumerName, consumeOptions.ConsumerAutoAck, consumeOptions.ConsumerExclusive, diff --git a/consume_options.go b/consume_options.go index 0a66857..fc9e36a 100644 --- a/consume_options.go +++ b/consume_options.go @@ -1,17 +1,9 @@ package rabbitmq -// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided -func getDefaultConsumeOptions() ConsumeOptions { +// getDefaultConsumeOptions describes the options that will be used when a value isn't provided +func getDefaultConsumeOptions(queue string) ConsumeOptions { return ConsumeOptions{ - QueueDurable: false, - QueueAutoDelete: false, - QueueExclusive: false, - QueueNoWait: false, - QueueDeclare: true, - QueueArgs: nil, - BindingExchange: nil, - BindingNoWait: false, - BindingArgs: nil, + QueueName: queue, Concurrency: 1, QOSPrefetch: 0, QOSGlobal: false, @@ -26,15 +18,8 @@ func getDefaultConsumeOptions() ConsumeOptions { // ConsumeOptions are used to describe how a new consumer will be created. type ConsumeOptions struct { - QueueDurable bool - QueueAutoDelete bool - QueueExclusive bool - QueueNoWait bool - QueueDeclare bool - QueueArgs Table - BindingExchange *BindingExchangeOptions - BindingNoWait bool - BindingArgs Table + DeclareOptions + QueueName string Concurrency int QOSPrefetch int QOSGlobal bool @@ -46,135 +31,24 @@ type ConsumeOptions struct { ConsumerArgs Table } -// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. -func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { - if options.BindingExchange == nil { - options.BindingExchange = &BindingExchangeOptions{ - Name: "", - Kind: "direct", - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - ExchangeArgs: nil, - Declare: true, - } - } - return options.BindingExchange -} - -// BindingExchangeOptions are used when binding to an exchange. -// it will verify the exchange is created before binding to it. -type BindingExchangeOptions struct { - Name string - Kind string - Durable bool - AutoDelete bool - Internal bool - NoWait bool - ExchangeArgs Table - Declare bool -} - -// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't -// be destroyed when the server restarts. It must only be bound to durable exchanges -func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { - options.QueueDurable = true -} - -// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will -// be deleted when there are no more conusmers on it -func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { - options.QueueAutoDelete = true -} - -// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means -// it's are only accessible by the connection that declares it and -// will be deleted when the connection closes. Channels on other connections -// will receive an error when attempting to declare, bind, consume, purge or -// delete a queue with the same name. -func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { - options.QueueExclusive = true -} - -// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means -// the queue will assume to be declared on the server. A -// channel exception will arrive if the conditions are met for existing queues -// or attempting to modify an existing queue from a different connection. -func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { - options.QueueNoWait = true -} - -// WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means -// the queue will be assumed to be declared on the server, and won't be -// declared at all. -func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) { - options.QueueDeclare = false -} - -// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes -// in the cluster will have the messages distributed amongst them for higher reliability -func WithConsumeOptionsQuorum(options *ConsumeOptions) { - if options.QueueArgs == nil { - options.QueueArgs = Table{} - } - options.QueueArgs["x-queue-type"] = "quorum" -} - -// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to -func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { +// WithConsumeDeclareOptions allows to set declare options that can be used to set up queue, exchange or bindings +// before the consumer process starts. +func WithConsumeDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Name = name - } -} - -// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type -func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Kind = kind - } -} - -// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag -func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Durable = true -} - -// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag -func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true -} - -// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag -func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Internal = true -} - -// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag -func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).NoWait = true -} + for _, declareOption := range declareOptionsFuncs { + // If a queue was set to declare, ensure that the queue name is set. + if options.DeclareOptions.Queue != nil { + if options.DeclareOptions.Queue.Name == "" { + options.DeclareOptions.Queue.Name = options.QueueName + } + } + + declareOption(&options.DeclareOptions) + } -// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange -func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args } } -// WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the -// binding exchange. Use this setting if the exchange already exists and you don't need to declare -// it on consumer start. -func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Declare = false -} - -// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound -// the channel will not be closed with an error. -func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { - options.BindingNoWait = true -} - // WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that // many goroutines will be spawned to run the provided handler on messages func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { @@ -230,10 +104,3 @@ func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { options.ConsumerNoWait = true } - -// WithConsumeOptionsQueueArgs returns a function that sets the queue arguments -func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.QueueArgs = args - } -} diff --git a/declare.go b/declare.go new file mode 100644 index 0000000..35f2621 --- /dev/null +++ b/declare.go @@ -0,0 +1,373 @@ +package rabbitmq + +import "fmt" + +// DeclareOptions are used to describe how a new queues, exchanges the routing setup should look like. +type DeclareOptions struct { + Queue *QueueOptions + Exchange *ExchangeOptions + Bindings []Binding +} + +// QueueOptions are used to configure a queue. +// If the Passive flag is set the client will only check if the queue exists on the server +// and that the settings match, no creation attempt will be made. +type QueueOptions struct { + Name string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Passive bool // if false, a missing queue will be created on the server + Args Table +} + +// ExchangeOptions are used to configure an exchange. +// If the Passive flag is set the client will only check if the exchange exists on the server +// and that the settings match, no creation attempt will be made. +type ExchangeOptions struct { + Name string + Kind string // possible values: empty string for default exchange or direct, topic, fanout + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Passive bool // if false, a missing exchange will be created on the server + Args Table +} + +// BindingOption are used to configure a queue bindings. +type BindingOption struct { + NoWait bool + Args Table +} + +// Binding describes a queue binding to a specific exchange. +type Binding struct { + BindingOption + QueueName string + ExchangeName string + RoutingKey string +} + +// SetBindings trys to generate bindings for the given routing keys and the queue and exchange options. +// If either Queue or Exchange properties are empty or no queue name is specified, no bindings will be set. +func (o *DeclareOptions) SetBindings(routingKeys []string, opt BindingOption) { + if o.Queue == nil || o.Exchange == nil { + return // nothing to set... + } + + if o.Queue.Name == "" { + return // nothing to set... + } + + for _, routingKey := range routingKeys { + o.Bindings = append(o.Bindings, Binding{ + QueueName: o.Queue.Name, + ExchangeName: o.Exchange.Name, + RoutingKey: routingKey, + BindingOption: opt, + }) + } +} + +// handleDeclare handles the queue, exchange and binding declare process on the server. +// If there are no options set, no actions will be executed. +func handleDeclare(chManager *channelManager, options DeclareOptions) error { + chManager.channelMux.RLock() + defer chManager.channelMux.RUnlock() + + // bind queue + if options.Queue != nil { + queue := options.Queue + if queue.Name == "" { + return fmt.Errorf("missing queue name") + } + if queue.Passive { + _, err := chManager.channel.QueueDeclarePassive( + queue.Name, + queue.Durable, + queue.AutoDelete, + queue.Exclusive, + queue.NoWait, + tableToAMQPTable(queue.Args), + ) + if err != nil { + return err + } + } else { + _, err := chManager.channel.QueueDeclare( + queue.Name, + queue.Durable, + queue.AutoDelete, + queue.Exclusive, + queue.NoWait, + tableToAMQPTable(queue.Args), + ) + if err != nil { + return err + } + } + } + + // bind exchange + if options.Exchange != nil { + exchange := options.Exchange + if exchange.Name == "" { + return fmt.Errorf("missing exchange name") + } + if exchange.Passive { + err := chManager.channel.ExchangeDeclarePassive( + exchange.Name, + exchange.Kind, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.Args), + ) + if err != nil { + return err + } + } else { + err := chManager.channel.ExchangeDeclare( + exchange.Name, + exchange.Kind, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.Args), + ) + if err != nil { + return err + } + } + } + + // handle binding of queues to exchange + for _, binding := range options.Bindings { + err := chManager.channel.QueueBind( + binding.QueueName, // name of the queue + binding.RoutingKey, // bindingKey + binding.ExchangeName, // sourceExchange + binding.NoWait, // noWait + tableToAMQPTable(binding.Args), // arguments + ) + if err != nil { + return err + } + } + + return nil +} + +// getExchangeOptionsOrSetDefault returns pointer to current ExchangeOptions options. +// If no exchange options are set yet, new options with default values will be defined. +func getExchangeOptionsOrSetDefault(options *DeclareOptions) *ExchangeOptions { + if options.Exchange == nil { + options.Exchange = &ExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Args: nil, + Passive: false, + } + } + return options.Exchange +} + +// getQueueOptionsOrSetDefault returns pointer to current QueueOptions options. +// If no queue options are set yet, new options with default values will be defined. +func getQueueOptionsOrSetDefault(options *DeclareOptions) *QueueOptions { + if options.Queue == nil { + options.Queue = &QueueOptions{ + Name: "", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Passive: false, + Args: nil, + } + } + return options.Queue +} + +// region general-options + +// WithDeclareQueue sets the queue that should be declared prior to other RabbitMQ actions are being executed. +// Only the settings will be validated if the queue already exists on the server. +// Matching settings will result in no action, different settings will result in an error. +// If the 'Passive' property is set to false, a missing queue will be created on the server. +func WithDeclareQueue(settings *QueueOptions) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.Queue = settings + } +} + +// WithDeclareExchange sets the exchange that should be declared prior to other RabbitMQ actions are being executed. +// Only the settings will be validated if the exchange already exists on the server. +// Matching settings will result in no action, different settings will result in an error. +// If the 'Passive' property is set to false, a missing exchange will be created on the server. +func WithDeclareExchange(settings *ExchangeOptions) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.Exchange = settings + } +} + +// WithDeclareBindings sets the bindings that should be declared prior to other RabbitMQ actions are being executed. +// Only the settings will be validated if one of the bindings already exists on the server. +// Matching settings will result in no action, different settings will result in an error. +// If the 'Passive' property is set to false, missing bindings will be created on the server. +func WithDeclareBindings(bindings []Binding) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.Bindings = bindings + } +} + +// WithDeclareBindingsForRoutingKeys sets the bindings that should be declared prior to other RabbitMQ +// actions are being executed. +// This function must be called after the queue and exchange declaration settings have been set, +// otherwise this function has no effect. +func WithDeclareBindingsForRoutingKeys(routingKeys []string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.SetBindings(routingKeys, BindingOption{}) + } +} + +// endregion general-options + +// region single-options + +// WithDeclareQueueName returns a function that sets the queue name. +func WithDeclareQueueName(name string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Name = name + } +} + +// WithDeclareQueueDurable sets the queue to durable, which means it won't +// be destroyed when the server restarts. It must only be bound to durable exchanges. +func WithDeclareQueueDurable(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Durable = true +} + +// WithDeclareQueueAutoDelete sets the queue to auto delete, which means it will +// be deleted when there are no more consumers on it. +func WithDeclareQueueAutoDelete(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).AutoDelete = true +} + +// WithDeclareQueueExclusive sets the queue to exclusive, which means +// it's are only accessible by the connection that declares it and +// will be deleted when the connection closes. Channels on other connections +// will receive an error when attempting to declare, bind, consume, purge or +// delete a queue with the same name. +func WithDeclareQueueExclusive(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Exclusive = true +} + +// WithDeclareQueueNoWait sets the queue to nowait, which means +// the queue will assume to be declared on the server. A channel +// exception will arrive if the conditions are met for existing queues +// or attempting to modify an existing queue from a different connection. +func WithDeclareQueueNoWait(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).NoWait = true +} + +// WithDeclareQueueNoDeclare sets the queue to no declare, which means +// the queue will be assumed to be declared on the server, and thus only will be validated. +func WithDeclareQueueNoDeclare(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Passive = true +} + +// WithDeclareQueueArgs returns a function that sets the queue arguments. +func WithDeclareQueueArgs(args Table) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Args = args + } +} + +// WithDeclareQueueQuorum sets the queue a quorum type, which means multiple nodes +// in the cluster will have the messages distributed amongst them for higher reliability. +func WithDeclareQueueQuorum(options *DeclareOptions) { + queue := getQueueOptionsOrSetDefault(options) + if queue.Args == nil { + queue.Args = Table{} + } + queue.Args["x-queue-type"] = "quorum" +} + +// WithDeclareExchangeName returns a function that sets the exchange name. +func WithDeclareExchangeName(name string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithDeclareExchangeKind returns a function that sets the binding exchange kind/type. +func WithDeclareExchangeKind(kind string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithDeclareExchangeDurable returns a function that sets the binding exchange durable flag. +func WithDeclareExchangeDurable(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Durable = true +} + +// WithDeclareExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag. +func WithDeclareExchangeAutoDelete(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).AutoDelete = true +} + +// WithDeclareExchangeInternal returns a function that sets the binding exchange internal flag. +func WithDeclareExchangeInternal(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Internal = true +} + +// WithDeclareExchangeNoWait returns a function that sets the binding exchange noWait flag. +func WithDeclareExchangeNoWait(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).NoWait = true +} + +// WithDeclareExchangeArgs returns a function that sets the binding exchange arguments +// that are specific to the server's implementation of the exchange. +func WithDeclareExchangeArgs(args Table) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Args = args + } +} + +// WithDeclareExchangeNoDeclare returns a function that skips the declaration of the +// binding exchange. Use this setting if the exchange already exists and you don't need to declare +// it on consumer start. +func WithDeclareExchangeNoDeclare(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Passive = true +} + +// WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound +// the channel will not be closed with an error. +// This function must be called after bindings have been defined, otherwise it has no effect. +func WithDeclareBindingNoWait(options *ConsumeOptions) { + for i := range options.Bindings { + options.Bindings[i].NoWait = true + } +} + +// WithDeclareBindingArgs sets the arguments of the bindings to args. +// This function must be called after bindings have been defined, otherwise it has no effect. +func WithDeclareBindingArgs(args Table) func(*DeclareOptions) { + return func(options *DeclareOptions) { + for i := range options.Bindings { + options.Bindings[i].Args = args + } + } +} + +// endregion single-options diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 21d3dad..4b416f8 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -34,13 +34,7 @@ func main() { return rabbitmq.Ack }, "my_queue", - []string{"routing_key", "routing_key_2"}, rabbitmq.WithConsumeOptionsConcurrency(10), - rabbitmq.WithConsumeOptionsQueueDurable, - rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchangeName("events"), - rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), - rabbitmq.WithConsumeOptionsBindingExchangeDurable, rabbitmq.WithConsumeOptionsConsumerName(consumerName), ) if err != nil { diff --git a/examples/consumer_with_declare/.gitignore b/examples/consumer_with_declare/.gitignore new file mode 100644 index 0000000..e8842b8 --- /dev/null +++ b/examples/consumer_with_declare/.gitignore @@ -0,0 +1 @@ +consumer_with_declare diff --git a/examples/consumer_with_declare/main.go b/examples/consumer_with_declare/main.go new file mode 100644 index 0000000..02540db --- /dev/null +++ b/examples/consumer_with_declare/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +var consumerName = "example_with_declare" + +func main() { + consumer, err := rabbitmq.NewConsumer( + "amqp://guest:guest@localhost", rabbitmq.Config{}, + rabbitmq.WithConsumerOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer func() { + err := consumer.Close() + if err != nil { + log.Fatal(err) + } + }() + + err = consumer.StartConsuming( + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumeDeclareOptions( + rabbitmq.WithDeclareQueueDurable, + rabbitmq.WithDeclareQueueQuorum, + rabbitmq.WithDeclareExchangeName("events"), + rabbitmq.WithDeclareExchangeKind("topic"), + rabbitmq.WithDeclareExchangeDurable, + rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}), // implicit bindings + rabbitmq.WithDeclareBindings([]rabbitmq.Binding{ // custom bindings + { + QueueName: "my_queue", + ExchangeName: "events", + RoutingKey: "a_custom_key", + }, + }), + ), + rabbitmq.WithConsumeOptionsConsumerName(consumerName), + ) + if err != nil { + log.Fatal(err) + } + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + <-done + fmt.Println("stopping consumer") +} From 70a7350572caececfb6b3d37fad84a5b3e4863da Mon Sep 17 00:00:00 2001 From: Christoph Haas Date: Tue, 5 Jul 2022 23:41:41 +0200 Subject: [PATCH 2/2] queue, exchange and binding declaration is now simplified and much clearer to use --- README.md | 40 ++++++++++++++++++++++++++++++++-------- declare.go | 2 +- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 738957b..be993de 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Supported by [Boot.dev](https://boot.dev) [Streadway's AMQP](https://github.com/rabbitmq/amqp091-go) library is currently the most robust and well-supported Go client I'm aware of. It's a fantastic option and I recommend starting there and seeing if it fulfills your needs. Their project has made an effort to stay within the scope of the AMQP protocol, as such, no reconnection logic and few ease-of-use abstractions are provided. -### Goal +### Goal The goal with `go-rabbitmq` is to still provide most all of the nitty-gritty functionality of AMQP, but to make it easier to work with via a higher-level API. Particularly: @@ -48,7 +48,6 @@ err = consumer.StartConsuming( return rabbitmq.Ack }, "my_queue", - []string{"routing_key1", "routing_key2"} ) if err != nil { log.Fatal(err) @@ -74,13 +73,7 @@ err = consumer.StartConsuming( return rabbitmq.Ack }, "my_queue", - []string{"routing_key", "routing_key_2"}, rabbitmq.WithConsumeOptionsConcurrency(10), - rabbitmq.WithConsumeOptionsQueueDurable, - rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchangeName("events"), - rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), - rabbitmq.WithConsumeOptionsBindingExchangeDurable, rabbitmq.WithConsumeOptionsConsumerName(consumerName), ) if err != nil { @@ -137,6 +130,37 @@ go func() { }() ``` +## 🚀 Quick Start Queue, Exchange and Binding Declaration + +### Consumer + +```go +consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost", rabbitmq.Config{}) +if err != nil { + log.Fatal(err) +} +defer consumer.Close() +err = consumer.StartConsuming( + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumeDeclareOptions( + rabbitmq.WithDeclareQueueDurable, + rabbitmq.WithDeclareQueueQuorum, + rabbitmq.WithDeclareExchangeName("events"), + rabbitmq.WithDeclareExchangeKind("topic"), + rabbitmq.WithDeclareExchangeDurable, + rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}), + ), + ) +if err != nil { + log.Fatal(err) +} +``` + ## Other usage examples See the [examples](examples) directory for more ideas. diff --git a/declare.go b/declare.go index 35f2621..84d7fe4 100644 --- a/declare.go +++ b/declare.go @@ -354,7 +354,7 @@ func WithDeclareExchangeNoDeclare(options *DeclareOptions) { // WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound // the channel will not be closed with an error. // This function must be called after bindings have been defined, otherwise it has no effect. -func WithDeclareBindingNoWait(options *ConsumeOptions) { +func WithDeclareBindingNoWait(options *DeclareOptions) { for i := range options.Bindings { options.Bindings[i].NoWait = true }