package rabbitmq import ( "math/rand" amqp "github.com/rabbitmq/amqp091-go" "github.com/wagslane/go-rabbitmq/internal/connectionmanager" ) // Conn manages the connection to a rabbit cluster // it is intended to be shared across publishers and consumers type Conn struct { connectionManager *connectionmanager.ConnectionManager reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} options ConnectionOptions } // Config wraps amqp.Config // Config is used in DialConfig and Open to specify the desired tuning // parameters used during a connection open handshake. The negotiated tuning // will be stored in the returned connection's Config field. type Config amqp.Config type Resolver = connectionmanager.Resolver type StaticResolver struct { urls []string shuffle bool } func (r *StaticResolver) Resolve() ([]string, error) { // TODO: move to slices.Clone when supported Go versions > 1.21 var urls []string urls = append(urls, r.urls...) if r.shuffle { rand.Shuffle(len(urls), func(i, j int) { urls[i], urls[j] = urls[j], urls[i] }) } return urls, nil } func NewStaticResolver(urls []string, shuffle bool) *StaticResolver { return &StaticResolver{urls: urls, shuffle: shuffle} } // NewConn creates a new connection manager func NewConn(url string, opts ...func(*ConnectionOptions)) (*Conn, error) { return NewClusterConn(NewStaticResolver([]string{url}, false), opts...) } func NewClusterConn(resolver Resolver, opts ...func(*ConnectionOptions)) (*Conn, error) { defaultOptions := getDefaultConnectionOptions() options := &defaultOptions for _, optFn := range opts { optFn(options) } manager, err := connectionmanager.NewConnectionManager(resolver, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) if err != nil { return nil, err } reconnectErrCh, closeCh := manager.NotifyReconnect() conn := &Conn{ connectionManager: manager, reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, options: *options, } go conn.handleRestarts() return conn, nil } func (conn *Conn) handleRestarts() { for err := range conn.reconnectErrCh { conn.options.Logger.Infof("successful connection recovery from: %v", err) } } // Close closes the connection, it's not safe for re-use. // You should also close any consumers and publishers before // closing the connection func (conn *Conn) Close() error { conn.closeConnectionToManagerCh <- struct{}{} return conn.connectionManager.Close() } // IsClosed returns whether the connection is closed or not func (conn *Conn) IsClosed() bool { return conn.connectionManager.IsClosed() }