Browse Source

Merge pull request #35 from wagslane/lw_notify

notify return safety
pull/44/head
Lane Wagner 4 years ago
committed by GitHub
parent
commit
10fb3e8bca
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 20 deletions
  1. +2
    -1
      examples/logger/main.go
  2. +2
    -1
      examples/publisher/main.go
  3. +27
    -18
      publish.go

+ 2
- 1
examples/logger/main.go View File

@ -18,7 +18,7 @@ func (c *customLogger) Printf(fmt string, args ...interface{}) {
func main() {
mylogger := &customLogger{}
publisher, returns, err := rabbitmq.NewPublisher(
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
@ -37,6 +37,7 @@ func main() {
log.Fatal(err)
}
returns := publisher.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))


+ 2
- 1
examples/publisher/main.go View File

@ -8,7 +8,7 @@ import (
)
func main() {
publisher, returns, err := rabbitmq.NewPublisher(
publisher, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogging,
)
@ -27,6 +27,7 @@ func main() {
log.Fatal(err)
}
returns := publisher.NotifyReturn()
go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))


+ 27
- 18
publish.go View File

@ -134,7 +134,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) {
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, error) {
options := &PublisherOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
@ -145,26 +145,39 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
chManager, err := newChannelManager(url, config, options.Logger)
if err != nil {
return Publisher{}, nil, err
return Publisher{}, err
}
publisher := Publisher{
chManager: chManager,
notifyReturnChan: make(chan Return, 1),
disablePublishDueToFlow: false,
disablePublishDueToFlowMux: &sync.RWMutex{},
logger: options.Logger,
notifyReturnChan: nil,
}
go publisher.startNotifyFlowHandler()
// restart notifiers when cancel/close is triggered
go func() {
publisher.startNotifyHandlers()
for err := range publisher.chManager.notifyCancelOrClose {
publisher.logger.Printf("publish cancel/close handler triggered. err: %v", err)
publisher.startNotifyHandlers()
go publisher.startNotifyFlowHandler()
if publisher.notifyReturnChan != nil {
go publisher.startNotifyReturnHandler()
}
}
}()
return publisher, publisher.notifyReturnChan, nil
return publisher, nil
}
// NotifyReturn registers a listener for basic.return methods.
// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags.
func (publisher *Publisher) NotifyReturn() <-chan Return {
publisher.notifyReturnChan = make(chan Return)
go publisher.startNotifyReturnHandler()
return publisher.notifyReturnChan
}
// Publish publishes the provided data to the given routing keys over the connection
@ -217,19 +230,8 @@ func (publisher Publisher) StopPublishing() {
publisher.chManager.connection.Close()
}
func (publisher *Publisher) startNotifyHandlers() {
returnAMQPChan := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
go func() {
for ret := range returnAMQPChan {
publisher.notifyReturnChan <- Return{ret}
}
}()
func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chManager.channel.NotifyFlow(make(chan bool))
go publisher.startNotifyFlowHandler(notifyFlowChan)
}
func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
publisher.disablePublishDueToFlowMux.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMux.Unlock()
@ -248,3 +250,10 @@ func (publisher *Publisher) startNotifyFlowHandler(notifyFlowChan chan bool) {
publisher.disablePublishDueToFlowMux.Unlock()
}
}
func (publisher *Publisher) startNotifyReturnHandler() {
returnAMQPCh := publisher.chManager.channel.NotifyReturn(make(chan amqp.Return, 1))
for ret := range returnAMQPCh {
publisher.notifyReturnChan <- Return{ret}
}
}

Loading…
Cancel
Save