An AMQP tool.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

204 lines
7.8 KiB

//RabbitHunter (c) 2023-2024, RuhNet
//Author: Ruel Tmeizeh, RuhNet www.ruhnet.co
package main
import (
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
const appnameFull string = "RuhNet RabbitHunter AMQP Utility"
const appname string = "rabbithunter"
const version string = "0.4"
const serverVersion string = "RuhNet " + appname + " v" + version
const website string = "https://ruhnet.co"
var startupTime time.Time
var myHostname, myHostnameShort string
const banner = `
--⟶____ __ _ _ __
-⟶/ ___\ __ __/ /_ / \ / /__ __/ /_
⟶/ /_/ // /_/ / _ \/ / \/ //__\_ __/
/_/ \_\\ ___/_/ /_/_/ \__/ \__,/_/ %s
_____________________________________________________
`
var done = make(chan bool, 1)
var msgCatFilters []string
var msgNameFilters []string
var msgAppNameFilters []string
type AppConfig struct {
AmqpURI string `json:"amqp_uri" env:"AMQP_URI" default:"amqp://guest:guest@localhost:5672"`
AmqpSubExch string `json:"amqp_sub_exchange" env:"AMQP_SUB_EXCH" default:"callevt"`
AmqpPubExch string `json:"amqp_pub_exchange" env:"AMQP_PUB_EXCH" default:"pushes"`
AmqpExchType string `json:"amqp_exchange_type" env:"AMQP_EXCH_TYPE" default:"topic"`
AmqpSubRoutingKey string `json:"amqp_sub_routing_key" env:"AMQP_SUB_ROUTING_KEY" default:"call.*.*"`
AmqpPubRoutingKey string `json:"amqp_pub_routing_key" env:"AMQP_PUB_ROUTING_KEY" default:"notification.push.customapp.test"`
AmqpWorkers int `json:"amqp_workers" env:"AMQP_WORKERS" default:"2"`
LogFile string `json:"log_file" env:"LOG_FILE" default:"/tmp/rabbithunter.log"`
LogLevel int `json:"log_level" env:"LOG_LEVEL" default:"5"`
PubMessageFile string `json:"pub_message_file" env:"PUB_MSG_FILE" default:"./message.json"`
FilterEvtCat string `json:"filter_event_category" env:"FLT_EVT_CAT" default:"*"` //allow comma separated string for multiple
FilterEvtName string `json:"filter_event_name" env:"FLT_EVT_NAME" default:"*"` //allow comma separated string for multiple
FilterEvtAppName string `json:"filter_event_appname" env:"FLT_EVT_APPNAME" default:"*"` //allow comma separated string for multiple
}
func main() {
var err error
startupTime = time.Now()
/////////////////////////////////////////////
// Setup
initConfig()
//fmt.Println("Configuration OK, starting " + appname + "...")
logit(5, "Configuration OK, starting "+appname+"...")
/////////////////////////////////////////////
// Logging
appLogFile, err := os.OpenFile(appconf.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664)
if err != nil {
log.Println("Could not open log file: " + appconf.LogFile + "\n" + err.Error())
appLogFile, err = os.OpenFile("/tmp/"+appname+".log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664)
if err != nil {
log.Fatal("Can't open even /tmp log file!\n" + err.Error())
}
}
defer appLogFile.Close()
//set other logging to same file
log.SetOutput(appLogFile)
//Startup Banner
fmt.Printf(banner, website)
fmt.Println(serverVersion + "\n")
logit(5, appname+"v"+version+" starting up... ")
logit(5, fmt.Sprintf("Logging with level: %d", appconf.LogLevel))
//Hostname
myHostname, err = os.Hostname()
logit(5, "Detecting my hostname... "+myHostname)
if err != nil {
log.Fatal("Hostname could not be auto-detected from system: " + err.Error())
}
myHostnameShort = strings.SplitN(myHostname, ".", 2)[0]
//fmt.Println()
/////////////////////////////////////////////
// RabbitMQ Setup Connection
fmt.Println("Connecting to RabbitMQ: " + appconf.AmqpURI)
logit(6, "Connecting to RabbitMQ: "+appconf.AmqpURI)
amqpConn, err := rabbitmq.NewConn(
appconf.AmqpURI,
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error())
}
defer amqpConn.Close()
/////////////////////////////////////////////
// RabbitMQ Setup Consumer
msgConsumer, err = rabbitmq.NewConsumer(
amqpConn,
handleAmqpMsg, //this is the function that we want to call to consume presence messages
"q_rabbithunter_"+myHostnameShort,
rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch),
rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType),
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey),
rabbitmq.WithConsumerOptionsConsumerName("consumer_rabbithunter_"+myHostnameShort),
rabbitmq.WithConsumerOptionsQueueAutoDelete,
rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers),
//rabbitmq.WithConsumerOptionsQuorum,
//rabbitmq.WithConsumerOptionsQueueDurable,
//rabbitmq.WithConsumerOptionsExchangeDeclare,
//rabbitmq.WithConsumerOptionsBindingExchangeDurable,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error())
}
defer msgConsumer.Close()
logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: 'consumer_rabbithunter_"+myHostnameShort+"'.")
/////////////////////////////////////////////
// RabbitMQ Setup Publisher
publisher, err = rabbitmq.NewPublisher(
amqpConn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType),
rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch),
//rabbitmq.WithPublisherOptionsExchangeDeclare,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error())
}
defer publisher.Close()
logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.")
publisher.NotifyReturn(func(r rabbitmq.Return) {
logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body)))
})
publisher.NotifyPublish(func(c rabbitmq.Confirmation) {
logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack))
})
fmt.Println("Publish Exchange: [" + appconf.AmqpExchType + "] '" + appconf.AmqpPubExch + "'\nPublish Routing Key: '" + appconf.AmqpPubRoutingKey + "'")
fmt.Println("Consume Exchange: [" + appconf.AmqpExchType + "] '" + appconf.AmqpSubExch + "'\nConsume Routing Key: '" + appconf.AmqpSubRoutingKey + "'\nConsume Queue: 'consumer_rabbithunter_" + myHostnameShort + "'")
/////////////////////////////////////////////
// Message Filters
msgCatFilters = strings.Split(appconf.FilterEvtCat, ",")
msgNameFilters = strings.Split(appconf.FilterEvtName, ",")
msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",")
fmt.Println("Filtering event app names matching:", msgAppNameFilters)
fmt.Println("Filtering event categories matching:", msgCatFilters)
fmt.Println("Filtering event names matching:", msgNameFilters)
log.Println("Filtering event app names matching:", msgAppNameFilters)
log.Println("Filtering event categories matching:", msgCatFilters)
log.Println("Filtering event names matching:", msgNameFilters)
/////////////////////////////////////////////
logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process AMQP messages.")
log.Println(appnameFull + " system started.\n-----> [READY]")
fmt.Println(appnameFull + " system started.\n-----> [READY]")
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Received signal: ")
fmt.Println(sig)
done <- true
}()
publishHandler() //this func will block the main thread, until receiving on the 'done' channel.
}
func logit(level int, message string) {
if appconf.LogLevel >= int(level) {
log.Println(message)
}
if appconf.LogLevel >= 8 {
fmt.Println(message)
}
}