diff --git a/channel.go b/channel.go index 62122ea..2b8f397 100644 --- a/channel.go +++ b/channel.go @@ -103,8 +103,8 @@ func (chManager *channelManager) reconnect() error { return err } - chManager.channel.Close() - chManager.connection.Close() + _ = chManager.channel.Close() + _ = chManager.connection.Close() chManager.connection = newConn chManager.channel = newChannel diff --git a/consume.go b/consume.go index ede1e96..771edf4 100644 --- a/consume.go +++ b/consume.go @@ -85,16 +85,15 @@ func WithConsumerOptionsLogger(log Logger) func(options *ConsumerOptions) { } // StartConsuming starts n goroutines where n="ConsumeOptions.QosOptions.Concurrency". -// Each goroutine spawns a handler that consumes off of the qiven queue which binds to the routing key(s). +// Each goroutine spawns a handler that consumes off of the given queue which binds to the routing key(s). // The provided handler is called once for each message. If the provided queue doesn't exist, it // will be created on the cluster func (consumer Consumer) StartConsuming( handler Handler, queue string, - routingKeys []string, optionFuncs ...func(*ConsumeOptions), ) error { - defaultOptions := getDefaultConsumeOptions() + defaultOptions := getDefaultConsumeOptions(queue) options := &defaultOptions for _, optionFunc := range optionFuncs { optionFunc(options) @@ -102,8 +101,6 @@ func (consumer Consumer) StartConsuming( err := consumer.startGoroutines( handler, - queue, - routingKeys, *options, ) if err != nil { @@ -115,8 +112,6 @@ func (consumer Consumer) StartConsuming( consumer.logger.Infof("successful recovery from: %v", err) err = consumer.startGoroutines( handler, - queue, - routingKeys, *options, ) if err != nil { @@ -139,61 +134,17 @@ func (consumer Consumer) Close() error { // that will consume from the queue func (consumer Consumer) startGoroutines( handler Handler, - queue string, - routingKeys []string, consumeOptions ConsumeOptions, ) error { - consumer.chManager.channelMux.RLock() - defer consumer.chManager.channelMux.RUnlock() - - if consumeOptions.QueueDeclare { - _, err := consumer.chManager.channel.QueueDeclare( - queue, - consumeOptions.QueueDurable, - consumeOptions.QueueAutoDelete, - consumeOptions.QueueExclusive, - consumeOptions.QueueNoWait, - tableToAMQPTable(consumeOptions.QueueArgs), - ) - if err != nil { - return err - } + err := handleDeclare(consumer.chManager, consumeOptions.DeclareOptions) + if err != nil { + return fmt.Errorf("declare failed: %w", err) } - if consumeOptions.BindingExchange != nil { - exchange := consumeOptions.BindingExchange - if exchange.Name == "" { - return fmt.Errorf("binding to exchange but name not specified") - } - if exchange.Declare { - err := consumer.chManager.channel.ExchangeDeclare( - exchange.Name, - exchange.Kind, - exchange.Durable, - exchange.AutoDelete, - exchange.Internal, - exchange.NoWait, - tableToAMQPTable(exchange.ExchangeArgs), - ) - if err != nil { - return err - } - } - for _, routingKey := range routingKeys { - err := consumer.chManager.channel.QueueBind( - queue, - routingKey, - exchange.Name, - consumeOptions.BindingNoWait, - tableToAMQPTable(consumeOptions.BindingArgs), - ) - if err != nil { - return err - } - } - } + consumer.chManager.channelMux.RLock() + defer consumer.chManager.channelMux.RUnlock() - err := consumer.chManager.channel.Qos( + err = consumer.chManager.channel.Qos( consumeOptions.QOSPrefetch, 0, consumeOptions.QOSGlobal, @@ -203,7 +154,7 @@ func (consumer Consumer) startGoroutines( } msgs, err := consumer.chManager.channel.Consume( - queue, + consumeOptions.QueueName, consumeOptions.ConsumerName, consumeOptions.ConsumerAutoAck, consumeOptions.ConsumerExclusive, diff --git a/consume_options.go b/consume_options.go index 0a66857..fc9e36a 100644 --- a/consume_options.go +++ b/consume_options.go @@ -1,17 +1,9 @@ package rabbitmq -// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided -func getDefaultConsumeOptions() ConsumeOptions { +// getDefaultConsumeOptions describes the options that will be used when a value isn't provided +func getDefaultConsumeOptions(queue string) ConsumeOptions { return ConsumeOptions{ - QueueDurable: false, - QueueAutoDelete: false, - QueueExclusive: false, - QueueNoWait: false, - QueueDeclare: true, - QueueArgs: nil, - BindingExchange: nil, - BindingNoWait: false, - BindingArgs: nil, + QueueName: queue, Concurrency: 1, QOSPrefetch: 0, QOSGlobal: false, @@ -26,15 +18,8 @@ func getDefaultConsumeOptions() ConsumeOptions { // ConsumeOptions are used to describe how a new consumer will be created. type ConsumeOptions struct { - QueueDurable bool - QueueAutoDelete bool - QueueExclusive bool - QueueNoWait bool - QueueDeclare bool - QueueArgs Table - BindingExchange *BindingExchangeOptions - BindingNoWait bool - BindingArgs Table + DeclareOptions + QueueName string Concurrency int QOSPrefetch int QOSGlobal bool @@ -46,135 +31,24 @@ type ConsumeOptions struct { ConsumerArgs Table } -// getBindingExchangeOptionsOrSetDefault returns pointer to current BindingExchange options. if no BindingExchange options are set yet, it will set it with default values. -func getBindingExchangeOptionsOrSetDefault(options *ConsumeOptions) *BindingExchangeOptions { - if options.BindingExchange == nil { - options.BindingExchange = &BindingExchangeOptions{ - Name: "", - Kind: "direct", - Durable: false, - AutoDelete: false, - Internal: false, - NoWait: false, - ExchangeArgs: nil, - Declare: true, - } - } - return options.BindingExchange -} - -// BindingExchangeOptions are used when binding to an exchange. -// it will verify the exchange is created before binding to it. -type BindingExchangeOptions struct { - Name string - Kind string - Durable bool - AutoDelete bool - Internal bool - NoWait bool - ExchangeArgs Table - Declare bool -} - -// WithConsumeOptionsQueueDurable sets the queue to durable, which means it won't -// be destroyed when the server restarts. It must only be bound to durable exchanges -func WithConsumeOptionsQueueDurable(options *ConsumeOptions) { - options.QueueDurable = true -} - -// WithConsumeOptionsQueueAutoDelete sets the queue to auto delete, which means it will -// be deleted when there are no more conusmers on it -func WithConsumeOptionsQueueAutoDelete(options *ConsumeOptions) { - options.QueueAutoDelete = true -} - -// WithConsumeOptionsQueueExclusive sets the queue to exclusive, which means -// it's are only accessible by the connection that declares it and -// will be deleted when the connection closes. Channels on other connections -// will receive an error when attempting to declare, bind, consume, purge or -// delete a queue with the same name. -func WithConsumeOptionsQueueExclusive(options *ConsumeOptions) { - options.QueueExclusive = true -} - -// WithConsumeOptionsQueueNoWait sets the queue to nowait, which means -// the queue will assume to be declared on the server. A -// channel exception will arrive if the conditions are met for existing queues -// or attempting to modify an existing queue from a different connection. -func WithConsumeOptionsQueueNoWait(options *ConsumeOptions) { - options.QueueNoWait = true -} - -// WithConsumeOptionsQueueNoDeclare sets the queue to no declare, which means -// the queue will be assumed to be declared on the server, and won't be -// declared at all. -func WithConsumeOptionsQueueNoDeclare(options *ConsumeOptions) { - options.QueueDeclare = false -} - -// WithConsumeOptionsQuorum sets the queue a quorum type, which means multiple nodes -// in the cluster will have the messages distributed amongst them for higher reliability -func WithConsumeOptionsQuorum(options *ConsumeOptions) { - if options.QueueArgs == nil { - options.QueueArgs = Table{} - } - options.QueueArgs["x-queue-type"] = "quorum" -} - -// WithConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to -func WithConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions) { +// WithConsumeDeclareOptions allows to set declare options that can be used to set up queue, exchange or bindings +// before the consumer process starts. +func WithConsumeDeclareOptions(declareOptionsFuncs ...func(options *DeclareOptions)) func(*ConsumeOptions) { return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Name = name - } -} - -// WithConsumeOptionsBindingExchangeKind returns a function that sets the binding exchange kind/type -func WithConsumeOptionsBindingExchangeKind(kind string) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Kind = kind - } -} - -// WithConsumeOptionsBindingExchangeDurable returns a function that sets the binding exchange durable flag -func WithConsumeOptionsBindingExchangeDurable(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Durable = true -} - -// WithConsumeOptionsBindingExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag -func WithConsumeOptionsBindingExchangeAutoDelete(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).AutoDelete = true -} - -// WithConsumeOptionsBindingExchangeInternal returns a function that sets the binding exchange internal flag -func WithConsumeOptionsBindingExchangeInternal(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Internal = true -} - -// WithConsumeOptionsBindingExchangeNoWait returns a function that sets the binding exchange noWait flag -func WithConsumeOptionsBindingExchangeNoWait(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).NoWait = true -} + for _, declareOption := range declareOptionsFuncs { + // If a queue was set to declare, ensure that the queue name is set. + if options.DeclareOptions.Queue != nil { + if options.DeclareOptions.Queue.Name == "" { + options.DeclareOptions.Queue.Name = options.QueueName + } + } + + declareOption(&options.DeclareOptions) + } -// WithConsumeOptionsBindingExchangeArgs returns a function that sets the binding exchange arguments that are specific to the server's implementation of the exchange -func WithConsumeOptionsBindingExchangeArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).ExchangeArgs = args } } -// WithConsumeOptionsBindingExchangeSkipDeclare returns a function that skips the declaration of the -// binding exchange. Use this setting if the exchange already exists and you don't need to declare -// it on consumer start. -func WithConsumeOptionsBindingExchangeSkipDeclare(options *ConsumeOptions) { - getBindingExchangeOptionsOrSetDefault(options).Declare = false -} - -// WithConsumeOptionsBindingNoWait sets the bindings to nowait, which means if the queue can not be bound -// the channel will not be closed with an error. -func WithConsumeOptionsBindingNoWait(options *ConsumeOptions) { - options.BindingNoWait = true -} - // WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that // many goroutines will be spawned to run the provided handler on messages func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) { @@ -230,10 +104,3 @@ func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) { func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) { options.ConsumerNoWait = true } - -// WithConsumeOptionsQueueArgs returns a function that sets the queue arguments -func WithConsumeOptionsQueueArgs(args Table) func(*ConsumeOptions) { - return func(options *ConsumeOptions) { - options.QueueArgs = args - } -} diff --git a/declare.go b/declare.go new file mode 100644 index 0000000..35f2621 --- /dev/null +++ b/declare.go @@ -0,0 +1,373 @@ +package rabbitmq + +import "fmt" + +// DeclareOptions are used to describe how a new queues, exchanges the routing setup should look like. +type DeclareOptions struct { + Queue *QueueOptions + Exchange *ExchangeOptions + Bindings []Binding +} + +// QueueOptions are used to configure a queue. +// If the Passive flag is set the client will only check if the queue exists on the server +// and that the settings match, no creation attempt will be made. +type QueueOptions struct { + Name string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Passive bool // if false, a missing queue will be created on the server + Args Table +} + +// ExchangeOptions are used to configure an exchange. +// If the Passive flag is set the client will only check if the exchange exists on the server +// and that the settings match, no creation attempt will be made. +type ExchangeOptions struct { + Name string + Kind string // possible values: empty string for default exchange or direct, topic, fanout + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Passive bool // if false, a missing exchange will be created on the server + Args Table +} + +// BindingOption are used to configure a queue bindings. +type BindingOption struct { + NoWait bool + Args Table +} + +// Binding describes a queue binding to a specific exchange. +type Binding struct { + BindingOption + QueueName string + ExchangeName string + RoutingKey string +} + +// SetBindings trys to generate bindings for the given routing keys and the queue and exchange options. +// If either Queue or Exchange properties are empty or no queue name is specified, no bindings will be set. +func (o *DeclareOptions) SetBindings(routingKeys []string, opt BindingOption) { + if o.Queue == nil || o.Exchange == nil { + return // nothing to set... + } + + if o.Queue.Name == "" { + return // nothing to set... + } + + for _, routingKey := range routingKeys { + o.Bindings = append(o.Bindings, Binding{ + QueueName: o.Queue.Name, + ExchangeName: o.Exchange.Name, + RoutingKey: routingKey, + BindingOption: opt, + }) + } +} + +// handleDeclare handles the queue, exchange and binding declare process on the server. +// If there are no options set, no actions will be executed. +func handleDeclare(chManager *channelManager, options DeclareOptions) error { + chManager.channelMux.RLock() + defer chManager.channelMux.RUnlock() + + // bind queue + if options.Queue != nil { + queue := options.Queue + if queue.Name == "" { + return fmt.Errorf("missing queue name") + } + if queue.Passive { + _, err := chManager.channel.QueueDeclarePassive( + queue.Name, + queue.Durable, + queue.AutoDelete, + queue.Exclusive, + queue.NoWait, + tableToAMQPTable(queue.Args), + ) + if err != nil { + return err + } + } else { + _, err := chManager.channel.QueueDeclare( + queue.Name, + queue.Durable, + queue.AutoDelete, + queue.Exclusive, + queue.NoWait, + tableToAMQPTable(queue.Args), + ) + if err != nil { + return err + } + } + } + + // bind exchange + if options.Exchange != nil { + exchange := options.Exchange + if exchange.Name == "" { + return fmt.Errorf("missing exchange name") + } + if exchange.Passive { + err := chManager.channel.ExchangeDeclarePassive( + exchange.Name, + exchange.Kind, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.Args), + ) + if err != nil { + return err + } + } else { + err := chManager.channel.ExchangeDeclare( + exchange.Name, + exchange.Kind, + exchange.Durable, + exchange.AutoDelete, + exchange.Internal, + exchange.NoWait, + tableToAMQPTable(exchange.Args), + ) + if err != nil { + return err + } + } + } + + // handle binding of queues to exchange + for _, binding := range options.Bindings { + err := chManager.channel.QueueBind( + binding.QueueName, // name of the queue + binding.RoutingKey, // bindingKey + binding.ExchangeName, // sourceExchange + binding.NoWait, // noWait + tableToAMQPTable(binding.Args), // arguments + ) + if err != nil { + return err + } + } + + return nil +} + +// getExchangeOptionsOrSetDefault returns pointer to current ExchangeOptions options. +// If no exchange options are set yet, new options with default values will be defined. +func getExchangeOptionsOrSetDefault(options *DeclareOptions) *ExchangeOptions { + if options.Exchange == nil { + options.Exchange = &ExchangeOptions{ + Name: "", + Kind: "direct", + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Args: nil, + Passive: false, + } + } + return options.Exchange +} + +// getQueueOptionsOrSetDefault returns pointer to current QueueOptions options. +// If no queue options are set yet, new options with default values will be defined. +func getQueueOptionsOrSetDefault(options *DeclareOptions) *QueueOptions { + if options.Queue == nil { + options.Queue = &QueueOptions{ + Name: "", + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Passive: false, + Args: nil, + } + } + return options.Queue +} + +// region general-options + +// WithDeclareQueue sets the queue that should be declared prior to other RabbitMQ actions are being executed. +// Only the settings will be validated if the queue already exists on the server. +// Matching settings will result in no action, different settings will result in an error. +// If the 'Passive' property is set to false, a missing queue will be created on the server. +func WithDeclareQueue(settings *QueueOptions) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.Queue = settings + } +} + +// WithDeclareExchange sets the exchange that should be declared prior to other RabbitMQ actions are being executed. +// Only the settings will be validated if the exchange already exists on the server. +// Matching settings will result in no action, different settings will result in an error. +// If the 'Passive' property is set to false, a missing exchange will be created on the server. +func WithDeclareExchange(settings *ExchangeOptions) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.Exchange = settings + } +} + +// WithDeclareBindings sets the bindings that should be declared prior to other RabbitMQ actions are being executed. +// Only the settings will be validated if one of the bindings already exists on the server. +// Matching settings will result in no action, different settings will result in an error. +// If the 'Passive' property is set to false, missing bindings will be created on the server. +func WithDeclareBindings(bindings []Binding) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.Bindings = bindings + } +} + +// WithDeclareBindingsForRoutingKeys sets the bindings that should be declared prior to other RabbitMQ +// actions are being executed. +// This function must be called after the queue and exchange declaration settings have been set, +// otherwise this function has no effect. +func WithDeclareBindingsForRoutingKeys(routingKeys []string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + options.SetBindings(routingKeys, BindingOption{}) + } +} + +// endregion general-options + +// region single-options + +// WithDeclareQueueName returns a function that sets the queue name. +func WithDeclareQueueName(name string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Name = name + } +} + +// WithDeclareQueueDurable sets the queue to durable, which means it won't +// be destroyed when the server restarts. It must only be bound to durable exchanges. +func WithDeclareQueueDurable(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Durable = true +} + +// WithDeclareQueueAutoDelete sets the queue to auto delete, which means it will +// be deleted when there are no more consumers on it. +func WithDeclareQueueAutoDelete(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).AutoDelete = true +} + +// WithDeclareQueueExclusive sets the queue to exclusive, which means +// it's are only accessible by the connection that declares it and +// will be deleted when the connection closes. Channels on other connections +// will receive an error when attempting to declare, bind, consume, purge or +// delete a queue with the same name. +func WithDeclareQueueExclusive(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Exclusive = true +} + +// WithDeclareQueueNoWait sets the queue to nowait, which means +// the queue will assume to be declared on the server. A channel +// exception will arrive if the conditions are met for existing queues +// or attempting to modify an existing queue from a different connection. +func WithDeclareQueueNoWait(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).NoWait = true +} + +// WithDeclareQueueNoDeclare sets the queue to no declare, which means +// the queue will be assumed to be declared on the server, and thus only will be validated. +func WithDeclareQueueNoDeclare(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Passive = true +} + +// WithDeclareQueueArgs returns a function that sets the queue arguments. +func WithDeclareQueueArgs(args Table) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getQueueOptionsOrSetDefault(options).Args = args + } +} + +// WithDeclareQueueQuorum sets the queue a quorum type, which means multiple nodes +// in the cluster will have the messages distributed amongst them for higher reliability. +func WithDeclareQueueQuorum(options *DeclareOptions) { + queue := getQueueOptionsOrSetDefault(options) + if queue.Args == nil { + queue.Args = Table{} + } + queue.Args["x-queue-type"] = "quorum" +} + +// WithDeclareExchangeName returns a function that sets the exchange name. +func WithDeclareExchangeName(name string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Name = name + } +} + +// WithDeclareExchangeKind returns a function that sets the binding exchange kind/type. +func WithDeclareExchangeKind(kind string) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Kind = kind + } +} + +// WithDeclareExchangeDurable returns a function that sets the binding exchange durable flag. +func WithDeclareExchangeDurable(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Durable = true +} + +// WithDeclareExchangeAutoDelete returns a function that sets the binding exchange autoDelete flag. +func WithDeclareExchangeAutoDelete(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).AutoDelete = true +} + +// WithDeclareExchangeInternal returns a function that sets the binding exchange internal flag. +func WithDeclareExchangeInternal(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Internal = true +} + +// WithDeclareExchangeNoWait returns a function that sets the binding exchange noWait flag. +func WithDeclareExchangeNoWait(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).NoWait = true +} + +// WithDeclareExchangeArgs returns a function that sets the binding exchange arguments +// that are specific to the server's implementation of the exchange. +func WithDeclareExchangeArgs(args Table) func(*DeclareOptions) { + return func(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Args = args + } +} + +// WithDeclareExchangeNoDeclare returns a function that skips the declaration of the +// binding exchange. Use this setting if the exchange already exists and you don't need to declare +// it on consumer start. +func WithDeclareExchangeNoDeclare(options *DeclareOptions) { + getExchangeOptionsOrSetDefault(options).Passive = true +} + +// WithDeclareBindingNoWait sets the bindings to nowait, which means if the queue can not be bound +// the channel will not be closed with an error. +// This function must be called after bindings have been defined, otherwise it has no effect. +func WithDeclareBindingNoWait(options *ConsumeOptions) { + for i := range options.Bindings { + options.Bindings[i].NoWait = true + } +} + +// WithDeclareBindingArgs sets the arguments of the bindings to args. +// This function must be called after bindings have been defined, otherwise it has no effect. +func WithDeclareBindingArgs(args Table) func(*DeclareOptions) { + return func(options *DeclareOptions) { + for i := range options.Bindings { + options.Bindings[i].Args = args + } + } +} + +// endregion single-options diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 21d3dad..4b416f8 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -34,13 +34,7 @@ func main() { return rabbitmq.Ack }, "my_queue", - []string{"routing_key", "routing_key_2"}, rabbitmq.WithConsumeOptionsConcurrency(10), - rabbitmq.WithConsumeOptionsQueueDurable, - rabbitmq.WithConsumeOptionsQuorum, - rabbitmq.WithConsumeOptionsBindingExchangeName("events"), - rabbitmq.WithConsumeOptionsBindingExchangeKind("topic"), - rabbitmq.WithConsumeOptionsBindingExchangeDurable, rabbitmq.WithConsumeOptionsConsumerName(consumerName), ) if err != nil { diff --git a/examples/consumer_with_declare/.gitignore b/examples/consumer_with_declare/.gitignore new file mode 100644 index 0000000..e8842b8 --- /dev/null +++ b/examples/consumer_with_declare/.gitignore @@ -0,0 +1 @@ +consumer_with_declare diff --git a/examples/consumer_with_declare/main.go b/examples/consumer_with_declare/main.go new file mode 100644 index 0000000..02540db --- /dev/null +++ b/examples/consumer_with_declare/main.go @@ -0,0 +1,74 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +var consumerName = "example_with_declare" + +func main() { + consumer, err := rabbitmq.NewConsumer( + "amqp://guest:guest@localhost", rabbitmq.Config{}, + rabbitmq.WithConsumerOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer func() { + err := consumer.Close() + if err != nil { + log.Fatal(err) + } + }() + + err = consumer.StartConsuming( + func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }, + "my_queue", + rabbitmq.WithConsumeDeclareOptions( + rabbitmq.WithDeclareQueueDurable, + rabbitmq.WithDeclareQueueQuorum, + rabbitmq.WithDeclareExchangeName("events"), + rabbitmq.WithDeclareExchangeKind("topic"), + rabbitmq.WithDeclareExchangeDurable, + rabbitmq.WithDeclareBindingsForRoutingKeys([]string{"routing_key", "routing_key_2"}), // implicit bindings + rabbitmq.WithDeclareBindings([]rabbitmq.Binding{ // custom bindings + { + QueueName: "my_queue", + ExchangeName: "events", + RoutingKey: "a_custom_key", + }, + }), + ), + rabbitmq.WithConsumeOptionsConsumerName(consumerName), + ) + if err != nil { + log.Fatal(err) + } + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + <-done + fmt.Println("stopping consumer") +}