diff --git a/channel.go b/channel.go index d63b1c7..65a3a90 100644 --- a/channel.go +++ b/channel.go @@ -13,12 +13,13 @@ type channelManager struct { url string channel *amqp.Channel connection *amqp.Connection + config amqp.Config channelMux *sync.RWMutex notifyCancelOrClose chan error } -func newChannelManager(url string, log Logger) (*channelManager, error) { - conn, ch, err := getNewChannel(url) +func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) { + conn, ch, err := getNewChannel(url, conf) if err != nil { return nil, err } @@ -35,8 +36,8 @@ func newChannelManager(url string, log Logger) (*channelManager, error) { return &chManager, nil } -func getNewChannel(url string) (*amqp.Connection, *amqp.Channel, error) { - amqpConn, err := amqp.Dial(url) +func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channel, error) { + amqpConn, err := amqp.DialConfig(url, conf) if err != nil { return nil, nil, err } @@ -106,7 +107,7 @@ func (chManager *channelManager) reconnectWithBackoff() { func (chManager *channelManager) reconnect() error { chManager.channelMux.Lock() defer chManager.channelMux.Unlock() - newConn, newChannel, err := getNewChannel(chManager.url) + newConn, newChannel, err := getNewChannel(chManager.url, chManager.config) if err != nil { return err } diff --git a/consume.go b/consume.go index 393fa24..4f81e71 100644 --- a/consume.go +++ b/consume.go @@ -29,7 +29,7 @@ type Delivery struct { } // NewConsumer returns a new Consumer connected to the given rabbitmq server -func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { +func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) { options := &ConsumerOptions{} for _, optionFunc := range optionFuncs { optionFunc(options) @@ -38,7 +38,7 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e options.Logger = &noLogger{} // default no logging } - chManager, err := newChannelManager(url, options.Logger) + chManager, err := newChannelManager(url, config, options.Logger) if err != nil { return Consumer{}, err } diff --git a/examples/consumer/main.go b/examples/consumer/main.go index 93c906d..bcd7b2c 100644 --- a/examples/consumer/main.go +++ b/examples/consumer/main.go @@ -3,12 +3,13 @@ package main import ( "log" + "github.com/streadway/amqp" rabbitmq "github.com/wagslane/go-rabbitmq" ) func main() { consumer, err := rabbitmq.NewConsumer( - "amqp://guest:guest@localhost", + "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithConsumerOptionsLogging, ) if err != nil { diff --git a/examples/logger/main.go b/examples/logger/main.go index 14d3855..1844112 100644 --- a/examples/logger/main.go +++ b/examples/logger/main.go @@ -3,6 +3,7 @@ package main import ( "log" + "github.com/streadway/amqp" rabbitmq "github.com/wagslane/go-rabbitmq" ) @@ -18,7 +19,7 @@ func main() { mylogger := &customLogger{} publisher, returns, err := rabbitmq.NewPublisher( - "amqp://guest:guest@localhost", + "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogger(mylogger), ) if err != nil { diff --git a/examples/publisher/main.go b/examples/publisher/main.go index def3b27..7ac33b0 100644 --- a/examples/publisher/main.go +++ b/examples/publisher/main.go @@ -3,12 +3,13 @@ package main import ( "log" + "github.com/streadway/amqp" rabbitmq "github.com/wagslane/go-rabbitmq" ) func main() { publisher, returns, err := rabbitmq.NewPublisher( - "amqp://guest:guest@localhost", + "amqp://guest:guest@localhost", amqp.Config{}, rabbitmq.WithPublisherOptionsLogging, ) if err != nil { diff --git a/publish.go b/publish.go index 1d71fa8..0bfa347 100644 --- a/publish.go +++ b/publish.go @@ -134,7 +134,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) { // on the channel of Returns that you should setup a listener on. // Flow controls are automatically handled as they are sent from the server, and publishing // will fail with an error when the server is requesting a slowdown -func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) { +func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) { options := &PublisherOptions{} for _, optionFunc := range optionFuncs { optionFunc(options) @@ -143,7 +143,7 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher options.Logger = &noLogger{} // default no logging } - chManager, err := newChannelManager(url, options.Logger) + chManager, err := newChannelManager(url, config, options.Logger) if err != nil { return Publisher{}, nil, err } @@ -217,6 +217,13 @@ func (publisher *Publisher) Publish( return nil } +// StopPublishing stops the publishing of messages. +// The publisher should be discarded as it's not safe for re-use +func (publisher Publisher) StopPublishing() { + publisher.chManager.channel.Close() + publisher.chManager.connection.Close() +} + func (publisher *Publisher) startNotifyFlowHandler() { for ok := range publisher.notifyFlowChan { publisher.disablePublishDueToFlowMux.Lock()