From ba9044a7f7b306085d6a62bffcbecec94f4bccb5 Mon Sep 17 00:00:00 2001 From: Felix Huettner Date: Wed, 5 Apr 2023 10:50:36 +0200 Subject: [PATCH] add option to enable publisher confirmations this makes it easier to use `PublishWithDeferredConfirmWithContext` without generating a custom handler that does nothing. Add an additional example for this that also shows how to use PublishWithDeferredConfirmWithContext. Closes: https://github.com/wagslane/go-rabbitmq/issues/116 --- examples/publisher_confirm/.gitignore | 1 + examples/publisher_confirm/main.go | 91 +++++++++++++++++++++++++++ publish.go | 8 +++ publisher_options.go | 10 ++- 4 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 examples/publisher_confirm/.gitignore create mode 100644 examples/publisher_confirm/main.go diff --git a/examples/publisher_confirm/.gitignore b/examples/publisher_confirm/.gitignore new file mode 100644 index 0000000..3dde82f --- /dev/null +++ b/examples/publisher_confirm/.gitignore @@ -0,0 +1 @@ +publisher_confirm diff --git a/examples/publisher_confirm/main.go b/examples/publisher_confirm/main.go new file mode 100644 index 0000000..f5aecaf --- /dev/null +++ b/examples/publisher_confirm/main.go @@ -0,0 +1,91 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +func main() { + conn, err := rabbitmq.NewConn( + "amqp://guest:guest@localhost", + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + publisher, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDeclare, + rabbitmq.WithPublisherOptionsConfirm, + ) + if err != nil { + log.Fatal(err) + } + defer publisher.Close() + + publisher.NotifyReturn(func(r rabbitmq.Return) { + log.Printf("message returned from server: %s", string(r.Body)) + }) + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + confirms, err := publisher.PublishWithDeferredConfirmWithContext( + context.Background(), + []byte("hello, world"), + []string{"my_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + ) + if err != nil { + log.Println(err) + continue + } else if len(confirms) == 0 || confirms[0] == nil { + fmt.Println("message publishing not confirmed") + continue + } + fmt.Println("message published") + ok, err := confirms[0].WaitContext(context.Background()) + if err != nil { + log.Println(err) + } + if ok { + fmt.Println("message publishing confirmed") + } else { + fmt.Println("message publishing not confirmed") + } + case <-done: + fmt.Println("stopping publisher") + return + } + } +} diff --git a/publish.go b/publish.go index 8954018..59526d7 100644 --- a/publish.go +++ b/publish.go @@ -118,6 +118,10 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe } }() + if options.ConfirmMode { + publisher.NotifyPublish(func(_ Confirmation) {}) + } + return publisher, nil } @@ -202,6 +206,10 @@ func (publisher *Publisher) PublishWithContext( return nil } +// PublishWithContext publishes the provided data to the given routing keys over the connection. +// if the publisher is in confirm mode (which can be either done by calling `NotifyPublish` with a custom handler +// or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned. +// This confirmation can be used to check if the message was actually published or wait for this to happen. func (publisher *Publisher) PublishWithDeferredConfirmWithContext( ctx context.Context, data []byte, diff --git a/publisher_options.go b/publisher_options.go index 0e7a946..eb283e4 100644 --- a/publisher_options.go +++ b/publisher_options.go @@ -7,6 +7,7 @@ import amqp "github.com/rabbitmq/amqp091-go" type PublisherOptions struct { ExchangeOptions ExchangeOptions Logger Logger + ConfirmMode bool } // getDefaultPublisherOptions describes the options that will be used when a value isn't provided @@ -23,7 +24,8 @@ func getDefaultPublisherOptions() PublisherOptions { Args: Table{}, Declare: false, }, - Logger: stdDebugLogger{}, + Logger: stdDebugLogger{}, + ConfirmMode: false, } } @@ -91,3 +93,9 @@ func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) { options.ExchangeOptions.Args = args } } + +// WithPublisherOptionsConfirm enables confirm mode on the connection +// this is required if publisher confirmations should be used +func WithPublisherOptionsConfirm(options *PublisherOptions) { + options.ConfirmMode = true +}