From 8d6dbfa0bbb346e25aaa8dca7d4bce12c06dc7d8 Mon Sep 17 00:00:00 2001 From: WiRight Date: Mon, 24 Apr 2023 15:03:13 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=B0=D0=BB=20=D0=B1=D0=B8=D0=BD=D0=B4=D0=B8=D0=BD=D0=B3=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- declare.go | 28 +++++++++++++++++---------- declate_options.go | 9 +++++++++ internal/channelmanager/safe_wraps.go | 15 ++++++++++++++ 3 files changed, 42 insertions(+), 10 deletions(-) create mode 100644 declate_options.go diff --git a/declare.go b/declare.go index 08ff897..af46c35 100644 --- a/declare.go +++ b/declare.go @@ -2,7 +2,6 @@ package rabbitmq import ( "errors" - "fmt" "github.com/DizoftTeam/go-rabbitmq/internal/channelmanager" ) @@ -28,17 +27,23 @@ func NewDeclarator(conn *Conn) (*Declarator, error) { return result, nil } -func (d *Declarator) Declare(queue string, optionFuncs ...func(*ConsumerOptions)) error { - defaultOptions := getDefaultConsumerOptions(queue) - options := &defaultOptions - for _, optionFunc := range optionFuncs { - optionFunc(options) - } +func (d *Declarator) Close() { + d.chanManager.Close() +} - err := declareBindings(d.chanManager, *options) +func (d *Declarator) Declare(bindings []ExchangeBinding) error { + for _, binding := range bindings { + err := d.chanManager.ExchangeBindSafe( + binding.From, + binding.To, + binding.RoutingKey, + binding.NoWait, + tableToAMQPTable(binding.Args), + ) - if err != nil { - return fmt.Errorf("declare bindings failed: %w", err) + if err != nil { + return err + } } return nil @@ -115,6 +120,7 @@ func declareBindings(chanManager *channelmanager.ChannelManager, options Consume if !binding.Declare { continue } + err := chanManager.QueueBindSafe( options.QueueOptions.Name, binding.RoutingKey, @@ -122,9 +128,11 @@ func declareBindings(chanManager *channelmanager.ChannelManager, options Consume binding.NoWait, tableToAMQPTable(binding.Args), ) + if err != nil { return err } } + return nil } diff --git a/declate_options.go b/declate_options.go new file mode 100644 index 0000000..fc32518 --- /dev/null +++ b/declate_options.go @@ -0,0 +1,9 @@ +package rabbitmq + +type ExchangeBinding struct { + From string + To string + RoutingKey string + Args Table + NoWait bool +} diff --git a/internal/channelmanager/safe_wraps.go b/internal/channelmanager/safe_wraps.go index 0e96b8d..35c2557 100644 --- a/internal/channelmanager/safe_wraps.go +++ b/internal/channelmanager/safe_wraps.go @@ -105,6 +105,21 @@ func (chanManager *ChannelManager) ExchangeDeclareSafe( ) } +func (chanManager *ChannelManager) ExchangeBindSafe( + from string, key string, to string, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeBind( + to, + key, + from, + noWait, + args, + ) +} + // QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method func (chanManager *ChannelManager) QueueBindSafe( name string, key string, exchange string, noWait bool, args amqp.Table,