Browse Source

fix set channel in Confirm mode not immediately after creating

pull/188/head
Фиалковский Максим Сергеевич 1 year ago
parent
commit
9b72b31815
3 changed files with 12 additions and 3 deletions
  1. +1
    -1
      consume.go
  2. +9
    -1
      internal/channelmanager/channel_manager.go
  3. +2
    -1
      publish.go

+ 1
- 1
consume.go View File

@ -62,7 +62,7 @@ func NewConsumer(
return nil, errors.New("connection manager can't be nil") return nil, errors.New("connection manager can't be nil")
} }
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, false, options.Logger, conn.connectionManager.ReconnectInterval)
if err != nil { if err != nil {
return nil, err return nil, err
} }


+ 9
- 1
internal/channelmanager/channel_manager.go View File

@ -21,10 +21,11 @@ type ChannelManager struct {
reconnectionCount uint reconnectionCount uint
reconnectionCountMu *sync.Mutex reconnectionCountMu *sync.Mutex
dispatcher *dispatcher.Dispatcher dispatcher *dispatcher.Dispatcher
inConfirmMode bool
} }
// NewChannelManager creates a new connection manager // NewChannelManager creates a new connection manager
func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) {
func NewChannelManager(connManager *connectionmanager.ConnectionManager, confirmMode bool, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) {
ch, err := getNewChannel(connManager) ch, err := getNewChannel(connManager)
if err != nil { if err != nil {
return nil, err return nil, err
@ -39,6 +40,7 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log
reconnectionCount: 0, reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{}, reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(), dispatcher: dispatcher.NewDispatcher(),
inConfirmMode: confirmMode,
} }
go chanManager.startNotifyCancelOrClosed() go chanManager.startNotifyCancelOrClosed()
return &chanManager, nil return &chanManager, nil
@ -119,6 +121,12 @@ func (chanManager *ChannelManager) reconnect() error {
if err != nil { if err != nil {
return err return err
} }
// chaneel creating and setting confirm mode should be in the same mutex Lock interval
if chanManager.inConfirmMode {
if err = newChannel.Confirm(false); err != nil {
return err
}
}
if err = chanManager.channel.Close(); err != nil { if err = chanManager.channel.Close(); err != nil {
chanManager.logger.Warnf("error closing channel while reconnecting: %v", err) chanManager.logger.Warnf("error closing channel while reconnecting: %v", err)


+ 2
- 1
publish.go View File

@ -78,7 +78,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe
return nil, errors.New("connection manager can't be nil") return nil, errors.New("connection manager can't be nil")
} }
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval)
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.ConfirmMode, options.Logger, conn.connectionManager.ReconnectInterval)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -272,6 +272,7 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext(
} }
deferredConfirmations = append(deferredConfirmations, conf) deferredConfirmations = append(deferredConfirmations, conf)
} }
return deferredConfirmations, nil return deferredConfirmations, nil
} }


Loading…
Cancel
Save