Browse Source

add connection options and stop publish

pull/20/head
Brian Mori 5 years ago
parent
commit
f2a2b169a1
6 changed files with 23 additions and 12 deletions
  1. +6
    -5
      channel.go
  2. +2
    -2
      consume.go
  3. +2
    -1
      examples/consumer/main.go
  4. +2
    -1
      examples/logger/main.go
  5. +2
    -1
      examples/publisher/main.go
  6. +9
    -2
      publish.go

+ 6
- 5
channel.go View File

@ -13,12 +13,13 @@ type channelManager struct {
url string
channel *amqp.Channel
connection *amqp.Connection
config amqp.Config
channelMux *sync.RWMutex
notifyCancelOrClose chan error
}
func newChannelManager(url string, log Logger) (*channelManager, error) {
conn, ch, err := getNewChannel(url)
func newChannelManager(url string, conf amqp.Config, log Logger) (*channelManager, error) {
conn, ch, err := getNewChannel(url, conf)
if err != nil {
return nil, err
}
@ -35,8 +36,8 @@ func newChannelManager(url string, log Logger) (*channelManager, error) {
return &chManager, nil
}
func getNewChannel(url string) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.Dial(url)
func getNewChannel(url string, conf amqp.Config) (*amqp.Connection, *amqp.Channel, error) {
amqpConn, err := amqp.DialConfig(url, conf)
if err != nil {
return nil, nil, err
}
@ -106,7 +107,7 @@ func (chManager *channelManager) reconnectWithBackoff() {
func (chManager *channelManager) reconnect() error {
chManager.channelMux.Lock()
defer chManager.channelMux.Unlock()
newConn, newChannel, err := getNewChannel(chManager.url)
newConn, newChannel, err := getNewChannel(chManager.url, chManager.config)
if err != nil {
return err
}


+ 2
- 2
consume.go View File

@ -29,7 +29,7 @@ type Delivery struct {
}
// NewConsumer returns a new Consumer connected to the given rabbitmq server
func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
func NewConsumer(url string, config amqp.Config, optionFuncs ...func(*ConsumerOptions)) (Consumer, error) {
options := &ConsumerOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
@ -38,7 +38,7 @@ func NewConsumer(url string, optionFuncs ...func(*ConsumerOptions)) (Consumer, e
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, options.Logger)
chManager, err := newChannelManager(url, config, options.Logger)
if err != nil {
return Consumer{}, err
}


+ 2
- 1
examples/consumer/main.go View File

@ -3,12 +3,13 @@ package main
import (
"log"
"github.com/streadway/amqp"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
consumer, err := rabbitmq.NewConsumer(
"amqp://guest:guest@localhost",
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithConsumerOptionsLogging,
)
if err != nil {


+ 2
- 1
examples/logger/main.go View File

@ -3,6 +3,7 @@ package main
import (
"log"
"github.com/streadway/amqp"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
@ -18,7 +19,7 @@ func main() {
mylogger := &customLogger{}
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost",
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogger(mylogger),
)
if err != nil {


+ 2
- 1
examples/publisher/main.go View File

@ -3,12 +3,13 @@ package main
import (
"log"
"github.com/streadway/amqp"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
func main() {
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost",
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogging,
)
if err != nil {


+ 9
- 2
publish.go View File

@ -134,7 +134,7 @@ func WithPublisherOptionsLogger(log Logger) func(options *PublisherOptions) {
// on the channel of Returns that you should setup a listener on.
// Flow controls are automatically handled as they are sent from the server, and publishing
// will fail with an error when the server is requesting a slowdown
func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) {
func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*PublisherOptions)) (Publisher, <-chan Return, error) {
options := &PublisherOptions{}
for _, optionFunc := range optionFuncs {
optionFunc(options)
@ -143,7 +143,7 @@ func NewPublisher(url string, optionFuncs ...func(*PublisherOptions)) (Publisher
options.Logger = &noLogger{} // default no logging
}
chManager, err := newChannelManager(url, options.Logger)
chManager, err := newChannelManager(url, config, options.Logger)
if err != nil {
return Publisher{}, nil, err
}
@ -217,6 +217,13 @@ func (publisher *Publisher) Publish(
return nil
}
// StopPublishing stops the publishing of messages.
// The publisher should be discarded as it's not safe for re-use
func (publisher Publisher) StopPublishing() {
publisher.chManager.channel.Close()
publisher.chManager.connection.Close()
}
func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range publisher.notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock()


Loading…
Cancel
Save