diff --git a/consume.go b/consume.go index 3e11b45..a505953 100644 --- a/consume.go +++ b/consume.go @@ -62,7 +62,7 @@ func NewConsumer( return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, false, options.Logger, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index f07ab9b..5e099c8 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -21,10 +21,11 @@ type ChannelManager struct { reconnectionCount uint reconnectionCountMu *sync.Mutex dispatcher *dispatcher.Dispatcher + inConfirmMode bool } // NewChannelManager creates a new connection manager -func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { +func NewChannelManager(connManager *connectionmanager.ConnectionManager, confirmMode bool, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { ch, err := getNewChannel(connManager) if err != nil { return nil, err @@ -39,6 +40,7 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log reconnectionCount: 0, reconnectionCountMu: &sync.Mutex{}, dispatcher: dispatcher.NewDispatcher(), + inConfirmMode: confirmMode, } go chanManager.startNotifyCancelOrClosed() return &chanManager, nil @@ -119,6 +121,12 @@ func (chanManager *ChannelManager) reconnect() error { if err != nil { return err } + // chaneel creating and setting confirm mode should be in the same mutex Lock interval + if chanManager.inConfirmMode { + if err = newChannel.Confirm(false); err != nil { + return err + } + } if err = chanManager.channel.Close(); err != nil { chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) diff --git a/publish.go b/publish.go index 06f9cb0..33ab7d7 100644 --- a/publish.go +++ b/publish.go @@ -78,7 +78,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.ConfirmMode, options.Logger, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } @@ -272,6 +272,7 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext( } deferredConfirmations = append(deferredConfirmations, conf) } + return deferredConfirmations, nil }