diff --git a/channel.go b/channel.go index 13f69bb..42c3853 100644 --- a/channel.go +++ b/channel.go @@ -93,18 +93,32 @@ func (chManager *channelManager) reconnectWithBackoff() { // reconnect safely closes the current channel and obtains a new one func (chManager *channelManager) reconnect() error { - chManager.channelMux.Lock() - defer chManager.channelMux.Unlock() - newConn, newChannel, err := getNewChannel(chManager.url, chManager.config) + err := chManager.close() if err != nil { return err } - chManager.channel.Close() - chManager.connection.Close() + newConn, newChannel, err := getNewChannel(chManager.url, chManager.config) + if err != nil { + return err + } + chManager.channelMux.Lock() + defer chManager.channelMux.Unlock() chManager.connection = newConn chManager.channel = newChannel go chManager.startNotifyCancelOrClosed() return nil } + +// close safely closes the current channel +func (chManager *channelManager) close() error { + chManager.channelMux.Lock() + defer chManager.channelMux.Unlock() + + err := chManager.connection.Close() + if err != nil { + return err + } + return nil +} diff --git a/examples/publisher/main.go b/examples/publisher/main.go index b5b0947..c89da82 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -1,7 +1,12 @@ package main import ( + "fmt" "log" + "os" + "os/signal" + "syscall" + "time" amqp "github.com/rabbitmq/amqp091-go" rabbitmq "github.com/wagslane/go-rabbitmq" @@ -11,17 +16,7 @@ func main() { publisher, err := rabbitmq.NewPublisher( "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogging, - ) - if err != nil { - log.Fatal(err) - } - err = publisher.Publish( - []byte("hello, world"), - []string{"routing_key"}, - rabbitmq.WithPublishOptionsContentType("application/json"), - rabbitmq.WithPublishOptionsMandatory, - rabbitmq.WithPublishOptionsPersistentDelivery, - rabbitmq.WithPublishOptionsExchange("events"), + rabbitmq.WithPublishOptionsConfirmPublishings, ) if err != nil { log.Fatal(err) @@ -33,4 +28,52 @@ func main() { log.Printf("message returned from server: %s", string(r.Body)) } }() + + confirmations := publisher.NotifyPublish() + go func() { + for c := range confirmations { + log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack) + } + }() + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + err = publisher.Publish( + []byte("hello, world"), + []string{"routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Fatal(err) + } + case <-done: + fmt.Println("stopping publisher") + err := publisher.StopPublishing() + if err != nil { + log.Fatal(err) + } + fmt.Println("publisher stopped") + return + } + } } diff --git a/publish.go b/publish.go index db250d4..ec464d0 100644 --- a/publish.go +++ b/publish.go @@ -28,23 +28,31 @@ type Return struct { amqp.Return } +// Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. +// Use NotifyPublish to consume these events. +type Confirmation struct { + amqp.Confirmation +} + // Publisher allows you to publish messages safely across an open connection type Publisher struct { chManager *channelManager - notifyReturnChan chan Return + notifyReturnChan chan Return + notifyPublishChan chan Confirmation disablePublishDueToFlow bool disablePublishDueToFlowMux *sync.RWMutex - logger Logger + options PublisherOptions } // PublisherOptions are used to describe a publisher's configuration. // Logging set to true will enable the consumer to print to stdout type PublisherOptions struct { - Logging bool - Logger Logger + Logging bool + Logger Logger + ConfirmPublishings bool } // WithPublisherOptionsLogging sets logging to true on the consumer options @@ -53,6 +61,11 @@ func WithPublisherOptionsLogging(options *PublisherOptions) { options.Logger = &stdLogger{} } +// WithPublishOptionsConfirmPublishings allows NotifyPublish to work +func WithPublishOptionsConfirmPublishings(options *PublisherOptions) { + options.ConfirmPublishings = true +} + // WithPublisherOptionsLogger sets logging to a custom interface. // Use WithPublisherOptionsLogging to just log to stdout. func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { @@ -85,8 +98,9 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher chManager: chManager, disablePublishDueToFlow: false, disablePublishDueToFlowMux: &sync.RWMutex{}, - logger: options.Logger, + options: *options, notifyReturnChan: nil, + notifyPublishChan: nil, } go publisher.startNotifyFlowHandler() @@ -94,11 +108,14 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher // restart notifiers when cancel/close is triggered go func() { for err := range publisher.chManager.notifyCancelOrClose { - publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err) + publisher.options.Logger.Printf("publish cancel/close handler triggered. err: %v", err) go publisher.startNotifyFlowHandler() if publisher.notifyReturnChan != nil { go publisher.startNotifyReturnHandler() } + if publisher.notifyPublishChan != nil && publisher.options.ConfirmPublishings { + go publisher.startNotifyPublishHandler() + } } }() @@ -113,6 +130,16 @@ func (publisher *Publisher) NotifyReturn() <-chan Return { return publisher.notifyReturnChan } +// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option +func (publisher *Publisher) NotifyPublish() <-chan Confirmation { + if !publisher.options.ConfirmPublishings { + return nil + } + publisher.notifyPublishChan = make(chan Confirmation) + go publisher.startNotifyPublishHandler() + return publisher.notifyPublishChan +} + // Publish publishes the provided data to the given routing keys over the connection func (publisher *Publisher) Publish( data []byte, @@ -167,9 +194,12 @@ func (publisher *Publisher) Publish( // StopPublishing stops the publishing of messages. // The publisher should be discarded as it's not safe for re-use -func (publisher Publisher) StopPublishing() { - publisher.chManager.channel.Close() - publisher.chManager.connection.Close() +func (publisher Publisher) StopPublishing() error { + err := publisher.chManager.close() + if err != nil { + return err + } + return nil } func (publisher *Publisher) startNotifyFlowHandler() { @@ -183,11 +213,11 @@ func (publisher *Publisher) startNotifyFlowHandler() { for ok := range notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock() if ok { - publisher.logger.Printf("pausing publishing due to flow request from server") + publisher.options.Logger.Printf("pausing publishing due to flow request from server") publisher.disablePublishDueToFlow = true } else { publisher.disablePublishDueToFlow = false - publisher.logger.Printf("resuming publishing due to flow request from server") + publisher.options.Logger.Printf("resuming publishing due to flow request from server") } publisher.disablePublishDueToFlowMux.Unlock() } @@ -199,3 +229,11 @@ func (publisher *Publisher) startNotifyReturnHandler() { publisher.notifyReturnChan <- Return{ret} } } + +func (publisher *Publisher) startNotifyPublishHandler() { + publisher.chManager.channel.Confirm(false) + publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1)) + for conf := range publishAMQPCh { + publisher.notifyPublishChan <- Confirmation{conf} + } +}