diff --git a/publish.go b/publish.go index 1d71fa8..0b41339 100644 --- a/publish.go +++ b/publish.go @@ -218,15 +218,17 @@ func (publisher *Publisher) Publish( } func (publisher *Publisher) startNotifyFlowHandler() { + // Listeners for active=true flow control. When true is sent to a listener, + // publishing should pause until false is sent to listeners. for ok := range publisher.notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() - publisher.logger.Printf("pausing publishing due to flow request from server") if ok { - publisher.disablePublishDueToFlow = false - } else { + publisher.logger.Printf("pausing publishing due to flow request from server") publisher.disablePublishDueToFlow = true + } else { + publisher.disablePublishDueToFlow = false + publisher.logger.Printf("resuming publishing due to flow request from server") } publisher.disablePublishDueToFlowMux.Unlock() - publisher.logger.Printf("resuming publishing due to flow request from server") } }