diff --git a/consume.go b/consume.go index ec1b4b0..f1c9829 100644 --- a/consume.go +++ b/consume.go @@ -33,10 +33,10 @@ type Consumer struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} options ConsumerOptions - handlerMux *sync.RWMutex + handlerMu *sync.RWMutex - isClosedMux *sync.RWMutex - isClosed bool + isClosedMu *sync.RWMutex + isClosed bool } // Delivery captures the fields for a previously delivered message resident in @@ -73,7 +73,7 @@ func NewConsumer( reconnectErrCh: reconnectErrCh, closeConnectionToManagerCh: closeCh, options: *options, - isClosedMux: &sync.RWMutex{}, + isClosedMu: &sync.RWMutex{}, isClosed: false, } @@ -92,10 +92,10 @@ func (consumer *Consumer) Run(handler Handler) error { } handler = func(d Delivery) (action Action) { - if !consumer.handlerMux.TryRLock() { + if !consumer.handlerMu.TryRLock() { return NackRequeue } - defer consumer.handlerMux.RUnlock() + defer consumer.handlerMu.RUnlock() return handler(d) } @@ -134,8 +134,8 @@ func (consumer *Consumer) Close() { } func (consumer *Consumer) cleanupResources() { - consumer.isClosedMux.Lock() - defer consumer.isClosedMux.Unlock() + consumer.isClosedMu.Lock() + defer consumer.isClosedMu.Unlock() consumer.isClosed = true // close the channel so that rabbitmq server knows that the // consumer has been stopped. @@ -175,8 +175,8 @@ func (consumer *Consumer) startGoroutines( handler Handler, options ConsumerOptions, ) error { - consumer.isClosedMux.Lock() - defer consumer.isClosedMux.Unlock() + consumer.isClosedMu.Lock() + defer consumer.isClosedMu.Unlock() err := consumer.chanManager.QosSafe( options.QOSPrefetch, 0, @@ -221,8 +221,8 @@ func (consumer *Consumer) startGoroutines( } func (consumer *Consumer) getIsClosed() bool { - consumer.isClosedMux.RLock() - defer consumer.isClosedMux.RUnlock() + consumer.isClosedMu.RLock() + defer consumer.isClosedMu.RUnlock() return consumer.isClosed } @@ -266,8 +266,8 @@ func (consumer *Consumer) waitForHandlerCompletion(ctx context.Context) error { } c := make(chan struct{}) go func() { - consumer.handlerMux.Lock() - defer consumer.handlerMux.Unlock() + consumer.handlerMu.Lock() + defer consumer.handlerMu.Unlock() close(c) }() select { diff --git a/internal/channelmanager/channel_manager.go b/internal/channelmanager/channel_manager.go index 67faf0a..f07ab9b 100644 --- a/internal/channelmanager/channel_manager.go +++ b/internal/channelmanager/channel_manager.go @@ -13,14 +13,14 @@ import ( // ChannelManager - type ChannelManager struct { - logger logger.Logger - channel *amqp.Channel - connManager *connectionmanager.ConnectionManager - channelMux *sync.RWMutex - reconnectInterval time.Duration - reconnectionCount uint - reconnectionCountMux *sync.Mutex - dispatcher *dispatcher.Dispatcher + logger logger.Logger + channel *amqp.Channel + connManager *connectionmanager.ConnectionManager + channelMu *sync.RWMutex + reconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMu *sync.Mutex + dispatcher *dispatcher.Dispatcher } // NewChannelManager creates a new connection manager @@ -31,14 +31,14 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log } chanManager := ChannelManager{ - logger: log, - connManager: connManager, - channel: ch, - channelMux: &sync.RWMutex{}, - reconnectInterval: reconnectInterval, - reconnectionCount: 0, - reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), + logger: log, + connManager: connManager, + channel: ch, + channelMu: &sync.RWMutex{}, + reconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMu: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), } go chanManager.startNotifyCancelOrClosed() return &chanManager, nil @@ -84,14 +84,14 @@ func (chanManager *ChannelManager) startNotifyCancelOrClosed() { // GetReconnectionCount - func (chanManager *ChannelManager) GetReconnectionCount() uint { - chanManager.reconnectionCountMux.Lock() - defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCountMu.Lock() + defer chanManager.reconnectionCountMu.Unlock() return chanManager.reconnectionCount } func (chanManager *ChannelManager) incrementReconnectionCount() { - chanManager.reconnectionCountMux.Lock() - defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCountMu.Lock() + defer chanManager.reconnectionCountMu.Unlock() chanManager.reconnectionCount++ } @@ -113,8 +113,8 @@ func (chanManager *ChannelManager) reconnectLoop() { // reconnect safely closes the current channel and obtains a new one func (chanManager *ChannelManager) reconnect() error { - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() + chanManager.channelMu.Lock() + defer chanManager.channelMu.Unlock() newChannel, err := getNewChannel(chanManager.connManager) if err != nil { return err @@ -131,8 +131,8 @@ func (chanManager *ChannelManager) reconnect() error { // Close safely closes the current channel and connection func (chanManager *ChannelManager) Close() error { chanManager.logger.Infof("closing channel manager...") - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() + chanManager.channelMu.Lock() + defer chanManager.channelMu.Unlock() err := chanManager.channel.Close() if err != nil { diff --git a/internal/channelmanager/safe_wraps.go b/internal/channelmanager/safe_wraps.go index b75a5f5..8d02019 100644 --- a/internal/channelmanager/safe_wraps.go +++ b/internal/channelmanager/safe_wraps.go @@ -16,8 +16,8 @@ func (chanManager *ChannelManager) ConsumeSafe( noWait bool, args amqp.Table, ) (<-chan amqp.Delivery, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.Consume( queue, @@ -39,8 +39,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe( noWait bool, args amqp.Table, ) (amqp.Queue, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.QueueDeclarePassive( name, @@ -56,8 +56,8 @@ func (chanManager *ChannelManager) QueueDeclarePassiveSafe( func (chanManager *ChannelManager) QueueDeclareSafe( name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, ) (amqp.Queue, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.QueueDeclare( name, @@ -73,8 +73,8 @@ func (chanManager *ChannelManager) QueueDeclareSafe( func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe( name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.ExchangeDeclarePassive( name, @@ -91,8 +91,8 @@ func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe( func (chanManager *ChannelManager) ExchangeDeclareSafe( name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.ExchangeDeclare( name, @@ -109,8 +109,8 @@ func (chanManager *ChannelManager) ExchangeDeclareSafe( func (chanManager *ChannelManager) QueueBindSafe( name string, key string, exchange string, noWait bool, args amqp.Table, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.QueueBind( name, @@ -125,8 +125,8 @@ func (chanManager *ChannelManager) QueueBindSafe( func (chanManager *ChannelManager) QosSafe( prefetchCount int, prefetchSize int, global bool, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.Qos( prefetchCount, @@ -141,8 +141,8 @@ PublishSafe safely wraps the (*amqp.Channel).Publish method. func (chanManager *ChannelManager) PublishSafe( exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.PublishWithContext( context.Background(), @@ -158,8 +158,8 @@ func (chanManager *ChannelManager) PublishSafe( func (chanManager *ChannelManager) PublishWithContextSafe( ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, ) error { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.PublishWithContext( ctx, @@ -174,8 +174,8 @@ func (chanManager *ChannelManager) PublishWithContextSafe( func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe( ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, ) (*amqp.DeferredConfirmation, error) { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.PublishWithDeferredConfirmWithContext( ctx, @@ -191,8 +191,8 @@ func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe( func (chanManager *ChannelManager) NotifyReturnSafe( c chan amqp.Return, ) chan amqp.Return { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.NotifyReturn( c, @@ -203,8 +203,8 @@ func (chanManager *ChannelManager) NotifyReturnSafe( func (chanManager *ChannelManager) ConfirmSafe( noWait bool, ) error { - chanManager.channelMux.Lock() - defer chanManager.channelMux.Unlock() + chanManager.channelMu.Lock() + defer chanManager.channelMu.Unlock() return chanManager.channel.Confirm( noWait, @@ -215,8 +215,8 @@ func (chanManager *ChannelManager) ConfirmSafe( func (chanManager *ChannelManager) NotifyPublishSafe( confirm chan amqp.Confirmation, ) chan amqp.Confirmation { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.NotifyPublish( confirm, @@ -227,8 +227,8 @@ func (chanManager *ChannelManager) NotifyPublishSafe( func (chanManager *ChannelManager) NotifyFlowSafe( c chan bool, ) chan bool { - chanManager.channelMux.RLock() - defer chanManager.channelMux.RUnlock() + chanManager.channelMu.RLock() + defer chanManager.channelMu.RUnlock() return chanManager.channel.NotifyFlow( c, diff --git a/internal/connectionmanager/connection_manager.go b/internal/connectionmanager/connection_manager.go index 541c57f..8e659ea 100644 --- a/internal/connectionmanager/connection_manager.go +++ b/internal/connectionmanager/connection_manager.go @@ -13,15 +13,15 @@ import ( // ConnectionManager - type ConnectionManager struct { - logger logger.Logger - resolver Resolver - connection *amqp.Connection - amqpConfig amqp.Config - connectionMux *sync.RWMutex - ReconnectInterval time.Duration - reconnectionCount uint - reconnectionCountMux *sync.Mutex - dispatcher *dispatcher.Dispatcher + logger logger.Logger + resolver Resolver + connection *amqp.Connection + amqpConfig amqp.Config + connectionMu *sync.RWMutex + ReconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMu *sync.Mutex + dispatcher *dispatcher.Dispatcher } type Resolver interface { @@ -56,15 +56,15 @@ func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger } connManager := ConnectionManager{ - logger: log, - resolver: resolver, - connection: conn, - amqpConfig: conf, - connectionMux: &sync.RWMutex{}, - ReconnectInterval: reconnectInterval, - reconnectionCount: 0, - reconnectionCountMux: &sync.Mutex{}, - dispatcher: dispatcher.NewDispatcher(), + logger: log, + resolver: resolver, + connection: conn, + amqpConfig: conf, + connectionMu: &sync.RWMutex{}, + ReconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMu: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), } go connManager.startNotifyClose() return &connManager, nil @@ -73,8 +73,8 @@ func NewConnectionManager(resolver Resolver, conf amqp.Config, log logger.Logger // Close safely closes the current channel and connection func (connManager *ConnectionManager) Close() error { connManager.logger.Infof("closing connection manager...") - connManager.connectionMux.Lock() - defer connManager.connectionMux.Unlock() + connManager.connectionMu.Lock() + defer connManager.connectionMu.Unlock() err := connManager.connection.Close() if err != nil { @@ -91,13 +91,13 @@ func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- st // CheckoutConnection - func (connManager *ConnectionManager) CheckoutConnection() *amqp.Connection { - connManager.connectionMux.RLock() + connManager.connectionMu.RLock() return connManager.connection } // CheckinConnection - func (connManager *ConnectionManager) CheckinConnection() { - connManager.connectionMux.RUnlock() + connManager.connectionMu.RUnlock() } // startNotifyCancelOrClosed listens on the channel's cancelled and closed @@ -121,14 +121,14 @@ func (connManager *ConnectionManager) startNotifyClose() { // GetReconnectionCount - func (connManager *ConnectionManager) GetReconnectionCount() uint { - connManager.reconnectionCountMux.Lock() - defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCountMu.Lock() + defer connManager.reconnectionCountMu.Unlock() return connManager.reconnectionCount } func (connManager *ConnectionManager) incrementReconnectionCount() { - connManager.reconnectionCountMux.Lock() - defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCountMu.Lock() + defer connManager.reconnectionCountMu.Unlock() connManager.reconnectionCount++ } @@ -150,8 +150,8 @@ func (connManager *ConnectionManager) reconnectLoop() { // reconnect safely closes the current channel and obtains a new one func (connManager *ConnectionManager) reconnect() error { - connManager.connectionMux.Lock() - defer connManager.connectionMux.Unlock() + connManager.connectionMu.Lock() + defer connManager.connectionMu.Unlock() conn, err := dial(connManager.logger, connManager.resolver, amqp.Config(connManager.amqpConfig)) if err != nil { diff --git a/internal/connectionmanager/safe_wraps.go b/internal/connectionmanager/safe_wraps.go index b6702af..6a6abbc 100644 --- a/internal/connectionmanager/safe_wraps.go +++ b/internal/connectionmanager/safe_wraps.go @@ -8,8 +8,8 @@ import ( func (connManager *ConnectionManager) NotifyBlockedSafe( receiver chan amqp.Blocking, ) chan amqp.Blocking { - connManager.connectionMux.RLock() - defer connManager.connectionMux.RUnlock() + connManager.connectionMu.RLock() + defer connManager.connectionMu.RUnlock() return connManager.connection.NotifyBlocked( receiver, diff --git a/internal/dispatcher/dispatcher.go b/internal/dispatcher/dispatcher.go index 52385c6..a4592a1 100644 --- a/internal/dispatcher/dispatcher.go +++ b/internal/dispatcher/dispatcher.go @@ -10,8 +10,8 @@ import ( // Dispatcher - type Dispatcher struct { - subscribers map[int]dispatchSubscriber - subscribersMux *sync.Mutex + subscribers map[int]dispatchSubscriber + subscribersMu *sync.Mutex } type dispatchSubscriber struct { @@ -22,15 +22,15 @@ type dispatchSubscriber struct { // NewDispatcher - func NewDispatcher() *Dispatcher { return &Dispatcher{ - subscribers: make(map[int]dispatchSubscriber), - subscribersMux: &sync.Mutex{}, + subscribers: make(map[int]dispatchSubscriber), + subscribersMu: &sync.Mutex{}, } } // Dispatch - func (d *Dispatcher) Dispatch(err error) error { - d.subscribersMux.Lock() - defer d.subscribersMux.Unlock() + d.subscribersMu.Lock() + defer d.subscribersMu.Unlock() for _, subscriber := range d.subscribers { select { case <-time.After(time.Second * 5): @@ -50,17 +50,17 @@ func (d *Dispatcher) AddSubscriber() (<-chan error, chan<- struct{}) { closeCh := make(chan struct{}) notifyCancelOrCloseChan := make(chan error) - d.subscribersMux.Lock() + d.subscribersMu.Lock() d.subscribers[id] = dispatchSubscriber{ notifyCancelOrCloseChan: notifyCancelOrCloseChan, closeCh: closeCh, } - d.subscribersMux.Unlock() + d.subscribersMu.Unlock() go func(id int) { <-closeCh - d.subscribersMux.Lock() - defer d.subscribersMux.Unlock() + d.subscribersMu.Lock() + defer d.subscribersMu.Unlock() sub, ok := d.subscribers[id] if !ok { return diff --git a/internal/dispatcher/dispatcher_test.go b/internal/dispatcher/dispatcher_test.go index afc5509..3e80dcb 100644 --- a/internal/dispatcher/dispatcher_test.go +++ b/internal/dispatcher/dispatcher_test.go @@ -10,8 +10,8 @@ func TestNewDispatcher(t *testing.T) { if d.subscribers == nil { t.Error("Dispatcher subscribers is nil") } - if d.subscribersMux == nil { - t.Error("Dispatcher subscribersMux is nil") + if d.subscribersMu == nil { + t.Error("Dispatcher subscribersMu is nil") } } diff --git a/publish.go b/publish.go index a58b48d..06f9cb0 100644 --- a/publish.go +++ b/publish.go @@ -47,13 +47,13 @@ type Publisher struct { reconnectErrCh <-chan error closeConnectionToManagerCh chan<- struct{} - disablePublishDueToFlow bool - disablePublishDueToFlowMux *sync.RWMutex + disablePublishDueToFlow bool + disablePublishDueToFlowMu *sync.RWMutex - disablePublishDueToBlocked bool - disablePublishDueToBlockedMux *sync.RWMutex + disablePublishDueToBlocked bool + disablePublishDueToBlockedMu *sync.RWMutex - handlerMux *sync.Mutex + handlerMu *sync.Mutex notifyReturnHandler func(r Return) notifyPublishHandler func(p Confirmation) @@ -85,18 +85,18 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe reconnectErrCh, closeCh := chanManager.NotifyReconnect() publisher := &Publisher{ - chanManager: chanManager, - connManager: conn.connectionManager, - reconnectErrCh: reconnectErrCh, - closeConnectionToManagerCh: closeCh, - disablePublishDueToFlow: false, - disablePublishDueToFlowMux: &sync.RWMutex{}, - disablePublishDueToBlocked: false, - disablePublishDueToBlockedMux: &sync.RWMutex{}, - handlerMux: &sync.Mutex{}, - notifyReturnHandler: nil, - notifyPublishHandler: nil, - options: *options, + chanManager: chanManager, + connManager: conn.connectionManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + disablePublishDueToFlow: false, + disablePublishDueToFlowMu: &sync.RWMutex{}, + disablePublishDueToBlocked: false, + disablePublishDueToBlockedMu: &sync.RWMutex{}, + handlerMu: &sync.Mutex{}, + notifyReturnHandler: nil, + notifyPublishHandler: nil, + options: *options, } err = publisher.startup() @@ -155,14 +155,14 @@ func (publisher *Publisher) PublishWithContext( routingKeys []string, optionFuncs ...func(*PublishOptions), ) error { - publisher.disablePublishDueToFlowMux.RLock() - defer publisher.disablePublishDueToFlowMux.RUnlock() + publisher.disablePublishDueToFlowMu.RLock() + defer publisher.disablePublishDueToFlowMu.RUnlock() if publisher.disablePublishDueToFlow { return fmt.Errorf("publishing blocked due to high flow on the server") } - publisher.disablePublishDueToBlockedMux.RLock() - defer publisher.disablePublishDueToBlockedMux.RUnlock() + publisher.disablePublishDueToBlockedMu.RLock() + defer publisher.disablePublishDueToBlockedMu.RUnlock() if publisher.disablePublishDueToBlocked { return fmt.Errorf("publishing blocked due to TCP block on the server") } @@ -219,14 +219,14 @@ func (publisher *Publisher) PublishWithDeferredConfirmWithContext( routingKeys []string, optionFuncs ...func(*PublishOptions), ) (PublisherConfirmation, error) { - publisher.disablePublishDueToFlowMux.RLock() - defer publisher.disablePublishDueToFlowMux.RUnlock() + publisher.disablePublishDueToFlowMu.RLock() + defer publisher.disablePublishDueToFlowMu.RUnlock() if publisher.disablePublishDueToFlow { return nil, fmt.Errorf("publishing blocked due to high flow on the server") } - publisher.disablePublishDueToBlockedMux.RLock() - defer publisher.disablePublishDueToBlockedMux.RUnlock() + publisher.disablePublishDueToBlockedMu.RLock() + defer publisher.disablePublishDueToBlockedMu.RUnlock() if publisher.disablePublishDueToBlocked { return nil, fmt.Errorf("publishing blocked due to TCP block on the server") } @@ -296,10 +296,10 @@ func (publisher *Publisher) Close() { // These notifications are shared across an entire connection, so if you're creating multiple // publishers on the same connection keep that in mind func (publisher *Publisher) NotifyReturn(handler func(r Return)) { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() start := publisher.notifyReturnHandler == nil publisher.notifyReturnHandler = handler - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() if start { publisher.startReturnHandler() @@ -310,10 +310,10 @@ func (publisher *Publisher) NotifyReturn(handler func(r Return)) { // These notifications are shared across an entire connection, so if you're creating multiple // publishers on the same connection keep that in mind func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() shouldStart := publisher.notifyPublishHandler == nil publisher.notifyPublishHandler = handler - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() if shouldStart { publisher.startPublishHandler() @@ -321,12 +321,12 @@ func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { } func (publisher *Publisher) startReturnHandler() { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() if publisher.notifyReturnHandler == nil { - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() return } - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() go func() { returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) @@ -337,12 +337,12 @@ func (publisher *Publisher) startReturnHandler() { } func (publisher *Publisher) startPublishHandler() { - publisher.handlerMux.Lock() + publisher.handlerMu.Lock() if publisher.notifyPublishHandler == nil { - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() return } - publisher.handlerMux.Unlock() + publisher.handlerMu.Unlock() publisher.chanManager.ConfirmSafe(false) go func() { diff --git a/publish_flow_block.go b/publish_flow_block.go index 5033037..b978a21 100644 --- a/publish_flow_block.go +++ b/publish_flow_block.go @@ -6,12 +6,12 @@ import ( func (publisher *Publisher) startNotifyFlowHandler() { notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool)) - publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlowMu.Lock() publisher.disablePublishDueToFlow = false - publisher.disablePublishDueToFlowMux.Unlock() + publisher.disablePublishDueToFlowMu.Unlock() for ok := range notifyFlowChan { - publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlowMu.Lock() if ok { publisher.options.Logger.Warnf("pausing publishing due to flow request from server") publisher.disablePublishDueToFlow = true @@ -19,18 +19,18 @@ func (publisher *Publisher) startNotifyFlowHandler() { publisher.disablePublishDueToFlow = false publisher.options.Logger.Warnf("resuming publishing due to flow request from server") } - publisher.disablePublishDueToFlowMux.Unlock() + publisher.disablePublishDueToFlowMu.Unlock() } } func (publisher *Publisher) startNotifyBlockedHandler() { blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking)) - publisher.disablePublishDueToBlockedMux.Lock() + publisher.disablePublishDueToBlockedMu.Lock() publisher.disablePublishDueToBlocked = false - publisher.disablePublishDueToBlockedMux.Unlock() + publisher.disablePublishDueToBlockedMu.Unlock() for b := range blockings { - publisher.disablePublishDueToBlockedMux.Lock() + publisher.disablePublishDueToBlockedMu.Lock() if b.Active { publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server") publisher.disablePublishDueToBlocked = true @@ -38,6 +38,6 @@ func (publisher *Publisher) startNotifyBlockedHandler() { publisher.disablePublishDueToBlocked = false publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server") } - publisher.disablePublishDueToBlockedMux.Unlock() + publisher.disablePublishDueToBlockedMu.Unlock() } }