|
|
@ -28,23 +28,31 @@ type Return struct { |
|
|
amqp.Return |
|
|
amqp.Return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag.
|
|
|
|
|
|
// Use NotifyPublish to consume these events.
|
|
|
|
|
|
type Confirmation struct { |
|
|
|
|
|
amqp.Confirmation |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Publisher allows you to publish messages safely across an open connection
|
|
|
// Publisher allows you to publish messages safely across an open connection
|
|
|
type Publisher struct { |
|
|
type Publisher struct { |
|
|
chManager *channelManager |
|
|
chManager *channelManager |
|
|
|
|
|
|
|
|
notifyReturnChan chan Return |
|
|
|
|
|
|
|
|
notifyReturnChan chan Return |
|
|
|
|
|
notifyPublishChan chan Confirmation |
|
|
|
|
|
|
|
|
disablePublishDueToFlow bool |
|
|
disablePublishDueToFlow bool |
|
|
disablePublishDueToFlowMux *sync.RWMutex |
|
|
disablePublishDueToFlowMux *sync.RWMutex |
|
|
|
|
|
|
|
|
logger Logger |
|
|
|
|
|
|
|
|
options PublisherOptions |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// PublisherOptions are used to describe a publisher's configuration.
|
|
|
// PublisherOptions are used to describe a publisher's configuration.
|
|
|
// Logging set to true will enable the consumer to print to stdout
|
|
|
// Logging set to true will enable the consumer to print to stdout
|
|
|
type PublisherOptions struct { |
|
|
type PublisherOptions struct { |
|
|
Logging bool |
|
|
|
|
|
Logger Logger |
|
|
|
|
|
|
|
|
Logging bool |
|
|
|
|
|
Logger Logger |
|
|
|
|
|
ConfirmPublishings bool |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
// WithPublisherOptionsLogging sets logging to true on the consumer options
|
|
|
// WithPublisherOptionsLogging sets logging to true on the consumer options
|
|
|
@ -53,6 +61,11 @@ func WithPublisherOptionsLogging(options *PublisherOptions) { |
|
|
options.Logger = &stdLogger{} |
|
|
options.Logger = &stdLogger{} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// WithPublishOptionsConfirmPublishings allows NotifyPublish to work
|
|
|
|
|
|
func WithPublishOptionsConfirmPublishings(options *PublisherOptions) { |
|
|
|
|
|
options.ConfirmPublishings = true |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// WithPublisherOptionsLogger sets logging to a custom interface.
|
|
|
// WithPublisherOptionsLogger sets logging to a custom interface.
|
|
|
// Use WithPublisherOptionsLogging to just log to stdout.
|
|
|
// Use WithPublisherOptionsLogging to just log to stdout.
|
|
|
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { |
|
|
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { |
|
|
@ -85,8 +98,9 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher |
|
|
chManager: chManager, |
|
|
chManager: chManager, |
|
|
disablePublishDueToFlow: false, |
|
|
disablePublishDueToFlow: false, |
|
|
disablePublishDueToFlowMux: &sync.RWMutex{}, |
|
|
disablePublishDueToFlowMux: &sync.RWMutex{}, |
|
|
logger: options.Logger, |
|
|
|
|
|
|
|
|
options: *options, |
|
|
notifyReturnChan: nil, |
|
|
notifyReturnChan: nil, |
|
|
|
|
|
notifyPublishChan: nil, |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
go publisher.startNotifyFlowHandler() |
|
|
go publisher.startNotifyFlowHandler() |
|
|
@ -94,11 +108,14 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher |
|
|
// restart notifiers when cancel/close is triggered
|
|
|
// restart notifiers when cancel/close is triggered
|
|
|
go func() { |
|
|
go func() { |
|
|
for err := range publisher.chManager.notifyCancelOrClose { |
|
|
for err := range publisher.chManager.notifyCancelOrClose { |
|
|
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) |
|
|
|
|
|
|
|
|
publisher.options.Logger.Printf("publish cancel/close handler triggered. err: %v", err) |
|
|
go publisher.startNotifyFlowHandler() |
|
|
go publisher.startNotifyFlowHandler() |
|
|
if publisher.notifyReturnChan != nil { |
|
|
if publisher.notifyReturnChan != nil { |
|
|
go publisher.startNotifyReturnHandler() |
|
|
go publisher.startNotifyReturnHandler() |
|
|
} |
|
|
} |
|
|
|
|
|
if publisher.notifyPublishChan != nil && publisher.options.ConfirmPublishings { |
|
|
|
|
|
go publisher.startNotifyPublishHandler() |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
}() |
|
|
}() |
|
|
|
|
|
|
|
|
@ -113,6 +130,16 @@ func (publisher *Publisher) NotifyReturn() <-chan Return { |
|
|
return publisher.notifyReturnChan |
|
|
return publisher.notifyReturnChan |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
|
|
|
|
|
|
func (publisher *Publisher) NotifyPublish() <-chan Confirmation { |
|
|
|
|
|
if !publisher.options.ConfirmPublishings { |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
publisher.notifyPublishChan = make(chan Confirmation) |
|
|
|
|
|
go publisher.startNotifyPublishHandler() |
|
|
|
|
|
return publisher.notifyPublishChan |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
// Publish publishes the provided data to the given routing keys over the connection
|
|
|
// Publish publishes the provided data to the given routing keys over the connection
|
|
|
func (publisher *Publisher) Publish( |
|
|
func (publisher *Publisher) Publish( |
|
|
data []byte, |
|
|
data []byte, |
|
|
@ -167,9 +194,12 @@ func (publisher *Publisher) Publish( |
|
|
|
|
|
|
|
|
// StopPublishing stops the publishing of messages.
|
|
|
// StopPublishing stops the publishing of messages.
|
|
|
// The publisher should be discarded as it's not safe for re-use
|
|
|
// The publisher should be discarded as it's not safe for re-use
|
|
|
func (publisher Publisher) StopPublishing() { |
|
|
|
|
|
publisher.chManager.channel.Close() |
|
|
|
|
|
publisher.chManager.connection.Close() |
|
|
|
|
|
|
|
|
func (publisher Publisher) StopPublishing() error { |
|
|
|
|
|
err := publisher.chManager.close() |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return err |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (publisher *Publisher) startNotifyFlowHandler() { |
|
|
func (publisher *Publisher) startNotifyFlowHandler() { |
|
|
@ -183,11 +213,11 @@ func (publisher *Publisher) startNotifyFlowHandler() { |
|
|
for ok := range notifyFlowChan { |
|
|
for ok := range notifyFlowChan { |
|
|
publisher.disablePublishDueToFlowMux.Lock() |
|
|
publisher.disablePublishDueToFlowMux.Lock() |
|
|
if ok { |
|
|
if ok { |
|
|
publisher.logger.Printf("pausing publishing due to flow request from server") |
|
|
|
|
|
|
|
|
publisher.options.Logger.Printf("pausing publishing due to flow request from server") |
|
|
publisher.disablePublishDueToFlow = true |
|
|
publisher.disablePublishDueToFlow = true |
|
|
} else { |
|
|
} else { |
|
|
publisher.disablePublishDueToFlow = false |
|
|
publisher.disablePublishDueToFlow = false |
|
|
publisher.logger.Printf("resuming publishing due to flow request from server") |
|
|
|
|
|
|
|
|
publisher.options.Logger.Printf("resuming publishing due to flow request from server") |
|
|
} |
|
|
} |
|
|
publisher.disablePublishDueToFlowMux.Unlock() |
|
|
publisher.disablePublishDueToFlowMux.Unlock() |
|
|
} |
|
|
} |
|
|
@ -199,3 +229,11 @@ func (publisher *Publisher) startNotifyReturnHandler() { |
|
|
publisher.notifyReturnChan <- Return{ret} |
|
|
publisher.notifyReturnChan <- Return{ret} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (publisher *Publisher) startNotifyPublishHandler() { |
|
|
|
|
|
publisher.chManager.channel.Confirm(false) |
|
|
|
|
|
publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1)) |
|
|
|
|
|
for conf := range publishAMQPCh { |
|
|
|
|
|
publisher.notifyPublishChan <- Confirmation{conf} |
|
|
|
|
|
} |
|
|
|
|
|
} |