|
|
//RabbitHunter (c) 2023-2024, RuhNet
|
|
|
//Author: Ruel Tmeizeh, RuhNet www.ruhnet.co
|
|
|
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
"log"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
"strings"
|
|
|
"syscall"
|
|
|
"time"
|
|
|
|
|
|
rabbitmq "github.com/wagslane/go-rabbitmq"
|
|
|
)
|
|
|
|
|
|
// Application identity strings used in the startup banner and log output.
const (
	// appnameFull is the human-readable product name.
	appnameFull string = "RuhNet RabbitHunter AMQP Utility"
	// appname is the short name; it also names the fallback log file in /tmp.
	appname string = "rabbithunter"
	// version is the release version string.
	version string = "0.4"
	// serverVersion is the one-line version string printed at startup.
	serverVersion string = "RuhNet " + appname + " v" + version
	// website is substituted into the banner template.
	website string = "https://ruhnet.co"
)
|
|
|
|
|
|
// Process-wide state populated by main during startup.
var (
	// startupTime records when main began executing.
	startupTime time.Time
	// myHostname is the name reported by os.Hostname.
	myHostname string
	// myHostnameShort is myHostname truncated at the first '.'.
	myHostnameShort string
)
|
|
|
|
|
|
// banner is the fmt.Printf template for the startup banner; main fills its
// single %s verb with the website constant.
// NOTE(review): the ASCII art looks whitespace-mangled in this copy of the
// file — confirm the literal against the upstream source before shipping.
const banner = `
|
|
|
--⟶____ __ _ _ __
|
|
|
-⟶/ ___\ __ __/ /_ / \ / /__ __/ /_
|
|
|
⟶/ /_/ // /_/ / _ \/ / \/ //__\_ __/
|
|
|
/_/ \_\\ ___/_/ /_/_/ \__/ \__,/_/ %s
|
|
|
_____________________________________________________
|
|
|
`
|
|
|
|
|
|
var (
	// done carries the shutdown notification; main's signal goroutine sends
	// on it. The buffer of one lets that send complete without a receiver.
	done = make(chan bool, 1)

	// Event filters, filled by main from the comma-separated Filter* settings.
	msgCatFilters     []string
	msgNameFilters    []string
	msgAppNameFilters []string
)
|
|
|
|
|
|
// AppConfig holds the runtime settings of the utility. Each field's tags name
// its JSON key, its environment variable and its default value; the loader
// that interprets those tags is not part of this section of the file.
type AppConfig struct {
	// AmqpURI is the broker URI handed to rabbitmq.NewConn.
	AmqpURI string `json:"amqp_uri" env:"AMQP_URI" default:"amqp://guest:guest@localhost:5672"`
	// AmqpSubExch is the exchange the consumer is attached to.
	AmqpSubExch string `json:"amqp_sub_exchange" env:"AMQP_SUB_EXCH" default:"callevt"`
	// AmqpPubExch is the exchange the publisher is configured with.
	AmqpPubExch string `json:"amqp_pub_exchange" env:"AMQP_PUB_EXCH" default:"pushes"`
	// AmqpExchType is the exchange kind used for both consumer and publisher.
	AmqpExchType string `json:"amqp_exchange_type" env:"AMQP_EXCH_TYPE" default:"topic"`
	// AmqpSubRoutingKey is the routing key the consumer is set up with.
	AmqpSubRoutingKey string `json:"amqp_sub_routing_key" env:"AMQP_SUB_ROUTING_KEY" default:"call.*.*"`
	// AmqpPubRoutingKey is the routing key reported for publishing
	// (presumably applied by the publish path outside this view — confirm).
	AmqpPubRoutingKey string `json:"amqp_pub_routing_key" env:"AMQP_PUB_ROUTING_KEY" default:"notification.push.customapp.test"`
	// AmqpWorkers is the consumer concurrency passed to the RabbitMQ client.
	AmqpWorkers int `json:"amqp_workers" env:"AMQP_WORKERS" default:"2"`
	// LogFile is the path main opens (append mode) for log output.
	LogFile string `json:"log_file" env:"LOG_FILE" default:"/tmp/rabbithunter.log"`
	// LogLevel is the threshold logit compares message levels against.
	LogLevel int `json:"log_level" env:"LOG_LEVEL" default:"5"`
	// PubMessageFile is not read in this part of the file; presumably the
	// file holding the message to publish — confirm against its user.
	PubMessageFile string `json:"pub_message_file" env:"PUB_MSG_FILE" default:"./message.json"`
	// FilterEvtCat lists event categories; comma-separated for multiple values.
	FilterEvtCat string `json:"filter_event_category" env:"FLT_EVT_CAT" default:"*"`
	// FilterEvtName lists event names; comma-separated for multiple values.
	FilterEvtName string `json:"filter_event_name" env:"FLT_EVT_NAME" default:"*"`
	// FilterEvtAppName lists event app names; comma-separated for multiple values.
	FilterEvtAppName string `json:"filter_event_appname" env:"FLT_EVT_APPNAME" default:"*"`
}
|
|
|
|
|
|
func main() {
|
|
|
var err error
|
|
|
startupTime = time.Now()
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// Setup
|
|
|
initConfig()
|
|
|
|
|
|
//fmt.Println("Configuration OK, starting " + appname + "...")
|
|
|
logit(5, "Configuration OK, starting "+appname+"...")
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// Logging
|
|
|
appLogFile, err := os.OpenFile(appconf.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664)
|
|
|
if err != nil {
|
|
|
log.Println("Could not open log file: " + appconf.LogFile + "\n" + err.Error())
|
|
|
appLogFile, err = os.OpenFile("/tmp/"+appname+".log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664)
|
|
|
if err != nil {
|
|
|
log.Fatal("Can't open even /tmp log file!\n" + err.Error())
|
|
|
}
|
|
|
}
|
|
|
defer appLogFile.Close()
|
|
|
//set other logging to same file
|
|
|
log.SetOutput(appLogFile)
|
|
|
|
|
|
//Startup Banner
|
|
|
fmt.Printf(banner, website)
|
|
|
fmt.Println(serverVersion + "\n")
|
|
|
|
|
|
logit(5, appname+"v"+version+" starting up... ")
|
|
|
logit(5, fmt.Sprintf("Logging with level: %d", appconf.LogLevel))
|
|
|
|
|
|
//Hostname
|
|
|
myHostname, err = os.Hostname()
|
|
|
logit(5, "Detecting my hostname... "+myHostname)
|
|
|
if err != nil {
|
|
|
log.Fatal("Hostname could not be auto-detected from system: " + err.Error())
|
|
|
}
|
|
|
myHostnameShort = strings.SplitN(myHostname, ".", 2)[0]
|
|
|
|
|
|
//fmt.Println()
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Connection
|
|
|
fmt.Println("Connecting to RabbitMQ: " + appconf.AmqpURI)
|
|
|
logit(6, "Connecting to RabbitMQ: "+appconf.AmqpURI)
|
|
|
amqpConn, err := rabbitmq.NewConn(
|
|
|
appconf.AmqpURI,
|
|
|
rabbitmq.WithConnectionOptionsLogging,
|
|
|
)
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error())
|
|
|
log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error())
|
|
|
}
|
|
|
defer amqpConn.Close()
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Consumer
|
|
|
msgConsumer, err = rabbitmq.NewConsumer(
|
|
|
amqpConn,
|
|
|
handleAmqpMsg, //this is the function that we want to call to consume presence messages
|
|
|
"q_rabbithunter_"+myHostnameShort,
|
|
|
rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch),
|
|
|
rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType),
|
|
|
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey),
|
|
|
rabbitmq.WithConsumerOptionsConsumerName("consumer_rabbithunter_"+myHostnameShort),
|
|
|
rabbitmq.WithConsumerOptionsQueueAutoDelete,
|
|
|
rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers),
|
|
|
//rabbitmq.WithConsumerOptionsQuorum,
|
|
|
//rabbitmq.WithConsumerOptionsQueueDurable,
|
|
|
//rabbitmq.WithConsumerOptionsExchangeDeclare,
|
|
|
//rabbitmq.WithConsumerOptionsBindingExchangeDurable,
|
|
|
)
|
|
|
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error())
|
|
|
log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error())
|
|
|
}
|
|
|
defer msgConsumer.Close()
|
|
|
|
|
|
logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: 'consumer_rabbithunter_"+myHostnameShort+"'.")
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Publisher
|
|
|
publisher, err = rabbitmq.NewPublisher(
|
|
|
amqpConn,
|
|
|
rabbitmq.WithPublisherOptionsLogging,
|
|
|
rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType),
|
|
|
rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch),
|
|
|
//rabbitmq.WithPublisherOptionsExchangeDeclare,
|
|
|
)
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error())
|
|
|
log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error())
|
|
|
}
|
|
|
defer publisher.Close()
|
|
|
|
|
|
logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.")
|
|
|
|
|
|
publisher.NotifyReturn(func(r rabbitmq.Return) {
|
|
|
logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body)))
|
|
|
})
|
|
|
publisher.NotifyPublish(func(c rabbitmq.Confirmation) {
|
|
|
logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack))
|
|
|
})
|
|
|
|
|
|
fmt.Println("Publish Exchange: [" + appconf.AmqpExchType + "] '" + appconf.AmqpPubExch + "'\nPublish Routing Key: '" + appconf.AmqpPubRoutingKey + "'")
|
|
|
fmt.Println("Consume Exchange: [" + appconf.AmqpExchType + "] '" + appconf.AmqpSubExch + "'\nConsume Routing Key: '" + appconf.AmqpSubRoutingKey + "'\nConsume Queue: 'consumer_rabbithunter_" + myHostnameShort + "'")
|
|
|
/////////////////////////////////////////////
|
|
|
// Message Filters
|
|
|
msgCatFilters = strings.Split(appconf.FilterEvtCat, ",")
|
|
|
msgNameFilters = strings.Split(appconf.FilterEvtName, ",")
|
|
|
msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",")
|
|
|
fmt.Println("Filtering event app names matching:", msgAppNameFilters)
|
|
|
fmt.Println("Filtering event categories matching:", msgCatFilters)
|
|
|
fmt.Println("Filtering event names matching:", msgNameFilters)
|
|
|
log.Println("Filtering event app names matching:", msgAppNameFilters)
|
|
|
log.Println("Filtering event categories matching:", msgCatFilters)
|
|
|
log.Println("Filtering event names matching:", msgNameFilters)
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process AMQP messages.")
|
|
|
|
|
|
log.Println(appnameFull + " system started.\n-----> [READY]")
|
|
|
fmt.Println(appnameFull + " system started.\n-----> [READY]")
|
|
|
|
|
|
// block main thread - wait for shutdown signal
|
|
|
sigs := make(chan os.Signal, 1)
|
|
|
|
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
|
go func() {
|
|
|
sig := <-sigs
|
|
|
fmt.Printf("Received signal: ")
|
|
|
fmt.Println(sig)
|
|
|
done <- true
|
|
|
}()
|
|
|
|
|
|
publishHandler() //this func will block the main thread, until receiving on the 'done' channel.
|
|
|
}
|
|
|
|
|
|
func logit(level int, message string) {
|
|
|
if appconf.LogLevel >= int(level) {
|
|
|
log.Println(message)
|
|
|
}
|
|
|
if appconf.LogLevel >= 8 {
|
|
|
fmt.Println(message)
|
|
|
}
|
|
|
}
|