Browse Source

Merge branch 'main' of https://github.com/wagslane/go-rabbitmq

pull/155/head v0.13.0
wagslane 2 years ago
parent
commit
325b04909f
4 changed files with 89 additions and 59 deletions
  1. +9
    -5
      README.md
  2. +20
    -21
      consume.go
  3. +14
    -12
      examples/consumer/main.go
  4. +46
    -21
      examples/multiconsumer/main.go

+ 9
- 5
README.md View File

@ -44,11 +44,6 @@ defer conn.Close()
consumer, err := rabbitmq.NewConsumer(
conn,
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
@ -58,6 +53,15 @@ if err != nil {
log.Fatal(err)
}
defer consumer.Close()
err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
})
if err != nil {
log.Fatal(err)
}
```
## 🚀 Quick Start Publisher


+ 20
- 21
consume.go View File

@ -44,12 +44,9 @@ type Delivery struct {
amqp.Delivery
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
// it also starts consuming on the given connection with automatic reconnection handling
// Do not reuse the returned consumer for anything other than to close it
// NewConsumer returns a new Consumer connected to the given rabbitmq server.
func NewConsumer(
conn *Conn,
handler Handler,
queue string,
optionFuncs ...func(*ConsumerOptions),
) (*Consumer, error) {
@ -78,30 +75,32 @@ func NewConsumer(
isClosed: false,
}
err = consumer.startGoroutines(
return consumer, nil
}
// Run starts consuming with automatic reconnection handling. Do not reuse the
// consumer for anything other than to close it.
func (consumer *Consumer) Run(handler Handler) error {
err := consumer.startGoroutines(
handler,
*options,
consumer.options,
)
if err != nil {
return nil, err
return err
}
go func() {
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
handler,
*options,
)
if err != nil {
consumer.options.Logger.Fatalf("error restarting consumer goroutines after cancel or close: %v", err)
consumer.options.Logger.Fatalf("consumer closing, unable to recover")
return
}
for err := range consumer.reconnectErrCh {
consumer.options.Logger.Infof("successful consumer recovery from: %v", err)
err = consumer.startGoroutines(
handler,
consumer.options,
)
if err != nil {
return fmt.Errorf("error restarting consumer goroutines after cancel or close: %w", err)
}
}()
}
return consumer, nil
return nil
}
// Close cleans up resources and closes the consumer.


+ 14
- 12
examples/consumer/main.go View File

@ -22,11 +22,6 @@ func main() {
consumer, err := rabbitmq.NewConsumer(
conn,
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumerOptionsRoutingKey("my_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
@ -35,22 +30,29 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
fmt.Println("awaiting signal")
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
fmt.Println("stopping consumer")
consumer.Close()
}()
fmt.Println("awaiting signal")
<-done
fmt.Println("stopping consumer")
// block main thread - wait for shutdown signal
err = consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
})
if err != nil {
log.Fatal(err)
}
}

+ 46
- 21
examples/multiconsumer/main.go View File

@ -5,6 +5,7 @@ import (
"log"
"os"
"os/signal"
"sync"
"syscall"
rabbitmq "github.com/wagslane/go-rabbitmq"
@ -22,11 +23,6 @@ func main() {
consumer, err := rabbitmq.NewConsumer(
conn,
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsConsumerName("consumer_1"),
@ -38,15 +34,9 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
consumer2, err := rabbitmq.NewConsumer(
conn,
func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed 2: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
},
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsConsumerName("consumer_2"),
@ -56,22 +46,57 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer consumer2.Close()
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
errs := make(chan error, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
fmt.Println("awaiting signal")
select {
case sig := <-sigs:
fmt.Println()
fmt.Println(sig)
case err := <-errs:
log.Print(err)
}
fmt.Println("stopping consumers")
consumer.Close()
consumer2.Close()
}()
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
err := consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
})
if err != nil {
errs <- err
}
}()
go func() {
defer wg.Done()
err := consumer2.Run(func(d rabbitmq.Delivery) rabbitmq.Action {
log.Printf("consumed: %v", string(d.Body))
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
return rabbitmq.Ack
})
if err != nil {
errs <- err
}
}()
fmt.Println("awaiting signal")
<-done
fmt.Println("stopping consumer")
wg.Wait()
}

Loading…
Cancel
Save