diff --git a/README.md b/README.md index f23b704..1800302 100644 --- a/README.md +++ b/README.md @@ -44,11 +44,6 @@ defer conn.Close() consumer, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), rabbitmq.WithConsumerOptionsExchangeName("events"), @@ -58,6 +53,15 @@ if err != nil { log.Fatal(err) } defer consumer.Close() + +err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack +}) +if err != nil { + log.Fatal(err) +} ``` ## 🚀 Quick Start Publisher diff --git a/consume.go b/consume.go index 3d05a77..4517c5d 100644 --- a/consume.go +++ b/consume.go @@ -44,12 +44,9 @@ type Delivery struct { amqp.Delivery } -// NewConsumer returns a new Consumer connected to the given rabbitmq server -// it also starts consuming on the given connection with automatic reconnection handling -// Do not reuse the returned consumer for anything other than to close it +// NewConsumer returns a new Consumer connected to the given rabbitmq server. func NewConsumer( conn *Conn, - handler Handler, queue string, optionFuncs ...func(*ConsumerOptions), ) (*Consumer, error) { @@ -78,30 +75,32 @@ func NewConsumer( isClosed: false, } - err = consumer.startGoroutines( + return consumer, nil +} + +// Run starts consuming with automatic reconnection handling. Do not reuse the +// consumer for anything other than to close it. +func (consumer *Consumer) Run(handler Handler) error { + err := consumer.startGoroutines( handler, - *options, + consumer.options, ) if err != nil { - return nil, err + return err } - go func() { - for err := range consumer.reconnectErrCh { - consumer.options.Logger.Infof("successful consumer recovery from: %v", err) - err = consumer.startGoroutines( - handler, - *options, - ) - if err != nil { - consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err) - consumer.options.Logger.Fatalf("consumer closing, unable to recover") - return - } + for err := range consumer.reconnectErrCh { + consumer.options.Logger.Infof("successful consumer recovery from: %v", err) + err = consumer.startGoroutines( + handler, + consumer.options, + ) + if err != nil { + return fmt.Errorf("error restarting consumer goroutines after cancel or close: %w", err) } - }() + } - return consumer, nil + return nil } // Close cleans up resources and closes the consumer. diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 7c68733..3912b18 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -22,11 +22,6 @@ func main() { consumer, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), rabbitmq.WithConsumerOptionsExchangeName("events"), @@ -35,22 +30,29 @@ func main() { if err != nil { log.Fatal(err) } - defer consumer.Close() - // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { + fmt.Println("awaiting signal") sig := <-sigs + fmt.Println() fmt.Println(sig) - done <- true + fmt.Println("stopping consumer") + + consumer.Close() }() - fmt.Println("awaiting signal") - <-done - fmt.Println("stopping consumer") + // block main thread - wait for shutdown signal + err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }) + if err != nil { + log.Fatal(err) + } } diff --git a/examples/multiconsumer/main.go b/examples/multiconsumer/main.go index 42abf66..90a4cc6 100644 --- a/examples/multiconsumer/main.go +++ b/examples/multiconsumer/main.go @@ -5,6 +5,7 @@ import ( "log" "os" "os/signal" + "sync" "syscall" rabbitmq "github.com/wagslane/go-rabbitmq" @@ -22,11 +23,6 @@ func main() { consumer, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsConcurrency(2), rabbitmq.WithConsumerOptionsConsumerName("consumer_1"), @@ -38,15 +34,9 @@ func main() { if err != nil { log.Fatal(err) } - defer consumer.Close() consumer2, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed 2: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsConcurrency(2), rabbitmq.WithConsumerOptionsConsumerName("consumer_2"), @@ -56,22 +46,57 @@ func main() { if err != nil { log.Fatal(err) } - defer consumer2.Close() - // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) + errs := make(chan error, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true + fmt.Println("awaiting signal") + select { + case sig := <-sigs: + fmt.Println() + fmt.Println(sig) + case err := <-errs: + log.Print(err) + } + + fmt.Println("stopping consumers") + + consumer.Close() + consumer2.Close() + }() + + var wg sync.WaitGroup + + wg.Add(2) + + go func() { + defer wg.Done() + + err := consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }) + if err != nil { + errs <- err + } + }() + + go func() { + defer wg.Done() + + err := consumer2.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }) + if err != nil { + errs <- err + } }() - fmt.Println("awaiting signal") - <-done - fmt.Println("stopping consumer") + wg.Wait() }