|
|
|
@ -5,6 +5,7 @@ import ( |
|
|
|
"log" |
|
|
|
"os" |
|
|
|
"os/signal" |
|
|
|
"sync" |
|
|
|
"syscall" |
|
|
|
|
|
|
|
rabbitmq "github.com/wagslane/go-rabbitmq" |
|
|
|
@ -22,11 +23,6 @@ func main() { |
|
|
|
|
|
|
|
consumer, err := rabbitmq.NewConsumer( |
|
|
|
conn, |
|
|
|
func(d rabbitmq.Delivery) rabbitmq.Action { |
|
|
|
log.Printf("consumed: %v", string(d.Body)) |
|
|
|
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
|
|
|
|
return rabbitmq.Ack |
|
|
|
}, |
|
|
|
"my_queue", |
|
|
|
rabbitmq.WithConsumerOptionsConcurrency(2), |
|
|
|
rabbitmq.WithConsumerOptionsConsumerName("consumer_1"), |
|
|
|
@ -37,15 +33,9 @@ func main() { |
|
|
|
if err != nil { |
|
|
|
log.Fatal(err) |
|
|
|
} |
|
|
|
defer consumer.Close() |
|
|
|
|
|
|
|
consumer2, err := rabbitmq.NewConsumer( |
|
|
|
conn, |
|
|
|
func(d rabbitmq.Delivery) rabbitmq.Action { |
|
|
|
log.Printf("consumed 2: %v", string(d.Body)) |
|
|
|
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
|
|
|
|
return rabbitmq.Ack |
|
|
|
}, |
|
|
|
"my_queue", |
|
|
|
rabbitmq.WithConsumerOptionsConcurrency(2), |
|
|
|
rabbitmq.WithConsumerOptionsConsumerName("consumer_2"), |
|
|
|
@ -55,22 +45,57 @@ func main() { |
|
|
|
if err != nil { |
|
|
|
log.Fatal(err) |
|
|
|
} |
|
|
|
defer consumer2.Close() |
|
|
|
|
|
|
|
// block main thread - wait for shutdown signal
|
|
|
|
sigs := make(chan os.Signal, 1) |
|
|
|
done := make(chan bool, 1) |
|
|
|
errs := make(chan error, 1) |
|
|
|
|
|
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) |
|
|
|
|
|
|
|
go func() { |
|
|
|
sig := <-sigs |
|
|
|
fmt.Println() |
|
|
|
fmt.Println(sig) |
|
|
|
done <- true |
|
|
|
fmt.Println("awaiting signal") |
|
|
|
select { |
|
|
|
case sig := <-sigs: |
|
|
|
fmt.Println() |
|
|
|
fmt.Println(sig) |
|
|
|
case err := <-errs: |
|
|
|
log.Print(err) |
|
|
|
} |
|
|
|
|
|
|
|
fmt.Println("stopping consumers") |
|
|
|
|
|
|
|
consumer.Close() |
|
|
|
consumer2.Close() |
|
|
|
}() |
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
|
|
|
|
wg.Add(2) |
|
|
|
|
|
|
|
go func() { |
|
|
|
defer wg.Done() |
|
|
|
|
|
|
|
err := consumer.Run(func(d rabbitmq.Delivery) rabbitmq.Action { |
|
|
|
log.Printf("consumed: %v", string(d.Body)) |
|
|
|
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
|
|
|
|
return rabbitmq.Ack |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
errs <- err |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
go func() { |
|
|
|
defer wg.Done() |
|
|
|
|
|
|
|
err := consumer2.Run(func(d rabbitmq.Delivery) rabbitmq.Action { |
|
|
|
log.Printf("consumed: %v", string(d.Body)) |
|
|
|
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
|
|
|
|
return rabbitmq.Ack |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
errs <- err |
|
|
|
} |
|
|
|
}() |
|
|
|
|
|
|
|
fmt.Println("awaiting signal") |
|
|
|
<-done |
|
|
|
fmt.Println("stopping consumer") |
|
|
|
wg.Wait() |
|
|
|
} |