Browse Source

Merge pull request #54 from wagslane/feature_40_publishconfirms

allow publishing with confirms
pull/57/head
Lane Wagner 4 years ago
committed by GitHub
parent
commit
bd23eaa091
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 122 additions and 27 deletions
  1. +19
    -5
      channel.go
  2. +54
    -11
      examples/publisher/main.go
  3. +49
    -11
      publish.go

+ 19
- 5
channel.go View File

@ -93,18 +93,32 @@ func (chManager *channelManager) reconnectWithBackoff() {
// reconnect safely closes the current channel and obtains a new one
func (chManager *channelManager) reconnect() error {
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
newConn, newChannel, err := getNewChannel(chManager.url, chManager.config)
err := chManager.close()
if err != nil {
return err
}
chManager.channel.Close()
chManager.connection.Close()
newConn, newChannel, err := getNewChannel(chManager.url, chManager.config)
if err != nil {
return err
}
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
chManager.connection = newConn
chManager.channel = newChannel
go chManager.startNotifyCancelOrClosed()
return nil
}
// close safely closes the current channel
func (chManager *channelManager) close() error {
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
err := chManager.connection.Close()
if err != nil {
return err
}
return nil
}

+ 54
- 11
examples/publisher/main.go View File

@ -1,7 +1,12 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
amqp "github.com/rabbitmq/amqp091-go"
rabbitmq "github.com/wagslane/go-rabbitmq"
@ -11,17 +16,7 @@ func main() {
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
rabbitmq.WithPublishOptionsConfirmPublishings,
)
if err != nil {
log.Fatal(err)
@ -33,4 +28,52 @@ func main() {
log.Printf("message returned from server: %s", string(r.Body))
}
}()
confirmations := publisher.NotifyPublish()
go func() {
for c := range confirmations {
log.Printf("message confirmed from server. tag: %v, ack: %v", c.DeliveryTag, c.Ack)
}
}()
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Fatal(err)
}
case <-done:
fmt.Println("stopping publisher")
err := publisher.StopPublishing()
if err != nil {
log.Fatal(err)
}
fmt.Println("publisher stopped")
return
}
}
}

+ 49
- 11
publish.go View File

@ -28,23 +28,31 @@ type Return struct {
amqp.Return
}
// Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag.
// Use NotifyPublish to consume these events.
type Confirmation struct {
amqp.Confirmation
}
// Publisher allows you to publish messages safely across an open connection
type Publisher struct {
chManager *channelManager
notifyReturnChan chan Return
notifyReturnChan chan Return
notifyPublishChan chan Confirmation
disablePublishDueToFlow bool
disablePublishDueToFlowMux *sync.RWMutex
logger Logger
options PublisherOptions
}
// PublisherOptions are used to describe a publisher's configuration.
// Logging set to true will enable the consumer to print to stdout
type PublisherOptions struct {
Logging bool
Logger Logger
Logging bool
Logger Logger
ConfirmPublishings bool
}
// WithPublisherOptionsLogging sets logging to true on the consumer options
@ -53,6 +61,11 @@ func WithPublisherOptionsLogging(options *PublisherOptions) {
options.Logger = &stdLogger{}
}
// WithPublishOptionsConfirmPublishings allows NotifyPublish to work
func WithPublishOptionsConfirmPublishings(options *PublisherOptions) {
options.ConfirmPublishings = true
}
// WithPublisherOptionsLogger sets logging to a custom interface.
// Use WithPublisherOptionsLogging to just log to stdout.
func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
@ -85,8 +98,9 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
chManager: chManager,
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
logger: options.Logger,
options: *options,
notifyReturnChan: nil,
notifyPublishChan: nil,
}
go publisher.startNotifyFlowHandler()
@ -94,11 +108,14 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
// restart notifiers when cancel/close is triggered
go func() {
for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
publisher.options.Logger.Printf("publish cancel/close handler triggered. err: %v", err)
go publisher.startNotifyFlowHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
}
if publisher.notifyPublishChan != nil && publisher.options.ConfirmPublishings {
go publisher.startNotifyPublishHandler()
}
}
}()
@ -113,6 +130,16 @@ func (publisher *Publisher) NotifyReturn() <-chan Return {
return publisher.notifyReturnChan
}
// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option
func (publisher *Publisher) NotifyPublish() <-chan Confirmation {
if !publisher.options.ConfirmPublishings {
return nil
}
publisher.notifyPublishChan = make(chan Confirmation)
go publisher.startNotifyPublishHandler()
return publisher.notifyPublishChan
}
// Publish publishes the provided data to the given routing keys over the connection
func (publisher *Publisher) Publish(
data []byte,
@ -167,9 +194,12 @@ func (publisher *Publisher) Publish(
// StopPublishing stops the publishing of messages.
// The publisher should be discarded as it's not safe for re-use
func (publisher Publisher) StopPublishing() {
publisher.chManager.channel.Close()
publisher.chManager.connection.Close()
func (publisher Publisher) StopPublishing() error {
err := publisher.chManager.close()
if err != nil {
return err
}
return nil
}
func (publisher *Publisher) startNotifyFlowHandler() {
@ -183,11 +213,11 @@ func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()
if ok {
publisher.logger.Printf("pausing publishing due to flow request from server")
publisher.options.Logger.Printf("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true
} else {
publisher.disablePublishDueToFlow = false
publisher.logger.Printf("resuming publishing due to flow request from server")
publisher.options.Logger.Printf("resuming publishing due to flow request from server")
}
publisher.disablePublishDueToFlowMux.Unlock()
}
@ -199,3 +229,11 @@ func (publisher *Publisher) startNotifyReturnHandler() {
publisher.notifyReturnChan <- Return{ret}
}
}
func (publisher *Publisher) startNotifyPublishHandler() {
publisher.chManager.channel.Confirm(false)
publishAMQPCh := publisher.chManager.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
for conf := range publishAMQPCh {
publisher.notifyPublishChan <- Confirmation{conf}
}
}

Loading…
Cancel
Save