Browse Source

Replace `log` to `logger.Logger` in `Dispatcher.Dispatch`

pull/155/head
Yaroslav 2 years ago
parent
commit
87de266d4a
4 changed files with 18 additions and 8 deletions
  1. +1
    -1
      internal/channelmanager/channel_manager.go
  2. +1
    -1
      internal/connectionmanager/connection_manager.go
  3. +5
    -3
      internal/dispatcher/dispatcher.go
  4. +11
    -3
      internal/dispatcher/dispatcher_test.go

+ 1
- 1
internal/channelmanager/channel_manager.go View File

@ -38,7 +38,7 @@ func NewChannelManager(connManager *connectionmanager.ConnectionManager, log log
reconnectInterval: reconnectInterval, reconnectInterval: reconnectInterval,
reconnectionCount: 0, reconnectionCount: 0,
reconnectionCountMux: &sync.Mutex{}, reconnectionCountMux: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
dispatcher: dispatcher.NewDispatcher(log),
} }
go chanManager.startNotifyCancelOrClosed() go chanManager.startNotifyCancelOrClosed()
return &chanManager, nil return &chanManager, nil


+ 1
- 1
internal/connectionmanager/connection_manager.go View File

@ -37,7 +37,7 @@ func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, recon
ReconnectInterval: reconnectInterval, ReconnectInterval: reconnectInterval,
reconnectionCount: 0, reconnectionCount: 0,
reconnectionCountMux: &sync.Mutex{}, reconnectionCountMux: &sync.Mutex{},
dispatcher: dispatcher.NewDispatcher(),
dispatcher: dispatcher.NewDispatcher(log),
} }
go connManager.startNotifyClose() go connManager.startNotifyClose()
return &connManager, nil return &connManager, nil


+ 5
- 3
internal/dispatcher/dispatcher.go View File

@ -1,7 +1,7 @@
package dispatcher package dispatcher
import ( import (
"log"
"github.com/wagslane/go-rabbitmq/internal/logger"
"math" "math"
"math/rand" "math/rand"
"sync" "sync"
@ -12,6 +12,7 @@ import (
type Dispatcher struct { type Dispatcher struct {
subscribers map[int]dispatchSubscriber subscribers map[int]dispatchSubscriber
subscribersMux *sync.Mutex subscribersMux *sync.Mutex
logger logger.Logger
} }
type dispatchSubscriber struct { type dispatchSubscriber struct {
@ -20,10 +21,11 @@ type dispatchSubscriber struct {
} }
// NewDispatcher - // NewDispatcher -
func NewDispatcher() *Dispatcher {
func NewDispatcher(logger logger.Logger) *Dispatcher {
return &Dispatcher{ return &Dispatcher{
subscribers: make(map[int]dispatchSubscriber), subscribers: make(map[int]dispatchSubscriber),
subscribersMux: &sync.Mutex{}, subscribersMux: &sync.Mutex{},
logger: logger,
} }
} }
@ -34,7 +36,7 @@ func (d *Dispatcher) Dispatch(err error) error {
for _, subscriber := range d.subscribers { for _, subscriber := range d.subscribers {
select { select {
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
log.Println("Unexpected rabbitmq error: timeout in dispatch")
d.logger.Errorf("Unexpected rabbitmq error: timeout in dispatch")
case subscriber.notifyCancelOrCloseChan <- err: case subscriber.notifyCancelOrCloseChan <- err:
} }
} }


+ 11
- 3
internal/dispatcher/dispatcher_test.go View File

@ -5,8 +5,16 @@ import (
"time" "time"
) )
type lgr struct{}
func (l *lgr) Fatalf(string, ...interface{}) {}
func (l *lgr) Errorf(string, ...interface{}) {}
func (l *lgr) Warnf(string, ...interface{}) {}
func (l *lgr) Infof(string, ...interface{}) {}
func (l *lgr) Debugf(string, ...interface{}) {}
func TestNewDispatcher(t *testing.T) { func TestNewDispatcher(t *testing.T) {
d := NewDispatcher()
d := NewDispatcher(&lgr{})
if d.subscribers == nil { if d.subscribers == nil {
t.Error("Dispatcher subscribers is nil") t.Error("Dispatcher subscribers is nil")
} }
@ -16,7 +24,7 @@ func TestNewDispatcher(t *testing.T) {
} }
func TestAddSubscriber(t *testing.T) { func TestAddSubscriber(t *testing.T) {
d := NewDispatcher()
d := NewDispatcher(&lgr{})
d.AddSubscriber() d.AddSubscriber()
if len(d.subscribers) != 1 { if len(d.subscribers) != 1 {
t.Error("Dispatcher subscribers length is not 1") t.Error("Dispatcher subscribers length is not 1")
@ -24,7 +32,7 @@ func TestAddSubscriber(t *testing.T) {
} }
func TestCloseSubscriber(t *testing.T) { func TestCloseSubscriber(t *testing.T) {
d := NewDispatcher()
d := NewDispatcher(&lgr{})
_, closeCh := d.AddSubscriber() _, closeCh := d.AddSubscriber()
close(closeCh) close(closeCh)
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)


Loading…
Cancel
Save