Browse Source

Доработал биндинги

pull/127/head
WiRight 3 years ago
parent
commit
8d6dbfa0bb
No known key found for this signature in database GPG Key ID: 427DBE0B77ED3FBD
3 changed files with 42 additions and 10 deletions
  1. +18
    -10
      declare.go
  2. +9
    -0
      declate_options.go
  3. +15
    -0
      internal/channelmanager/safe_wraps.go

+ 18
- 10
declare.go View File

@ -2,7 +2,6 @@ package rabbitmq
import (
"errors"
"fmt"
"github.com/DizoftTeam/go-rabbitmq/internal/channelmanager"
)
@ -28,17 +27,23 @@ func NewDeclarator(conn *Conn) (*Declarator, error) {
return result, nil
}
func (d *Declarator) Declare(queue string, optionFuncs ...func(*ConsumerOptions)) error {
defaultOptions := getDefaultConsumerOptions(queue)
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
}
func (d *Declarator) Close() {
d.chanManager.Close()
}
err := declareBindings(d.chanManager, *options)
func (d *Declarator) Declare(bindings []ExchangeBinding) error {
for _, binding := range bindings {
err := d.chanManager.ExchangeBindSafe(
binding.From,
binding.To,
binding.RoutingKey,
binding.NoWait,
tableToAMQPTable(binding.Args),
)
if err != nil {
return fmt.Errorf("declare bindings failed: %w", err)
if err != nil {
return err
}
}
return nil
@ -115,6 +120,7 @@ func declareBindings(chanManager *channelmanager.ChannelManager, options Consume
if !binding.Declare {
continue
}
err := chanManager.QueueBindSafe(
options.QueueOptions.Name,
binding.RoutingKey,
@ -122,9 +128,11 @@ func declareBindings(chanManager *channelmanager.ChannelManager, options Consume
binding.NoWait,
tableToAMQPTable(binding.Args),
)
if err != nil {
return err
}
}
return nil
}

+ 9
- 0
declate_options.go View File

@ -0,0 +1,9 @@
package rabbitmq
type ExchangeBinding struct {
From string
To string
RoutingKey string
Args Table
NoWait bool
}

+ 15
- 0
internal/channelmanager/safe_wraps.go View File

@ -105,6 +105,21 @@ func (chanManager *ChannelManager) ExchangeDeclareSafe(
)
}
func (chanManager *ChannelManager) ExchangeBindSafe(
from string, key string, to string, noWait bool, args amqp.Table,
) error {
chanManager.channelMux.RLock()
defer chanManager.channelMux.RUnlock()
return chanManager.channel.ExchangeBind(
to,
key,
from,
noWait,
args,
)
}
// QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method
func (chanManager *ChannelManager) QueueBindSafe(
name string, key string, exchange string, noWait bool, args amqp.Table,


Loading…
Cancel
Save