Browse Source

add option to enable publisher confirmations

this makes it easier to use `PublishWithDeferredConfirmWithContext`
without generating a custom handler that does nothing.

Add an additional example for this that also shows how to use
PublishWithDeferredConfirmWithContext.

Closes: https://github.com/wagslane/go-rabbitmq/issues/116
pull/121/head
Felix Huettner 3 years ago
parent
commit
ba9044a7f7
4 changed files with 109 additions and 1 deletions
  1. +1
    -0
      examples/publisher_confirm/.gitignore
  2. +91
    -0
      examples/publisher_confirm/main.go
  3. +8
    -0
      publish.go
  4. +9
    -1
      publisher_options.go

+ 1
- 0
examples/publisher_confirm/.gitignore View File

@ -0,0 +1 @@
publisher_confirm

+ 91
- 0
examples/publisher_confirm/main.go View File

@ -0,0 +1,91 @@
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
conn, err := rabbitmq.NewConn(
"amqp://guest:guest@localhost",
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
publisher, err := rabbitmq.NewPublisher(
conn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeName("events"),
rabbitmq.WithPublisherOptionsExchangeDeclare,
rabbitmq.WithPublisherOptionsConfirm,
)
if err != nil {
log.Fatal(err)
}
defer publisher.Close()
publisher.NotifyReturn(func(r rabbitmq.Return) {
log.Printf("message returned from server: %s", string(r.Body))
})
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
ticker := time.NewTicker(time.Second)
for {
select {
case <-ticker.C:
confirms, err := publisher.PublishWithDeferredConfirmWithContext(
context.Background(),
[]byte("hello, world"),
[]string{"my_routing_key"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Println(err)
continue
} else if len(confirms) == 0 || confirms[0] == nil {
fmt.Println("message publishing not confirmed")
continue
}
fmt.Println("message published")
ok, err := confirms[0].WaitContext(context.Background())
if err != nil {
log.Println(err)
}
if ok {
fmt.Println("message publishing confirmed")
} else {
fmt.Println("message publishing not confirmed")
}
case <-done:
fmt.Println("stopping publisher")
return
}
}
}

+ 8
- 0
publish.go View File

@ -118,6 +118,10 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe
}
}()
if options.ConfirmMode {
publisher.NotifyPublish(func(_ Confirmation) {})
}
return publisher, nil
}
@ -202,6 +206,10 @@ func (publisher *Publisher) PublishWithContext(
return nil
}
// PublishWithContext publishes the provided data to the given routing keys over the connection.
// if the publisher is in confirm mode (which can be either done by calling `NotifyPublish` with a custom handler
// or by using `WithPublisherOptionsConfirm`) a publisher confirmation is returned.
// This confirmation can be used to check if the message was actually published or wait for this to happen.
func (publisher *Publisher) PublishWithDeferredConfirmWithContext(
ctx context.Context,
data []byte,


+ 9
- 1
publisher_options.go View File

@ -7,6 +7,7 @@ import amqp "github.com/rabbitmq/amqp091-go"
type PublisherOptions struct {
ExchangeOptions ExchangeOptions
Logger Logger
ConfirmMode bool
}
// getDefaultPublisherOptions describes the options that will be used when a value isn't provided
@ -23,7 +24,8 @@ func getDefaultPublisherOptions() PublisherOptions {
Args: Table{},
Declare: false,
},
Logger: stdDebugLogger{},
Logger: stdDebugLogger{},
ConfirmMode: false,
}
}
@ -91,3 +93,9 @@ func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) {
options.ExchangeOptions.Args = args
}
}
// WithPublisherOptionsConfirm enables confirm mode on the connection
// this is required if publisher confirmations should be used
func WithPublisherOptionsConfirm(options *PublisherOptions) {
options.ConfirmMode = true
}

Loading…
Cancel
Save