diff --git a/connection.go b/connection.go index 1f5e80c..727f86c 100644 --- a/connection.go +++ b/connection.go @@ -14,7 +14,7 @@ type Conn struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} - reconnectHooks []func() + reconnectHooks []func(error) looseConnectionCh <-chan error mutex *sync.RWMutex @@ -59,13 +59,11 @@ func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) } func (conn *Conn) handleLooseConnection() { - for { - <-conn.looseConnectionCh - + for err := range conn.looseConnectionCh { conn.mutex.Lock() for _, fhook := range conn.reconnectHooks { - fhook() + fhook(err) } conn.mutex.Unlock() @@ -78,7 +76,7 @@ func (conn *Conn) handleRestarts() { } } -func (conn *Conn) RegisterReconnectHook(hook func()) { +func (conn *Conn) RegisterReconnectHook(hook func(error)) { conn.mutex.Lock() conn.reconnectHooks = append(conn.reconnectHooks, hook) conn.mutex.Unlock() diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index b6e7f18..457750b 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -40,7 +40,9 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log reconnectionCountMux: &sync.Mutex{}, dispatcher: dispatcher.NewDispatcher(), } + go chanManager.startNotifyCancelOrClosed() + return &chanManager, nil } @@ -52,6 +54,7 @@ func getNewChannel(connManager *connectionmanager.ConnectionManager) (*amqp.Chan if err != nil { return nil, err } + return ch, nil } @@ -71,6 +74,7 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() { chanManager.logger.Warnf("successfully reconnected to amqp server") chanManager.dispatcher.Dispatch(err) } + if err == nil { chanManager.logger.Infof("amqp channel closed gracefully") } diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index 291ad2f..5d107eb 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -88,6 +88,7 @@ func (connManager *ConnectionManager) startNotifyClose() { connManager.reconnectLoop() connManager.logger.Warnf("successfully reconnected to amqp server") connManager.dispatcher.Dispatch(err) + connManager.dispatcher.DispathLooseConnection(nil) } if err == nil {