Browse Source

Merge 4a0be05eac into 0958e3a881

pull/198/merge
tucura 5 months ago
committed by GitHub
parent
commit
16e5d998a4
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
4 changed files with 61 additions and 14 deletions
  1. +19
    -9
      internal/connectionmanager/connection_manager.go
  2. +38
    -5
      internal/connectionmanager/safe_wraps.go
  3. +3
    -0
      publish.go
  4. +1
    -0
      publish_flow_block.go

+ 19
- 9
internal/connectionmanager/connection_manager.go View File

@ -23,6 +23,13 @@ type ConnectionManager struct {
reconnectionCount uint
reconnectionCountMu *sync.Mutex
dispatcher *dispatcher.Dispatcher
// universalNotifyBlockingReceiver receives block signal from underlying
// connection which are broadcasted to all publisherNotifyBlockingReceivers
universalNotifyBlockingReceiver chan amqp.Blocking
universalNotifyBlockingReceiverUsed bool
publisherNotifyBlockingReceiversMu *sync.RWMutex
publisherNotifyBlockingReceivers []chan amqp.Blocking
}
type Resolver interface {
@ -62,17 +69,20 @@ func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger
}
connManager := ConnectionManager{
logger: log,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMu: &sync.RWMutex{},
ReconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
logger: log,
resolver: resolver,
connection: conn,
amqpConfig: conf,
connectionMu: &sync.RWMutex{},
ReconnectInterval: reconnectInterval,
reconnectionCount: 0,
reconnectionCountMu: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
universalNotifyBlockingReceiver: make(chan amqp.Blocking),
publisherNotifyBlockingReceiversMu: &sync.RWMutex{},
}
go connManager.startNotifyClose()
go connManager.readUniversalBlockReceiver()
return &connManager, nil
}


+ 38
- 5
internal/connectionmanager/safe_wraps.go View File

@ -8,10 +8,43 @@ import (
func (connManager *ConnectionManager) NotifyBlockedSafe(
receiver chan amqp.Blocking,
) chan amqp.Blocking {
connManager.connectionMu.RLock()
defer connManager.connectionMu.RUnlock()
connManager.connectionMu.Lock()
defer connManager.connectionMu.Unlock()
return connManager.connection.NotifyBlocked(
receiver,
)
// add receiver to connection manager.
connManager.publisherNotifyBlockingReceiversMu.Lock()
connManager.publisherNotifyBlockingReceivers = append(connManager.publisherNotifyBlockingReceivers, receiver)
connManager.publisherNotifyBlockingReceiversMu.Unlock()
if !connManager.universalNotifyBlockingReceiverUsed {
connManager.connection.NotifyBlocked(
connManager.universalNotifyBlockingReceiver,
)
connManager.universalNotifyBlockingReceiverUsed = true
}
return receiver
}
// readUniversalBlockReceiver reads on universal blocking receiver and broadcasts event to all blocking receivers of
// connection manager.
func (connManager *ConnectionManager) readUniversalBlockReceiver() {
for b := range connManager.universalNotifyBlockingReceiver {
connManager.publisherNotifyBlockingReceiversMu.RLock()
for _, br := range connManager.publisherNotifyBlockingReceivers {
br <- b
}
connManager.publisherNotifyBlockingReceiversMu.RUnlock()
}
}
func (connManager *ConnectionManager) RemovePublisherBlockingReceiver(receiver chan amqp.Blocking) {
connManager.publisherNotifyBlockingReceiversMu.Lock()
for i, br := range connManager.publisherNotifyBlockingReceivers {
if br == receiver {
connManager.publisherNotifyBlockingReceivers = append(connManager.publisherNotifyBlockingReceivers[:i], connManager.publisherNotifyBlockingReceivers[i+1:]...)
}
}
connManager.publisherNotifyBlockingReceiversMu.Unlock()
close(receiver)
}

+ 3
- 0
publish.go View File

@ -58,6 +58,8 @@ type Publisher struct {
notifyPublishHandler func(p Confirmation)
options PublisherOptions
blockings chan amqp.Blocking
}
type PublisherConfirmation []*amqp.DeferredConfirmation
@ -286,6 +288,7 @@ func (publisher *Publisher) Close() {
publisher.options.Logger.Warnf("error while closing the channel: %v", err)
}
publisher.options.Logger.Infof("closing publisher...")
publisher.connManager.RemovePublisherBlockingReceiver(publisher.blockings)
go func() {
publisher.closeConnectionToManagerCh <- struct{}{}
}()


+ 1
- 0
publish_flow_block.go View File

@ -26,6 +26,7 @@ func (publisher *Publisher) startNotifyFlowHandler() {
func (publisher *Publisher) startNotifyBlockedHandler() {
blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))
publisher.disablePublishDueToBlockedMu.Lock()
publisher.blockings = blockings
publisher.disablePublishDueToBlocked = false
publisher.disablePublishDueToBlockedMu.Unlock()


Loading…
Cancel
Save