Browse Source

lowercase f

pull/80/head
wagslane 4 years ago
parent
commit
751df3fe63
8 changed files with 46 additions and 52 deletions
  1. +7
    -6
      .github/workflows/tests.yml
  2. +0
    -7
      Makefile
  3. +7
    -7
      channel.go
  4. +8
    -8
      consume.go
  5. +6
    -6
      examples/logger/main.go
  6. +12
    -12
      logger.go
  7. +2
    -2
      publish.go
  8. +4
    -4
      publish_flow_block.go

+ 7
- 6
.github/workflows/tests.yml View File

@ -9,21 +9,22 @@ jobs:
name: Test name: Test
runs-on: ubuntu-latest runs-on: ubuntu-latest
env: env:
GOFLAGS: -mod=vendor
GOPROXY: "off"
GOPROXY: "https://proxy.golang.org,direct"
steps: steps:
- name: Set up Go 1.16
- name: Set up Go 1.18
uses: actions/setup-go@v2 uses: actions/setup-go@v2
with: with:
go-version: 1.16
go-version: 1.18
id: go id: go
- name: Check out code into the Go module directory - name: Check out code into the Go module directory
uses: actions/checkout@v1 uses: actions/checkout@v1
- name: install staticcheck
run: |
cd /tmp && go install honnef.co/go/tools/cmd/staticcheck@latest
- name: Make tests - name: Make tests
run: | run: |
make install-lint
make install-staticcheck
make make

+ 0
- 7
Makefile View File

@ -6,15 +6,8 @@ test:
vet: vet:
go vet ./... go vet ./...
install-lint:
GO111MODULE=off go get -u golang.org/x/lint/golint
GO111MODULE=off go list -f {{.Target}} golang.org/x/lint/golint
lint: lint:
go list ./... | grep -v /vendor/ | xargs -L1 golint -set_exit_status go list ./... | grep -v /vendor/ | xargs -L1 golint -set_exit_status
install-staticcheck:
cd /tmp && GOPROXY="" go get honnef.co/go/tools/cmd/staticcheck
staticcheck: staticcheck:
staticcheck ./... staticcheck ./...

+ 7
- 7
channel.go View File

@ -62,18 +62,18 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
select { select {
case err := <-notifyCloseChan: case err := <-notifyCloseChan:
if err != nil { if err != nil {
chManager.logger.ErrorF("attempting to reconnect to amqp server after close with error: %v", err)
chManager.logger.Errorf("attempting to reconnect to amqp server after close with error: %v", err)
chManager.reconnectLoop() chManager.reconnectLoop()
chManager.logger.WarnF("successfully reconnected to amqp server")
chManager.logger.Warnf("successfully reconnected to amqp server")
chManager.notifyCancelOrClose <- err chManager.notifyCancelOrClose <- err
} }
if err == nil { if err == nil {
chManager.logger.InfoF("amqp channel closed gracefully")
chManager.logger.Infof("amqp channel closed gracefully")
} }
case err := <-notifyCancelChan: case err := <-notifyCancelChan:
chManager.logger.ErrorF("attempting to reconnect to amqp server after cancel with error: %s", err)
chManager.logger.Errorf("attempting to reconnect to amqp server after cancel with error: %s", err)
chManager.reconnectLoop() chManager.reconnectLoop()
chManager.logger.WarnF("successfully reconnected to amqp server after cancel")
chManager.logger.Warnf("successfully reconnected to amqp server after cancel")
chManager.notifyCancelOrClose <- errors.New(err) chManager.notifyCancelOrClose <- errors.New(err)
} }
} }
@ -81,11 +81,11 @@ func (chManager *channelManager) startNotifyCancelOrClosed() {
// reconnectLoop continuously attempts to reconnect // reconnectLoop continuously attempts to reconnect
func (chManager *channelManager) reconnectLoop() { func (chManager *channelManager) reconnectLoop() {
for { for {
chManager.logger.InfoF("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
chManager.logger.Infof("waiting %s seconds to attempt to reconnect to amqp server", chManager.reconnectInterval)
time.Sleep(chManager.reconnectInterval) time.Sleep(chManager.reconnectInterval)
err := chManager.reconnect() err := chManager.reconnect()
if err != nil { if err != nil {
chManager.logger.ErrorF("error reconnecting to amqp server: %v", err)
chManager.logger.Errorf("error reconnecting to amqp server: %v", err)
} else { } else {
chManager.reconnectionCount++ chManager.reconnectionCount++
go chManager.startNotifyCancelOrClosed() go chManager.startNotifyCancelOrClosed()


+ 8
- 8
consume.go View File

@ -112,7 +112,7 @@ func (consumer Consumer) StartConsuming(
go func() { go func() {
for err := range consumer.chManager.notifyCancelOrClose { for err := range consumer.chManager.notifyCancelOrClose {
consumer.logger.InfoF("successful recovery from: %v", err)
consumer.logger.Infof("successful recovery from: %v", err)
err = consumer.startGoroutines( err = consumer.startGoroutines(
handler, handler,
queue, queue,
@ -120,7 +120,7 @@ func (consumer Consumer) StartConsuming(
*options, *options,
) )
if err != nil { if err != nil {
consumer.logger.ErrorF("error restarting consumer goroutines after cancel or close: %v", err)
consumer.logger.Errorf("error restarting consumer goroutines after cancel or close: %v", err)
} }
} }
}() }()
@ -130,7 +130,7 @@ func (consumer Consumer) StartConsuming(
// Close cleans up resources and closes the consumer. // Close cleans up resources and closes the consumer.
// The consumer is not safe for reuse // The consumer is not safe for reuse
func (consumer Consumer) Close() error { func (consumer Consumer) Close() error {
consumer.chManager.logger.InfoF("closing consumer...")
consumer.chManager.logger.Infof("closing consumer...")
return consumer.chManager.close() return consumer.chManager.close()
} }
@ -218,7 +218,7 @@ func (consumer Consumer) startGoroutines(
for i := 0; i < consumeOptions.Concurrency; i++ { for i := 0; i < consumeOptions.Concurrency; i++ {
go handlerGoroutine(consumer, msgs, consumeOptions, handler) go handlerGoroutine(consumer, msgs, consumeOptions, handler)
} }
consumer.logger.InfoF("Processing messages on %v goroutines", consumeOptions.Concurrency)
consumer.logger.Infof("Processing messages on %v goroutines", consumeOptions.Concurrency)
return nil return nil
} }
@ -232,19 +232,19 @@ func handlerGoroutine(consumer Consumer, msgs <-chan amqp.Delivery, consumeOptio
case Ack: case Ack:
err := msg.Ack(false) err := msg.Ack(false)
if err != nil { if err != nil {
consumer.logger.ErrorF("can't ack message: %v", err)
consumer.logger.Errorf("can't ack message: %v", err)
} }
case NackDiscard: case NackDiscard:
err := msg.Nack(false, false) err := msg.Nack(false, false)
if err != nil { if err != nil {
consumer.logger.ErrorF("can't nack message: %v", err)
consumer.logger.Errorf("can't nack message: %v", err)
} }
case NackRequeue: case NackRequeue:
err := msg.Nack(false, true) err := msg.Nack(false, true)
if err != nil { if err != nil {
consumer.logger.ErrorF("can't nack message: %v", err)
consumer.logger.Errorf("can't nack message: %v", err)
} }
} }
} }
consumer.logger.InfoF("rabbit consumer goroutine closed")
consumer.logger.Infof("rabbit consumer goroutine closed")
} }

+ 6
- 6
examples/logger/main.go View File

@ -10,24 +10,24 @@ import (
// that only logs ERROR and FATAL log levels // that only logs ERROR and FATAL log levels
type errorLogger struct{} type errorLogger struct{}
func (l errorLogger) FatalF(format string, v ...interface{}) {
func (l errorLogger) Fatalf(format string, v ...interface{}) {
log.Printf("mylogger: "+format, v...) log.Printf("mylogger: "+format, v...)
} }
func (l errorLogger) ErrorF(format string, v ...interface{}) {
func (l errorLogger) Errorf(format string, v ...interface{}) {
log.Printf("mylogger: "+format, v...) log.Printf("mylogger: "+format, v...)
} }
func (l errorLogger) WarnF(format string, v ...interface{}) {
func (l errorLogger) Warnf(format string, v ...interface{}) {
} }
func (l errorLogger) InfoF(format string, v ...interface{}) {
func (l errorLogger) Infof(format string, v ...interface{}) {
} }
func (l errorLogger) DebugF(format string, v ...interface{}) {
func (l errorLogger) Debugf(format string, v ...interface{}) {
} }
func (l errorLogger) TraceF(format string, v ...interface{}) {}
func (l errorLogger) Tracef(format string, v ...interface{}) {}
func main() { func main() {
mylogger := &errorLogger{} mylogger := &errorLogger{}


+ 12
- 12
logger.go View File

@ -8,12 +8,12 @@ import (
// Logger is the interface to send logs to. It can be set using // Logger is the interface to send logs to. It can be set using
// WithPublisherOptionsLogger() or WithConsumerOptionsLogger(). // WithPublisherOptionsLogger() or WithConsumerOptionsLogger().
type Logger interface { type Logger interface {
FatalF(string, ...interface{})
ErrorF(string, ...interface{})
WarnF(string, ...interface{})
InfoF(string, ...interface{})
DebugF(string, ...interface{})
TraceF(string, ...interface{})
Fatalf(string, ...interface{})
Errorf(string, ...interface{})
Warnf(string, ...interface{})
Infof(string, ...interface{})
Debugf(string, ...interface{})
Tracef(string, ...interface{})
} }
const loggingPrefix = "gorabbit" const loggingPrefix = "gorabbit"
@ -21,24 +21,24 @@ const loggingPrefix = "gorabbit"
// stdDebugLogger logs to stdout up to the `DebugF` level // stdDebugLogger logs to stdout up to the `DebugF` level
type stdDebugLogger struct{} type stdDebugLogger struct{}
func (l stdDebugLogger) FatalF(format string, v ...interface{}) {
func (l stdDebugLogger) Fatalf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...) log.Printf(fmt.Sprintf("%s FATAL: %s", loggingPrefix, format), v...)
} }
func (l stdDebugLogger) ErrorF(format string, v ...interface{}) {
func (l stdDebugLogger) Errorf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...) log.Printf(fmt.Sprintf("%s ERROR: %s", loggingPrefix, format), v...)
} }
func (l stdDebugLogger) WarnF(format string, v ...interface{}) {
func (l stdDebugLogger) Warnf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...) log.Printf(fmt.Sprintf("%s WARN: %s", loggingPrefix, format), v...)
} }
func (l stdDebugLogger) InfoF(format string, v ...interface{}) {
func (l stdDebugLogger) Infof(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...) log.Printf(fmt.Sprintf("%s INFO: %s", loggingPrefix, format), v...)
} }
func (l stdDebugLogger) DebugF(format string, v ...interface{}) {
func (l stdDebugLogger) Debugf(format string, v ...interface{}) {
log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...) log.Printf(fmt.Sprintf("%s DEBUG: %s", loggingPrefix, format), v...)
} }
func (l stdDebugLogger) TraceF(format string, v ...interface{}) {}
func (l stdDebugLogger) Tracef(format string, v ...interface{}) {}

+ 2
- 2
publish.go View File

@ -122,7 +122,7 @@ func NewPublisher(url string, config Config, optionFuncs ...func(*PublisherOptio
func (publisher *Publisher) handleRestarts() { func (publisher *Publisher) handleRestarts() {
for err := range publisher.chManager.notifyCancelOrClose { for err := range publisher.chManager.notifyCancelOrClose {
publisher.options.Logger.InfoF("successful publisher recovery from: %v", err)
publisher.options.Logger.Infof("successful publisher recovery from: %v", err)
go publisher.startNotifyFlowHandler() go publisher.startNotifyFlowHandler()
go publisher.startNotifyBlockedHandler() go publisher.startNotifyBlockedHandler()
if publisher.notifyReturnChan != nil { if publisher.notifyReturnChan != nil {
@ -210,7 +210,7 @@ func (publisher *Publisher) Publish(
// Close closes the publisher and releases resources // Close closes the publisher and releases resources
// The publisher should be discarded as it's not safe for re-use // The publisher should be discarded as it's not safe for re-use
func (publisher Publisher) Close() error { func (publisher Publisher) Close() error {
publisher.chManager.logger.InfoF("closing publisher...")
publisher.chManager.logger.Infof("closing publisher...")
return publisher.chManager.close() return publisher.chManager.close()
} }


+ 4
- 4
publish_flow_block.go View File

@ -13,11 +13,11 @@ func (publisher *Publisher) startNotifyFlowHandler() {
for ok := range notifyFlowChan { for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMux.Lock() publisher.disablePublishDueToFlowMux.Lock()
if ok { if ok {
publisher.options.Logger.WarnF("pausing publishing due to flow request from server")
publisher.options.Logger.Warnf("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true publisher.disablePublishDueToFlow = true
} else { } else {
publisher.disablePublishDueToFlow = false publisher.disablePublishDueToFlow = false
publisher.options.Logger.WarnF("resuming publishing due to flow request from server")
publisher.options.Logger.Warnf("resuming publishing due to flow request from server")
} }
publisher.disablePublishDueToFlowMux.Unlock() publisher.disablePublishDueToFlowMux.Unlock()
} }
@ -32,11 +32,11 @@ func (publisher *Publisher) startNotifyBlockedHandler() {
for b := range blockings { for b := range blockings {
publisher.disablePublishDueToBlockedMux.Lock() publisher.disablePublishDueToBlockedMux.Lock()
if b.Active { if b.Active {
publisher.options.Logger.WarnF("pausing publishing due to TCP blocking from server")
publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server")
publisher.disablePublishDueToBlocked = true publisher.disablePublishDueToBlocked = true
} else { } else {
publisher.disablePublishDueToBlocked = false publisher.disablePublishDueToBlocked = false
publisher.options.Logger.WarnF("resuming publishing due to TCP blocking from server")
publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server")
} }
publisher.disablePublishDueToBlockedMux.Unlock() publisher.disablePublishDueToBlockedMux.Unlock()
} }


Loading…
Cancel
Save