From 9b72b3181570ff0d38ac7c8c28d38547b9fcbe07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A4=D0=B8=D0=B0=D0=BB=D0=BA=D0=BE=D0=B2=D1=81=D0=BA?= =?UTF-8?q?=D0=B8=D0=B9=20=D0=9C=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1?= =?UTF-8?q?=D0=B5=D1=80=D0=B3=D0=B5=D0=B5=D0=B2=D0=B8=D1=87?= Date: Fri, 27 Sep 2024 02:13:11 +0300 Subject: [PATCH] fix set channel in Confirm mode not immediately after creating --- consume.go | 2 +- internal/channelmanager/channel_manager.go | 10 +++++++++- publish.go | 3 ++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/consume.go b/consume.go index 3e11b45..a505953 100644 --- a/consume.go +++ b/consume.go @@ -62,7 +62,7 @@ func NewConsumer( return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, false, options.Logger, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index f07ab9b..5e099c8 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -21,10 +21,11 @@ type ChannelManager struct { reconnectionCount uint reconnectionCountMu *sync.Mutex dispatcher *dispatcher.Dispatcher + inConfirmMode bool } // NewChannelManager creates a new connection manager -func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { +func NewChannelManager(connManager *connectionmanager.ConnectionManager, confirmMode bool, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { ch, err := getNewChannel(connManager) if err != nil { return nil, err @@ -39,6 +40,7 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log reconnectionCount: 0, reconnectionCountMu: &sync.Mutex{}, dispatcher: dispatcher.NewDispatcher(), + inConfirmMode: confirmMode, } go chanManager.startNotifyCancelOrClosed() return &chanManager, nil @@ -119,6 +121,12 @@ func (chanManager *ChannelManager) reconnect() error { if err != nil { return err } + // chaneel creating and setting confirm mode should be in the same mutex Lock interval + if chanManager.inConfirmMode { + if err = newChannel.Confirm(false); err != nil { + return err + } + } if err = chanManager.channel.Close(); err != nil { chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) diff --git a/publish.go b/publish.go index 06f9cb0..33ab7d7 100644 --- a/publish.go +++ b/publish.go @@ -78,7 +78,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe return nil, errors.New("connection manager can't be nil") } - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.ConfirmMode, options.Logger, conn.connectionManager.ReconnectInterval) if err != nil { return nil, err } @@ -272,6 +272,7 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext( } deferredConfirmations = append(deferredConfirmations, conf) } + return deferredConfirmations, nil }