Browse Source

RabbitHunter v0.3

master
Ruel Tmeizeh - RuhNet 2 years ago
commit
0315032b26
9 changed files with 615 additions and 0 deletions
  1. +6
    -0
      .gitignore
  2. +59
    -0
      amqpconsume.go
  3. +98
    -0
      amqppublish.go
  4. +151
    -0
      config.go
  5. +9
    -0
      go.mod
  6. +49
    -0
      go.sum
  7. +196
    -0
      main.go
  8. +27
    -0
      message.json
  9. +20
    -0
      rabbithunter_config.json.sample

+ 6
- 0
.gitignore View File

@ -0,0 +1,6 @@
rabbithunter_config.json
rabbithunter
*.swp

+ 59
- 0
amqpconsume.go View File

@ -0,0 +1,59 @@
package main
import (
"encoding/json"
"fmt"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
var msgConsumer *rabbitmq.Consumer
type KzMessage struct {
AccountId string `json:"Account-ID"`
AppName string `json:"App-Name"`
AppVersion string `json:"App-Version"`
CallId string `json:"Call-ID"`
EventCategory string `json:"Event-Category"`
EventName string `json:"Event-Name"`
MsgId string `json:"Msg-ID"`
Node string `json:"Node"`
ServerId string `json:"Server-ID"`
Description string `json:"Msg-Description"`
To string `json:"To"`
Payload map[string]interface{} `json:"Payload"`
}
func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action {
// rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue
logit(7, "AMQP message received: "+string(d.Body))
if appconf.FilterEvtCat == "*" && appconf.FilterEvtName == "*" && appconf.FilterEvtAppName == "*" {
fmt.Println(string(d.Body))
return rabbitmq.Ack
}
var msg KzMessage
err := json.Unmarshal(d.Body, &msg)
if err != nil {
logit(5, "handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: "+string(d.Body)+"\nUnmarshalling error: "+err.Error())
return rabbitmq.NackDiscard
}
for _, appname := range msgAppNameFilters {
if appconf.FilterEvtAppName == "*" || appname == msg.AppName { //only print messges that match a filter, or any if the filter is "*"
for _, cat := range msgCatFilters {
if appconf.FilterEvtCat == "*" || cat == msg.EventCategory {
for _, name := range msgNameFilters {
if appconf.FilterEvtName == "*" || name == msg.EventName {
fmt.Println(string(d.Body))
}
}
}
}
}
}
return rabbitmq.Ack
}

+ 98
- 0
amqppublish.go View File

@ -0,0 +1,98 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/signal"
"regexp"
"syscall"
"time"
"github.com/google/uuid"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
var publisher *rabbitmq.Publisher
func publishHandler() {
pubch := make(chan os.Signal, 1)
signal.Notify(pubch, syscall.SIGUSR1, syscall.SIGUSR2)
//create test message to send
ptest := KzMessage{
EventCategory: "notification",
EventName: "push_req",
AccountId: "12a0f57aec7a40a1ccf6d959521d5682",
AppName: appname,
AppVersion: version,
CallId: uuid.New().String(),
MsgId: uuid.New().String(),
Node: myHostname,
ServerId: appname + "@" + myHostname,
Description: "Test message from RuhNet RabbitHunter",
To: "1234",
}
msg1, err := json.Marshal(ptest)
if err != nil {
logit(3, "Unable to marshal ptest into JSON! Published test message will be blank.")
}
msg2 := []byte(`{"test":true, "message":"test from ` + appnameFull + `","node":"` + myHostname + `"}`) //default second message
for {
select {
case sig := <-pubch:
logit(5, "Received signal "+fmt.Sprintf("%s", sig)+" on pubch channel. Publishing message... ")
var msg []byte
switch sig {
case syscall.SIGUSR1:
msg = msg1
case syscall.SIGUSR2:
msg = msg2
msgFromFile, err := readPubMessageFile(appconf.PubMessageFile)
if err == nil {
msg = msgFromFile
}
}
logit(7, "Publishing message to exchange '"+appconf.AmqpPubExch+"' with routing key '"+appconf.AmqpPubRoutingKey+"': \n"+string(msg))
err := publisher.Publish(msg,
[]string{appconf.AmqpPubRoutingKey},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch),
//rabbitmq.WithPublishOptionsExchange("amq.fanout"),
)
if err != nil {
logit(3, "Error publishing message: "+err.Error())
fmt.Println("Error publishing message: " + err.Error())
} else {
fmt.Println("PUBLISHED: \n" + string(msg) + "\n")
}
case <-done:
fmt.Println("Shutting myself down...")
return
case <-time.After(100 * time.Millisecond):
//case <-time.After(time.Second):
//fmt.Println("tick")
}
}
}
func readPubMessageFile(filePath string) (msg []byte, err error) {
jsonFile, err := os.Open(filePath)
if err != nil {
logit(4, "Could not open config file: "+filePath+"\n"+err.Error())
return msg, err
}
defer jsonFile.Close()
fileBytes, _ := ioutil.ReadAll(jsonFile)
//strip out // comments from file:
re := regexp.MustCompile(`([\s]//.*)|(^//.*)`)
fileCleanedBytes := re.ReplaceAll(fileBytes, nil)
return fileCleanedBytes, err
}

+ 151
- 0
config.go View File

@ -0,0 +1,151 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"reflect"
"regexp"
"strconv"
)
var appconf *AppConfig
func initConfig() {
//Read config:
configFilename := appname + "_config.json"
configDir := os.Getenv("CONFDIR")
if configDir == "" {
confDirs := []string{
"/opt/" + appname,
"/opt/" + appname + "/etc",
"/usr/lib/" + appname,
"/usr/lib/" + appname + "/etc",
"/var/lib/" + appname,
"/var/lib/" + appname + "/etc",
"/usr/local/etc",
"/etc",
".",
}
configDir = "." //the fallback
for _, cd := range confDirs {
if _, err := os.Stat(cd + "/" + configFilename); os.IsNotExist(err) { //doesn't exist...
continue //..so check next one
}
configDir = cd
}
}
configFile := configDir + "/" + configFilename
jsonFile, err := os.Open(configFile)
if err != nil {
log.Println("Could not open config file: " + configFile + "\n" + err.Error())
fmt.Println("Could not open config file: " + configFile + "\n" + err.Error())
} else {
defer jsonFile.Close()
fileBytes, _ := ioutil.ReadAll(jsonFile)
//strip out // comments from config file:
re := regexp.MustCompile(`([\s]//.*)|(^//.*)`)
fileCleanedBytes := re.ReplaceAll(fileBytes, nil)
err = json.Unmarshal(fileCleanedBytes, &appconf) //populate the config struct with JSON data from the config file
if err != nil {
log.Fatal("Could not parse config file: " + configFile + "\n" + err.Error())
}
}
appconf.checkConfig(configFilename)
}
func (f *AppConfig) checkConfig(configFileName string) {
var invalid bool
s := reflect.ValueOf(f).Elem() //the reflected struct
for i := 0; i < s.NumField(); i++ { //NumField() returns the number of fields in the struct
fieldValue := s.Field(i) //value of this i'th field
t := s.Type().Field(i)
//fmt.Println(t.Name + fmt.Sprintf(" is of kind: %d", t.Type.Kind()))
if fieldValue.Interface() == "" || fieldValue.Interface() == nil || fieldValue.Interface() == 0 || (t.Type.Kind() != reflect.Bool && fieldValue.IsZero()) { //field is not set already
//fmt.Println(t.Name + " is empty or zero.")
if t.Type.Kind() == reflect.String || t.Type.Kind() == reflect.Bool || t.Type.Kind() == reflect.Float64 || t.Type.Kind() == reflect.Int64 || t.Type.Kind() == reflect.Int {
if !fieldValue.CanSet() {
log.Printf("Config item '%s' cannot be set!\n", t.Name)
invalid = true
} else {
env, ok := os.LookupEnv(t.Tag.Get("env"))
if ok && len(env) > 0 {
//fmt.Println("ENV: " + t.Tag.Get("env") + " is found and is: " + env + " Setting...")
if err := setField(fieldValue, env); err != nil {
invalid = true
log.Println("Error setting '" + t.Name + "' to env '" + env + "'. Error: " + err.Error())
fmt.Println("Error setting '" + t.Name + "' to env '" + env + "'. Error: " + err.Error())
} else {
continue
}
} else { //env not found
//fmt.Println("ENV: '" + t.Tag.Get("env") + "' is NOT FOUND; checking for default value...")
// Look for user-defined default value
dflt, ok := t.Tag.Lookup("default")
//fmt.Println("DEFAULT is: " + dflt)
if ok {
if err := setField(fieldValue, dflt); err != nil {
log.Println("Error setting '" + t.Name + "' to default '" + dflt + "'. Error: " + err.Error())
fmt.Println("Error setting '" + t.Name + "' to default '" + dflt + "'. Error: " + err.Error())
invalid = true
} else {
continue
}
}
}
}
} else {
log.Printf("Config item '%s' of type %s cannot be set using environment variable %s.", s.Type().Field(i).Name, fieldValue.Type(), s.Type().Field(i).Tag.Get("env"))
invalid = true
}
if !invalid {
log.Println("===========| ERRORS IN '" + configFileName + "' CONFIG FILE: |===========")
fmt.Println("--------------------------------------------------------------------------------")
fmt.Println(" ===========| ERRORS IN '" + configFileName + "' CONFIG FILE: |===========")
fmt.Println("")
}
invalid = true
log.Printf(" - Required config item '%s' and/or environment variable '%s' is missing or invalid.\n", t.Tag.Get("json"), t.Tag.Get("env"))
fmt.Printf(" - Required config item '%s' and/or environment variable '%s' is missing or invalid.\n", t.Tag.Get("json"), t.Tag.Get("env"))
}
}
if invalid {
fmt.Println("--------------------------------------------------------------------------------")
log.Fatal("Exiting!")
}
}
func setField(fieldValue reflect.Value, value string) (err error) {
switch fieldValue.Kind() {
case reflect.Bool:
var b bool
if b, err = strconv.ParseBool(value); err != nil {
return err
}
fieldValue.SetBool(b)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
var i int64
if i, err = strconv.ParseInt(value, 0, 64); err != nil {
return err
}
fieldValue.SetInt(int64(i))
case reflect.Float32, reflect.Float64:
var f float64
if f, err = strconv.ParseFloat(value, 64); err != nil {
return err
}
fieldValue.SetFloat(f)
case reflect.String:
fieldValue.SetString(value)
default:
return fmt.Errorf("%s is not a supported config type. Please use bool, float64 float32, int64, int, or string.", fieldValue.Kind())
}
return
}

+ 9
- 0
go.mod View File

@ -0,0 +1,9 @@
module rabbithunter
go 1.16
require (
github.com/google/uuid v1.4.0 // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/wagslane/go-rabbitmq v0.12.4 // indirect
)

+ 49
- 0
go.sum View File

@ -0,0 +1,49 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo=
github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/wagslane/go-rabbitmq v0.12.4 h1:dxpmTew/wrBlltcu9kBZNTVftT7tsguF4n4IAawK2d8=
github.com/wagslane/go-rabbitmq v0.12.4/go.mod h1:1sUJ53rrW2AIA7LEp8ymmmebHqqq8ksH/gXIfUP0I0s=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 196
- 0
main.go View File

@ -0,0 +1,196 @@
package main
import (
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
const appnameFull string = "RuhNet RabbitHunter AMQP Utility"
const appname string = "rabbithunter"
const version string = "0.3"
const serverVersion string = "RuhNet " + appname + " v" + version
const website string = "https://ruhnet.co"
var startupTime time.Time
var myHostname, myHostnameShort string
const banner = `
____ __ _ _ __
/ ___\ __ __/ /_ / \ / /__ __/ /_
/ /_/ // /_/ / _ \/ / \/ //__\_ __/
/_/ \_\\ ___/_/ /_/_/ \__/ \__,/_/ %s
_____________________________________________________
`
var done = make(chan bool, 1)
var msgCatFilters []string
var msgNameFilters []string
var msgAppNameFilters []string
type AppConfig struct {
AmqpURI string `json:"amqp_uri" env:"AMQP_URI" default:"amqp://guest:guest@localhost:5672"`
AmqpSubExch string `json:"amqp_sub_exchange" env:"AMQP_SUB_EXCH" default:"callevt"`
AmqpPubExch string `json:"amqp_pub_exchange" env:"AMQP_PUB_EXCH" default:"pushes"`
AmqpExchType string `json:"amqp_exchange_type" env:"AMQP_EXCH_TYPE" default:"topic"`
AmqpSubRoutingKey string `json:"amqp_sub_routing_key" env:"AMQP_SUB_ROUTING_KEY" default:"call.*.*"`
AmqpPubRoutingKey string `json:"amqp_pub_routing_key" env:"AMQP_PUB_ROUTING_KEY" default:"notification.push.customapp.test"`
AmqpWorkers int `json:"amqp_workers" env:"AMQP_WORKERS" default:"2"`
LogFile string `json:"log_file" env:"LOG_FILE" default:"/tmp/rabbithunter.log"`
LogLevel int `json:"log_level" env:"LOG_LEVEL" default:"5"`
PubMessageFile string `json:"pub_message_file" env:"PUB_MSG_FILE" default:"./message.json"`
FilterEvtCat string `json:"filter_event_category" env:"FLT_EVT_CAT" default:"*"` //allow comma separated string for multiple
FilterEvtName string `json:"filter_event_name" env:"FLT_EVT_NAME" default:"*"` //allow comma separated string for multiple
FilterEvtAppName string `json:"filter_event_appname" env:"FLT_EVT_APPNAME" default:"*"` //allow comma separated string for multiple
}
func main() {
var err error
startupTime = time.Now()
/////////////////////////////////////////////
// Setup
initConfig()
fmt.Println("Configuration OK, starting " + appname + "...")
logit(5, "Configuration OK, starting "+appname+"...")
/////////////////////////////////////////////
// Logging
appLogFile, err := os.OpenFile(appconf.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664)
if err != nil {
log.Println("Could not open log file: " + appconf.LogFile + "\n" + err.Error())
appLogFile, err = os.OpenFile("/tmp/"+appname+".log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664)
if err != nil {
log.Fatal("Can't open even /tmp log file!\n" + err.Error())
}
}
defer appLogFile.Close()
//set other logging to same file
log.SetOutput(appLogFile)
//Startup Banner
fmt.Printf(banner, website)
fmt.Println(serverVersion + "\n")
logit(5, appname+"v"+version+" starting up... ")
logit(5, fmt.Sprintf("Logging with level: %d", appconf.LogLevel))
//Hostname
myHostname, err = os.Hostname()
logit(5, "Detecting my hostname... "+myHostname)
if err != nil {
log.Fatal("Hostname could not be auto-detected from system: " + err.Error())
}
myHostnameShort = strings.SplitN(myHostname, ".", 2)[0]
//fmt.Println()
/////////////////////////////////////////////
// RabbitMQ Setup Connection
fmt.Println("Connecting to RabbitMQ: " + appconf.AmqpURI)
logit(6, "Connecting to RabbitMQ: "+appconf.AmqpURI)
amqpConn, err := rabbitmq.NewConn(
appconf.AmqpURI,
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error())
}
defer amqpConn.Close()
/////////////////////////////////////////////
// RabbitMQ Setup Consumer
msgConsumer, err = rabbitmq.NewConsumer(
amqpConn,
handleAmqpMsg, //this is the function that we want to call to consume presence messages
"q_rabbithunter_"+myHostnameShort,
rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch),
rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType),
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey),
rabbitmq.WithConsumerOptionsConsumerName("consumer_rabbithunter_"+myHostnameShort),
rabbitmq.WithConsumerOptionsQueueAutoDelete,
rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers),
//rabbitmq.WithConsumerOptionsQuorum,
//rabbitmq.WithConsumerOptionsQueueDurable,
//rabbitmq.WithConsumerOptionsExchangeDeclare,
//rabbitmq.WithConsumerOptionsBindingExchangeDurable,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error())
}
defer msgConsumer.Close()
logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: 'consumer_rabbithunter_"+myHostnameShort+"'.")
/////////////////////////////////////////////
// RabbitMQ Setup Publisher
publisher, err = rabbitmq.NewPublisher(
amqpConn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType),
rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch),
//rabbitmq.WithPublisherOptionsExchangeDeclare,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error())
}
defer publisher.Close()
logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.")
publisher.NotifyReturn(func(r rabbitmq.Return) {
logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body)))
})
publisher.NotifyPublish(func(c rabbitmq.Confirmation) {
logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack))
})
/////////////////////////////////////////////
// Message Filters
msgCatFilters = strings.Split(appconf.FilterEvtCat, ",")
msgNameFilters = strings.Split(appconf.FilterEvtName, ",")
msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",")
fmt.Println("Filtering event app names matching:", msgAppNameFilters)
fmt.Println("Filtering event categories matching:", msgCatFilters)
fmt.Println("Filtering event names matching:", msgNameFilters)
/////////////////////////////////////////////
logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process AMQP messages.")
log.Println(appnameFull + " system started.\n-----> [READY]")
fmt.Println(appnameFull + " system started.\n-----> [READY]")
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Received signal: ")
fmt.Println(sig)
done <- true
}()
publishHandler() //this func will block the main thread, until receiving on the 'done' channel.
}
func logit(level int, message string) {
if appconf.LogLevel >= int(level) {
log.Println(message)
}
if appconf.LogLevel >= 8 {
fmt.Println(message)
}
}

+ 27
- 0
message.json View File

@ -0,0 +1,27 @@
{
"Account-ID": "52e0c76bfa7a40a1cde6d959921d6218",
"To": "5026",
"Alert-Key": "IC_SIL",
"Alert-Params": [
"5032 - 5032 5032"
],
"App-Name": "kamailio",
"App-Version": "5.5.2",
"Call-ID": "48617fa3-55aa-44a0-9465-4606ac0d0c2c",
"Event-Category": "notification",
"Event-Name": "push_req",
"Msg-ID": "1f8a6478-e32e-48a6-b5d1-80416cd4a916",
"Node": "kamailio@ke-app1.domain.com",
"Payload": {
"call-id": "48617fa3-55aa-44a0-9465-4606ac0d0c2c",
"caller-id-name": "John Smith",
"caller-id-number": "4321",
"proxy": "sip:10.150.0.2:7000",
"registration-token": "f3651786-1a48-454d-a049-f03446ebadc8"
},
"Server-ID": "kamailio@ke-app1.domain.com-default<83.8790.22>",
"Sound": "ring.caf",
"Token-App": "io.domain.com.app",
"Token-ID": "121212",
"Token-Type": "domain.com"
}

+ 20
- 0
rabbithunter_config.json.sample View File

@ -0,0 +1,20 @@
//rabbithunter_config.json
//Commented lines represent default values.
//You may also set any config parameters via the environment variables listed.
{
// "amqp_uri":"amqp://guest:guest@localhost:5672", // env AMQP_URI
// "amqp_sub_exchange":"callevt", // env AMQP_SUB_EXCH
// "amqp_pub_exchange":"pushes", // env AMQP_PUB_EXCH
// "amqp_exchange_type":"topic", // env AMQP_EXCH_TYPE
// "amqp_sub_routing_key":"call.*.*", // env AMQP_SUB_ROUTING_KEY
// "amqp_pub_routing_key":"notification.push.customapp.test", // env AMQP_PUB_ROUTING_KEY
// "amqp_workers":2, // env AMQP_WORKERS
// "pub_message_file":"./message.json", // env PUB_MSG_FILE -- send signal USR2 to publish
// "filter_event_category":"*", // env FLT_EVT_CAT -- filter event category - comma separate
// "filter_event_name":"*", // env FLT_EVT_NAME -- filter event name - comma separate
// "filter_event_appname":"*", // env FLT_EVT_APPNAME -- filter event app name - comma separate
// "log_file":"/tmp/rabbithunter.log", // env LOG_FILE
// "log_level":5 // env LOG_LEVEL (default 5/Notice)
}

Loading…
Cancel
Save