From 0315032b26d8f78a2d6ce282f051e8b44c4f45a0 Mon Sep 17 00:00:00 2001 From: Ruel Tmeizeh - RuhNet Date: Sat, 23 Dec 2023 18:02:59 -0500 Subject: [PATCH] RabbitHunter v0.3 --- .gitignore | 6 + amqpconsume.go | 59 ++++++++++ amqppublish.go | 98 ++++++++++++++++ config.go | 151 ++++++++++++++++++++++++ go.mod | 9 ++ go.sum | 49 ++++++++ main.go | 196 ++++++++++++++++++++++++++++++++ message.json | 27 +++++ rabbithunter_config.json.sample | 20 ++++ 9 files changed, 615 insertions(+) create mode 100644 .gitignore create mode 100644 amqpconsume.go create mode 100644 amqppublish.go create mode 100644 config.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 message.json create mode 100644 rabbithunter_config.json.sample diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d4631e1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +rabbithunter_config.json +rabbithunter +*.swp + + + diff --git a/amqpconsume.go b/amqpconsume.go new file mode 100644 index 0000000..8ccd923 --- /dev/null +++ b/amqpconsume.go @@ -0,0 +1,59 @@ +package main + +import ( + "encoding/json" + "fmt" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +var msgConsumer *rabbitmq.Consumer + +type KzMessage struct { + AccountId string `json:"Account-ID"` + AppName string `json:"App-Name"` + AppVersion string `json:"App-Version"` + CallId string `json:"Call-ID"` + EventCategory string `json:"Event-Category"` + EventName string `json:"Event-Name"` + MsgId string `json:"Msg-ID"` + Node string `json:"Node"` + ServerId string `json:"Server-ID"` + Description string `json:"Msg-Description"` + To string `json:"To"` + Payload map[string]interface{} `json:"Payload"` +} + +func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action { + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + logit(7, "AMQP message received: "+string(d.Body)) + + if appconf.FilterEvtCat == "*" && appconf.FilterEvtName == "*" && appconf.FilterEvtAppName == "*" { + fmt.Println(string(d.Body)) + return rabbitmq.Ack + } + + var msg KzMessage + + err := json.Unmarshal(d.Body, &msg) + if err != nil { + logit(5, "handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: "+string(d.Body)+"\nUnmarshalling error: "+err.Error()) + return rabbitmq.NackDiscard + } + + for _, appname := range msgAppNameFilters { + if appconf.FilterEvtAppName == "*" || appname == msg.AppName { //only print messges that match a filter, or any if the filter is "*" + for _, cat := range msgCatFilters { + if appconf.FilterEvtCat == "*" || cat == msg.EventCategory { + for _, name := range msgNameFilters { + if appconf.FilterEvtName == "*" || name == msg.EventName { + fmt.Println(string(d.Body)) + } + } + } + } + } + } + + return rabbitmq.Ack +} diff --git a/amqppublish.go b/amqppublish.go new file mode 100644 index 0000000..6909501 --- /dev/null +++ b/amqppublish.go @@ -0,0 +1,98 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/signal" + "regexp" + "syscall" + "time" + + "github.com/google/uuid" + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +var publisher *rabbitmq.Publisher + +func publishHandler() { + pubch := make(chan os.Signal, 1) + signal.Notify(pubch, syscall.SIGUSR1, syscall.SIGUSR2) + + //create test message to send + ptest := KzMessage{ + EventCategory: "notification", + EventName: "push_req", + AccountId: "12a0f57aec7a40a1ccf6d959521d5682", + AppName: appname, + AppVersion: version, + CallId: uuid.New().String(), + MsgId: uuid.New().String(), + Node: myHostname, + ServerId: appname + "@" + myHostname, + Description: "Test message from RuhNet RabbitHunter", + To: "1234", + } + + msg1, err := json.Marshal(ptest) + if err != nil { + logit(3, "Unable to marshal ptest into JSON! Published test message will be blank.") + } + + msg2 := []byte(`{"test":true, "message":"test from ` + appnameFull + `","node":"` + myHostname + `"}`) //default second message + + for { + select { + case sig := <-pubch: + logit(5, "Received signal "+fmt.Sprintf("%s", sig)+" on pubch channel. Publishing message... ") + var msg []byte + switch sig { + case syscall.SIGUSR1: + msg = msg1 + case syscall.SIGUSR2: + msg = msg2 + msgFromFile, err := readPubMessageFile(appconf.PubMessageFile) + if err == nil { + msg = msgFromFile + } + } + logit(7, "Publishing message to exchange '"+appconf.AmqpPubExch+"' with routing key '"+appconf.AmqpPubRoutingKey+"': \n"+string(msg)) + err := publisher.Publish(msg, + []string{appconf.AmqpPubRoutingKey}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch), + //rabbitmq.WithPublishOptionsExchange("amq.fanout"), + ) + if err != nil { + logit(3, "Error publishing message: "+err.Error()) + fmt.Println("Error publishing message: " + err.Error()) + } else { + fmt.Println("PUBLISHED: \n" + string(msg) + "\n") + } + case <-done: + fmt.Println("Shutting myself down...") + return + case <-time.After(100 * time.Millisecond): + //case <-time.After(time.Second): + //fmt.Println("tick") + } + } + +} + +func readPubMessageFile(filePath string) (msg []byte, err error) { + jsonFile, err := os.Open(filePath) + if err != nil { + logit(4, "Could not open config file: "+filePath+"\n"+err.Error()) + return msg, err + } + defer jsonFile.Close() + fileBytes, _ := ioutil.ReadAll(jsonFile) + + //strip out // comments from file: + re := regexp.MustCompile(`([\s]//.*)|(^//.*)`) + fileCleanedBytes := re.ReplaceAll(fileBytes, nil) + + return fileCleanedBytes, err +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..851cfce --- /dev/null +++ b/config.go @@ -0,0 +1,151 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "os" + "reflect" + "regexp" + "strconv" +) + +var appconf *AppConfig + +func initConfig() { + //Read config: + configFilename := appname + "_config.json" + configDir := os.Getenv("CONFDIR") + if configDir == "" { + confDirs := []string{ + "/opt/" + appname, + "/opt/" + appname + "/etc", + "/usr/lib/" + appname, + "/usr/lib/" + appname + "/etc", + "/var/lib/" + appname, + "/var/lib/" + appname + "/etc", + "/usr/local/etc", + "/etc", + ".", + } + configDir = "." //the fallback + for _, cd := range confDirs { + if _, err := os.Stat(cd + "/" + configFilename); os.IsNotExist(err) { //doesn't exist... + continue //..so check next one + } + configDir = cd + } + } + configFile := configDir + "/" + configFilename + jsonFile, err := os.Open(configFile) + if err != nil { + log.Println("Could not open config file: " + configFile + "\n" + err.Error()) + fmt.Println("Could not open config file: " + configFile + "\n" + err.Error()) + } else { + defer jsonFile.Close() + fileBytes, _ := ioutil.ReadAll(jsonFile) + + //strip out // comments from config file: + re := regexp.MustCompile(`([\s]//.*)|(^//.*)`) + fileCleanedBytes := re.ReplaceAll(fileBytes, nil) + + err = json.Unmarshal(fileCleanedBytes, &appconf) //populate the config struct with JSON data from the config file + if err != nil { + log.Fatal("Could not parse config file: " + configFile + "\n" + err.Error()) + } + } + + appconf.checkConfig(configFilename) +} + +func (f *AppConfig) checkConfig(configFileName string) { + var invalid bool + + s := reflect.ValueOf(f).Elem() //the reflected struct + for i := 0; i < s.NumField(); i++ { //NumField() returns the number of fields in the struct + fieldValue := s.Field(i) //value of this i'th field + t := s.Type().Field(i) + //fmt.Println(t.Name + fmt.Sprintf(" is of kind: %d", t.Type.Kind())) + if fieldValue.Interface() == "" || fieldValue.Interface() == nil || fieldValue.Interface() == 0 || (t.Type.Kind() != reflect.Bool && fieldValue.IsZero()) { //field is not set already + //fmt.Println(t.Name + " is empty or zero.") + if t.Type.Kind() == reflect.String || t.Type.Kind() == reflect.Bool || t.Type.Kind() == reflect.Float64 || t.Type.Kind() == reflect.Int64 || t.Type.Kind() == reflect.Int { + if !fieldValue.CanSet() { + log.Printf("Config item '%s' cannot be set!\n", t.Name) + invalid = true + } else { + env, ok := os.LookupEnv(t.Tag.Get("env")) + if ok && len(env) > 0 { + //fmt.Println("ENV: " + t.Tag.Get("env") + " is found and is: " + env + " Setting...") + if err := setField(fieldValue, env); err != nil { + invalid = true + log.Println("Error setting '" + t.Name + "' to env '" + env + "'. Error: " + err.Error()) + fmt.Println("Error setting '" + t.Name + "' to env '" + env + "'. Error: " + err.Error()) + } else { + continue + } + } else { //env not found + //fmt.Println("ENV: '" + t.Tag.Get("env") + "' is NOT FOUND; checking for default value...") + // Look for user-defined default value + dflt, ok := t.Tag.Lookup("default") + //fmt.Println("DEFAULT is: " + dflt) + if ok { + if err := setField(fieldValue, dflt); err != nil { + log.Println("Error setting '" + t.Name + "' to default '" + dflt + "'. Error: " + err.Error()) + fmt.Println("Error setting '" + t.Name + "' to default '" + dflt + "'. Error: " + err.Error()) + invalid = true + } else { + continue + } + } + } + } + } else { + log.Printf("Config item '%s' of type %s cannot be set using environment variable %s.", s.Type().Field(i).Name, fieldValue.Type(), s.Type().Field(i).Tag.Get("env")) + invalid = true + } + + if !invalid { + log.Println("===========| ERRORS IN '" + configFileName + "' CONFIG FILE: |===========") + fmt.Println("--------------------------------------------------------------------------------") + fmt.Println(" ===========| ERRORS IN '" + configFileName + "' CONFIG FILE: |===========") + fmt.Println("") + } + invalid = true + log.Printf(" - Required config item '%s' and/or environment variable '%s' is missing or invalid.\n", t.Tag.Get("json"), t.Tag.Get("env")) + fmt.Printf(" - Required config item '%s' and/or environment variable '%s' is missing or invalid.\n", t.Tag.Get("json"), t.Tag.Get("env")) + } + } + if invalid { + fmt.Println("--------------------------------------------------------------------------------") + log.Fatal("Exiting!") + } +} + +func setField(fieldValue reflect.Value, value string) (err error) { + switch fieldValue.Kind() { + case reflect.Bool: + var b bool + if b, err = strconv.ParseBool(value); err != nil { + return err + } + fieldValue.SetBool(b) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + var i int64 + if i, err = strconv.ParseInt(value, 0, 64); err != nil { + return err + } + fieldValue.SetInt(int64(i)) + case reflect.Float32, reflect.Float64: + var f float64 + if f, err = strconv.ParseFloat(value, 64); err != nil { + return err + } + fieldValue.SetFloat(f) + case reflect.String: + fieldValue.SetString(value) + default: + return fmt.Errorf("%s is not a supported config type. Please use bool, float64 float32, int64, int, or string.", fieldValue.Kind()) + } + return +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cf69227 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module rabbithunter + +go 1.16 + +require ( + github.com/google/uuid v1.4.0 // indirect + github.com/streadway/amqp v1.0.0 // indirect + github.com/wagslane/go-rabbitmq v0.12.4 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b641eba --- /dev/null +++ b/go.sum @@ -0,0 +1,49 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= +github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/wagslane/go-rabbitmq v0.12.4 h1:dxpmTew/wrBlltcu9kBZNTVftT7tsguF4n4IAawK2d8= +github.com/wagslane/go-rabbitmq v0.12.4/go.mod h1:1sUJ53rrW2AIA7LEp8ymmmebHqqq8ksH/gXIfUP0I0s= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..b806365 --- /dev/null +++ b/main.go @@ -0,0 +1,196 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "strings" + "syscall" + "time" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +const appnameFull string = "RuhNet RabbitHunter AMQP Utility" +const appname string = "rabbithunter" +const version string = "0.3" +const serverVersion string = "RuhNet " + appname + " v" + version +const website string = "https://ruhnet.co" + +var startupTime time.Time +var myHostname, myHostnameShort string + +const banner = ` + ____ __ _ _ __ + / ___\ __ __/ /_ / \ / /__ __/ /_ + / /_/ // /_/ / _ \/ / \/ //__\_ __/ +/_/ \_\\ ___/_/ /_/_/ \__/ \__,/_/ %s +_____________________________________________________ +` + +var done = make(chan bool, 1) + +var msgCatFilters []string +var msgNameFilters []string +var msgAppNameFilters []string + +type AppConfig struct { + AmqpURI string `json:"amqp_uri" env:"AMQP_URI" default:"amqp://guest:guest@localhost:5672"` + AmqpSubExch string `json:"amqp_sub_exchange" env:"AMQP_SUB_EXCH" default:"callevt"` + AmqpPubExch string `json:"amqp_pub_exchange" env:"AMQP_PUB_EXCH" default:"pushes"` + AmqpExchType string `json:"amqp_exchange_type" env:"AMQP_EXCH_TYPE" default:"topic"` + AmqpSubRoutingKey string `json:"amqp_sub_routing_key" env:"AMQP_SUB_ROUTING_KEY" default:"call.*.*"` + AmqpPubRoutingKey string `json:"amqp_pub_routing_key" env:"AMQP_PUB_ROUTING_KEY" default:"notification.push.customapp.test"` + AmqpWorkers int `json:"amqp_workers" env:"AMQP_WORKERS" default:"2"` + LogFile string `json:"log_file" env:"LOG_FILE" default:"/tmp/rabbithunter.log"` + LogLevel int `json:"log_level" env:"LOG_LEVEL" default:"5"` + PubMessageFile string `json:"pub_message_file" env:"PUB_MSG_FILE" default:"./message.json"` + FilterEvtCat string `json:"filter_event_category" env:"FLT_EVT_CAT" default:"*"` //allow comma separated string for multiple + FilterEvtName string `json:"filter_event_name" env:"FLT_EVT_NAME" default:"*"` //allow comma separated string for multiple + FilterEvtAppName string `json:"filter_event_appname" env:"FLT_EVT_APPNAME" default:"*"` //allow comma separated string for multiple +} + +func main() { + var err error + startupTime = time.Now() + + ///////////////////////////////////////////// + // Setup + initConfig() + + fmt.Println("Configuration OK, starting " + appname + "...") + logit(5, "Configuration OK, starting "+appname+"...") + + ///////////////////////////////////////////// + // Logging + appLogFile, err := os.OpenFile(appconf.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664) + if err != nil { + log.Println("Could not open log file: " + appconf.LogFile + "\n" + err.Error()) + appLogFile, err = os.OpenFile("/tmp/"+appname+".log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664) + if err != nil { + log.Fatal("Can't open even /tmp log file!\n" + err.Error()) + } + } + defer appLogFile.Close() + //set other logging to same file + log.SetOutput(appLogFile) + + //Startup Banner + fmt.Printf(banner, website) + fmt.Println(serverVersion + "\n") + + logit(5, appname+"v"+version+" starting up... ") + logit(5, fmt.Sprintf("Logging with level: %d", appconf.LogLevel)) + + //Hostname + myHostname, err = os.Hostname() + logit(5, "Detecting my hostname... "+myHostname) + if err != nil { + log.Fatal("Hostname could not be auto-detected from system: " + err.Error()) + } + myHostnameShort = strings.SplitN(myHostname, ".", 2)[0] + + //fmt.Println() + + ///////////////////////////////////////////// + // RabbitMQ Setup Connection + fmt.Println("Connecting to RabbitMQ: " + appconf.AmqpURI) + logit(6, "Connecting to RabbitMQ: "+appconf.AmqpURI) + amqpConn, err := rabbitmq.NewConn( + appconf.AmqpURI, + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error()) + log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error()) + } + defer amqpConn.Close() + + ///////////////////////////////////////////// + // RabbitMQ Setup Consumer + msgConsumer, err = rabbitmq.NewConsumer( + amqpConn, + handleAmqpMsg, //this is the function that we want to call to consume presence messages + "q_rabbithunter_"+myHostnameShort, + rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch), + rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType), + rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey), + rabbitmq.WithConsumerOptionsConsumerName("consumer_rabbithunter_"+myHostnameShort), + rabbitmq.WithConsumerOptionsQueueAutoDelete, + rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers), + //rabbitmq.WithConsumerOptionsQuorum, + //rabbitmq.WithConsumerOptionsQueueDurable, + //rabbitmq.WithConsumerOptionsExchangeDeclare, + //rabbitmq.WithConsumerOptionsBindingExchangeDurable, + ) + + if err != nil { + fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error()) + log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error()) + } + defer msgConsumer.Close() + + logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: 'consumer_rabbithunter_"+myHostnameShort+"'.") + + ///////////////////////////////////////////// + // RabbitMQ Setup Publisher + publisher, err = rabbitmq.NewPublisher( + amqpConn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType), + rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch), + //rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error()) + log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error()) + } + defer publisher.Close() + + logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.") + + publisher.NotifyReturn(func(r rabbitmq.Return) { + logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body))) + }) + publisher.NotifyPublish(func(c rabbitmq.Confirmation) { + logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack)) + }) + + ///////////////////////////////////////////// + // Message Filters + msgCatFilters = strings.Split(appconf.FilterEvtCat, ",") + msgNameFilters = strings.Split(appconf.FilterEvtName, ",") + msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",") + fmt.Println("Filtering event app names matching:", msgAppNameFilters) + fmt.Println("Filtering event categories matching:", msgCatFilters) + fmt.Println("Filtering event names matching:", msgNameFilters) + + ///////////////////////////////////////////// + logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process AMQP messages.") + + log.Println(appnameFull + " system started.\n-----> [READY]") + fmt.Println(appnameFull + " system started.\n-----> [READY]") + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigs + fmt.Printf("Received signal: ") + fmt.Println(sig) + done <- true + }() + + publishHandler() //this func will block the main thread, until receiving on the 'done' channel. +} + +func logit(level int, message string) { + if appconf.LogLevel >= int(level) { + log.Println(message) + } + if appconf.LogLevel >= 8 { + fmt.Println(message) + } +} diff --git a/message.json b/message.json new file mode 100644 index 0000000..daf06ba --- /dev/null +++ b/message.json @@ -0,0 +1,27 @@ +{ + "Account-ID": "52e0c76bfa7a40a1cde6d959921d6218", + "To": "5026", + "Alert-Key": "IC_SIL", + "Alert-Params": [ + "5032 - 5032 5032" + ], + "App-Name": "kamailio", + "App-Version": "5.5.2", + "Call-ID": "48617fa3-55aa-44a0-9465-4606ac0d0c2c", + "Event-Category": "notification", + "Event-Name": "push_req", + "Msg-ID": "1f8a6478-e32e-48a6-b5d1-80416cd4a916", + "Node": "kamailio@ke-app1.domain.com", + "Payload": { + "call-id": "48617fa3-55aa-44a0-9465-4606ac0d0c2c", + "caller-id-name": "John Smith", + "caller-id-number": "4321", + "proxy": "sip:10.150.0.2:7000", + "registration-token": "f3651786-1a48-454d-a049-f03446ebadc8" + }, + "Server-ID": "kamailio@ke-app1.domain.com-default<83.8790.22>", + "Sound": "ring.caf", + "Token-App": "io.domain.com.app", + "Token-ID": "121212", + "Token-Type": "domain.com" +} diff --git a/rabbithunter_config.json.sample b/rabbithunter_config.json.sample new file mode 100644 index 0000000..b878275 --- /dev/null +++ b/rabbithunter_config.json.sample @@ -0,0 +1,20 @@ +//rabbithunter_config.json +//Commented lines represent default values. +//You may also set any config parameters via the environment variables listed. + +{ +// "amqp_uri":"amqp://guest:guest@localhost:5672", // env AMQP_URI +// "amqp_sub_exchange":"callevt", // env AMQP_SUB_EXCH +// "amqp_pub_exchange":"pushes", // env AMQP_PUB_EXCH +// "amqp_exchange_type":"topic", // env AMQP_EXCH_TYPE +// "amqp_sub_routing_key":"call.*.*", // env AMQP_SUB_ROUTING_KEY +// "amqp_pub_routing_key":"notification.push.customapp.test", // env AMQP_PUB_ROUTING_KEY +// "amqp_workers":2, // env AMQP_WORKERS +// "pub_message_file":"./message.json", // env PUB_MSG_FILE -- send signal USR2 to publish +// "filter_event_category":"*", // env FLT_EVT_CAT -- filter event category - comma separate +// "filter_event_name":"*", // env FLT_EVT_NAME -- filter event name - comma separate +// "filter_event_appname":"*", // env FLT_EVT_APPNAME -- filter event app name - comma separate +// "log_file":"/tmp/rabbithunter.log", // env LOG_FILE +// "log_level":5 // env LOG_LEVEL (default 5/Notice) +} +