diff --git a/README.md b/README.md index 00e9ce9..9f9f303 100644 --- a/README.md +++ b/README.md @@ -44,11 +44,6 @@ defer conn.Close() consumer, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), rabbitmq.WithConsumerOptionsExchangeName("events"), @@ -58,6 +53,15 @@ if err != nil { log.Fatal(err) } defer consumer.Close() + +err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack +}) +if err != nil { + log.Fatal(err) +} ``` ## 🚀 Quick Start Publisher diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 7c68733..3912b18 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -22,11 +22,6 @@ func main() { consumer, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"), rabbitmq.WithConsumerOptionsExchangeName("events"), @@ -35,22 +30,29 @@ func main() { if err != nil { log.Fatal(err) } - defer consumer.Close() - // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { + fmt.Println("awaiting signal") sig := <-sigs + fmt.Println() fmt.Println(sig) - done <- true + fmt.Println("stopping consumer") + + consumer.Close() }() - fmt.Println("awaiting signal") - <-done - fmt.Println("stopping consumer") + // block main thread - wait for shutdown signal + err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }) + if err != nil { + log.Fatal(err) + } } diff --git a/examples/multiconsumer/main.go b/examples/multiconsumer/main.go index 571af8b..8074330 100644 --- a/examples/multiconsumer/main.go +++ b/examples/multiconsumer/main.go @@ -5,6 +5,7 @@ import ( "log" "os" "os/signal" + "sync" "syscall" rabbitmq "github.com/wagslane/go-rabbitmq" @@ -22,11 +23,6 @@ func main() { consumer, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsConcurrency(2), rabbitmq.WithConsumerOptionsConsumerName("consumer_1"), @@ -37,15 +33,9 @@ func main() { if err != nil { log.Fatal(err) } - defer consumer.Close() consumer2, err := rabbitmq.NewConsumer( conn, - func(d rabbitmq.Delivery) rabbitmq.Action { - log.Printf("consumed 2: %v", string(d.Body)) - // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue - return rabbitmq.Ack - }, "my_queue", rabbitmq.WithConsumerOptionsConcurrency(2), rabbitmq.WithConsumerOptionsConsumerName("consumer_2"), @@ -55,22 +45,57 @@ func main() { if err != nil { log.Fatal(err) } - defer consumer2.Close() - // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) - done := make(chan bool, 1) + errs := make(chan error, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { - sig := <-sigs - fmt.Println() - fmt.Println(sig) - done <- true + fmt.Println("awaiting signal") + select { + case sig := <-sigs: + fmt.Println() + fmt.Println(sig) + case err := <-errs: + log.Print(err) + } + + fmt.Println("stopping consumers") + + consumer.Close() + consumer2.Close() + }() + + var wg sync.WaitGroup + + wg.Add(2) + + go func() { + defer wg.Done() + + err := consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }) + if err != nil { + errs <- err + } + }() + + go func() { + defer wg.Done() + + err := consumer2.Run(func(d rabbitmq.Delivery) rabbitmq.Action { + log.Printf("consumed: %v", string(d.Body)) + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + return rabbitmq.Ack + }) + if err != nil { + errs <- err + } }() - fmt.Println("awaiting signal") - <-done - fmt.Println("stopping consumer") + wg.Wait() }