Browse Source

Add cluster support via Resolver

Signed-off-by: Ruslan Bayandinov <wazsone@ya.ru>
pull/163/head
Ruslan Bayandinov 2 years ago
parent
commit
5802dd2286
No known key found for this signature in database GPG Key ID: CE79E74DC40CCD24
3 changed files with 93 additions and 12 deletions
  1. +34
    -6
      connection.go
  2. +25
    -0
      examples/cluster/main.go
  3. +34
    -6
      internal/connectionmanager/connection_manager.go

+ 34
- 6
connection.go View File

@ -1,6 +1,8 @@
package rabbitmq
import (
"math/rand"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
)
@ -21,19 +23,46 @@ type Conn struct {
// will be stored in the returned connection's Config field.
type Config amqp.Config
type Resolver = connectionmanager.Resolver
type StaticResolver struct {
urls []string
shuffe bool
}
func (r *StaticResolver) Resolve() ([]string, error) {
// TODO: move to slices.Clone when supported Go versions > 1.21
var urls []string
urls = append(urls, r.urls...)
if r.shuffe {
rand.Shuffle(len(urls), func(i, j int) {
urls[i], urls[j] = urls[j], urls[i]
})
}
return urls, nil
}
func NewStaticResolver(urls []string, shuffle bool) *StaticResolver {
return &StaticResolver{urls: urls}
}
// NewConn creates a new connection manager
func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) {
func NewConn(url string, opts ...func(*ConnectionOptions)) (*Conn, error) {
return NewClusterConn(NewStaticResolver([]string{url}, false), opts...)
}
func NewClusterConn(resolver Resolver, opts ...func(*ConnectionOptions)) (*Conn, error) {
defaultOptions := getDefaultConnectionOptions()
options := &defaultOptions
for _, optionFunc := range optionFuncs {
optionFunc(options)
for _, optFn := range opts {
optFn(options)
}
manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
manager, err := connectionmanager.NewConnectionManager(resolver, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
if err != nil {
return nil, err
}
reconnectErrCh, closeCh := manager.NotifyReconnect()
conn := &Conn{
connectionManager: manager,
@ -41,7 +70,6 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error)
closeConnectionToManagerCh: closeCh,
options: *options,
}
go conn.handleRestarts()
return conn, nil
}


+ 25
- 0
examples/cluster/main.go View File

@ -0,0 +1,25 @@
package main
import (
"log"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
resolver := rabbitmq.NewStaticResolver(
[]string{
"amqp://guest:guest@host1",
"amqp://guest:guest@host2",
"amqp://guest:guest@host3",
},
false, /* shuffle */
)
conn, err := rabbitmq.NewClusterConn(resolver)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
}

+ 34
- 6
internal/connectionmanager/connection_manager.go View File

@ -1,6 +1,8 @@
package connectionmanager
import (
"errors"
"fmt"
"sync"
"time"
@ -12,7 +14,7 @@ import (
// ConnectionManager -
type ConnectionManager struct {
logger logger.Logger
url string
resolver Resolver
connection *amqp.Connection
amqpConfig amqp.Config
connectionMux *sync.RWMutex
@ -22,15 +24,40 @@ type ConnectionManager struct {
dispatcher *dispatcher.Dispatcher
}
type Resolver interface {
Resolve() ([]string, error)
}
// dial will attempt to connect to the a list of urls in the order they are
// given.
func dial(log logger.Logger, resolver Resolver, conf amqp.Config) (*amqp.Connection, error) {
urls, err := resolver.Resolve()
if err != nil {
return nil, fmt.Errorf("error resolving amqp server urls: %w", err)
}
var errs []error
for _, url := range urls {
conn, err := amqp.DialConfig(url, amqp.Config(conf))
if err == nil {
return conn, err
}
log.Warnf("failed to connect to amqp server %s: %v", url, err)
errs = append(errs, err)
}
return nil, errors.Join(errs...)
}
// NewConnectionManager creates a new connection manager
func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
conn, err := amqp.DialConfig(url, amqp.Config(conf))
func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) {
conn, err := dial(log, resolver, amqp.Config(conf))
if err != nil {
return nil, err
}
connManager := ConnectionManager{
logger: log,
url: url,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMux: &sync.RWMutex{},
@ -125,7 +152,8 @@ func (connManager *ConnectionManager) reconnectLoop() {
func (connManager *ConnectionManager) reconnect() error {
connManager.connectionMux.Lock()
defer connManager.connectionMux.Unlock()
newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig))
conn, err := dial(connManager.logger, connManager.resolver, amqp.Config(connManager.amqpConfig))
if err != nil {
return err
}
@ -134,6 +162,6 @@ func (connManager *ConnectionManager) reconnect() error {
connManager.logger.Warnf("error closing connection while reconnecting: %v", err)
}
connManager.connection = newConn
connManager.connection = conn
return nil
}

Loading…
Cancel
Save