commit 0315032b26d8f78a2d6ce282f051e8b44c4f45a0 Author: Ruel Tmeizeh - RuhNet Date: Sat Dec 23 18:02:59 2023 -0500 RabbitHunter v0.3 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d4631e1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +rabbithunter_config.json +rabbithunter +*.swp + + + diff --git a/amqpconsume.go b/amqpconsume.go new file mode 100644 index 0000000..8ccd923 --- /dev/null +++ b/amqpconsume.go @@ -0,0 +1,59 @@ +package main + +import ( + "encoding/json" + "fmt" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +var msgConsumer *rabbitmq.Consumer + +type KzMessage struct { + AccountId string `json:"Account-ID"` + AppName string `json:"App-Name"` + AppVersion string `json:"App-Version"` + CallId string `json:"Call-ID"` + EventCategory string `json:"Event-Category"` + EventName string `json:"Event-Name"` + MsgId string `json:"Msg-ID"` + Node string `json:"Node"` + ServerId string `json:"Server-ID"` + Description string `json:"Msg-Description"` + To string `json:"To"` + Payload map[string]interface{} `json:"Payload"` +} + +func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action { + // rabbitmq.Ack, rabbitmq.NackDiscard, rabbitmq.NackRequeue + logit(7, "AMQP message received: "+string(d.Body)) + + if appconf.FilterEvtCat == "*" && appconf.FilterEvtName == "*" && appconf.FilterEvtAppName == "*" { + fmt.Println(string(d.Body)) + return rabbitmq.Ack + } + + var msg KzMessage + + err := json.Unmarshal(d.Body, &msg) + if err != nil { + logit(5, "handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: "+string(d.Body)+"\nUnmarshalling error: "+err.Error()) + return rabbitmq.NackDiscard + } + + for _, appname := range msgAppNameFilters { + if appconf.FilterEvtAppName == "*" || appname == msg.AppName { //only print messges that match a filter, or any if the filter is "*" + for _, cat := range msgCatFilters { + if appconf.FilterEvtCat == "*" || cat == msg.EventCategory { + for _, name := range msgNameFilters { + if appconf.FilterEvtName == "*" || name == msg.EventName { + fmt.Println(string(d.Body)) + } + } + } + } + } + } + + return rabbitmq.Ack +} diff --git a/amqppublish.go b/amqppublish.go new file mode 100644 index 0000000..6909501 --- /dev/null +++ b/amqppublish.go @@ -0,0 +1,98 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "os/signal" + "regexp" + "syscall" + "time" + + "github.com/google/uuid" + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +var publisher *rabbitmq.Publisher + +func publishHandler() { + pubch := make(chan os.Signal, 1) + signal.Notify(pubch, syscall.SIGUSR1, syscall.SIGUSR2) + + //create test message to send + ptest := KzMessage{ + EventCategory: "notification", + EventName: "push_req", + AccountId: "12a0f57aec7a40a1ccf6d959521d5682", + AppName: appname, + AppVersion: version, + CallId: uuid.New().String(), + MsgId: uuid.New().String(), + Node: myHostname, + ServerId: appname + "@" + myHostname, + Description: "Test message from RuhNet RabbitHunter", + To: "1234", + } + + msg1, err := json.Marshal(ptest) + if err != nil { + logit(3, "Unable to marshal ptest into JSON! Published test message will be blank.") + } + + msg2 := []byte(`{"test":true, "message":"test from ` + appnameFull + `","node":"` + myHostname + `"}`) //default second message + + for { + select { + case sig := <-pubch: + logit(5, "Received signal "+fmt.Sprintf("%s", sig)+" on pubch channel. Publishing message... ") + var msg []byte + switch sig { + case syscall.SIGUSR1: + msg = msg1 + case syscall.SIGUSR2: + msg = msg2 + msgFromFile, err := readPubMessageFile(appconf.PubMessageFile) + if err == nil { + msg = msgFromFile + } + } + logit(7, "Publishing message to exchange '"+appconf.AmqpPubExch+"' with routing key '"+appconf.AmqpPubRoutingKey+"': \n"+string(msg)) + err := publisher.Publish(msg, + []string{appconf.AmqpPubRoutingKey}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch), + //rabbitmq.WithPublishOptionsExchange("amq.fanout"), + ) + if err != nil { + logit(3, "Error publishing message: "+err.Error()) + fmt.Println("Error publishing message: " + err.Error()) + } else { + fmt.Println("PUBLISHED: \n" + string(msg) + "\n") + } + case <-done: + fmt.Println("Shutting myself down...") + return + case <-time.After(100 * time.Millisecond): + //case <-time.After(time.Second): + //fmt.Println("tick") + } + } + +} + +func readPubMessageFile(filePath string) (msg []byte, err error) { + jsonFile, err := os.Open(filePath) + if err != nil { + logit(4, "Could not open config file: "+filePath+"\n"+err.Error()) + return msg, err + } + defer jsonFile.Close() + fileBytes, _ := ioutil.ReadAll(jsonFile) + + //strip out // comments from file: + re := regexp.MustCompile(`([\s]//.*)|(^//.*)`) + fileCleanedBytes := re.ReplaceAll(fileBytes, nil) + + return fileCleanedBytes, err +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..851cfce --- /dev/null +++ b/config.go @@ -0,0 +1,151 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "os" + "reflect" + "regexp" + "strconv" +) + +var appconf *AppConfig + +func initConfig() { + //Read config: + configFilename := appname + "_config.json" + configDir := os.Getenv("CONFDIR") + if configDir == "" { + confDirs := []string{ + "/opt/" + appname, + "/opt/" + appname + "/etc", + "/usr/lib/" + appname, + "/usr/lib/" + appname + "/etc", + "/var/lib/" + appname, + "/var/lib/" + appname + "/etc", + "/usr/local/etc", + "/etc", + ".", + } + configDir = "." //the fallback + for _, cd := range confDirs { + if _, err := os.Stat(cd + "/" + configFilename); os.IsNotExist(err) { //doesn't exist... + continue //..so check next one + } + configDir = cd + } + } + configFile := configDir + "/" + configFilename + jsonFile, err := os.Open(configFile) + if err != nil { + log.Println("Could not open config file: " + configFile + "\n" + err.Error()) + fmt.Println("Could not open config file: " + configFile + "\n" + err.Error()) + } else { + defer jsonFile.Close() + fileBytes, _ := ioutil.ReadAll(jsonFile) + + //strip out // comments from config file: + re := regexp.MustCompile(`([\s]//.*)|(^//.*)`) + fileCleanedBytes := re.ReplaceAll(fileBytes, nil) + + err = json.Unmarshal(fileCleanedBytes, &appconf) //populate the config struct with JSON data from the config file + if err != nil { + log.Fatal("Could not parse config file: " + configFile + "\n" + err.Error()) + } + } + + appconf.checkConfig(configFilename) +} + +func (f *AppConfig) checkConfig(configFileName string) { + var invalid bool + + s := reflect.ValueOf(f).Elem() //the reflected struct + for i := 0; i < s.NumField(); i++ { //NumField() returns the number of fields in the struct + fieldValue := s.Field(i) //value of this i'th field + t := s.Type().Field(i) + //fmt.Println(t.Name + fmt.Sprintf(" is of kind: %d", t.Type.Kind())) + if fieldValue.Interface() == "" || fieldValue.Interface() == nil || fieldValue.Interface() == 0 || (t.Type.Kind() != reflect.Bool && fieldValue.IsZero()) { //field is not set already + //fmt.Println(t.Name + " is empty or zero.") + if t.Type.Kind() == reflect.String || t.Type.Kind() == reflect.Bool || t.Type.Kind() == reflect.Float64 || t.Type.Kind() == reflect.Int64 || t.Type.Kind() == reflect.Int { + if !fieldValue.CanSet() { + log.Printf("Config item '%s' cannot be set!\n", t.Name) + invalid = true + } else { + env, ok := os.LookupEnv(t.Tag.Get("env")) + if ok && len(env) > 0 { + //fmt.Println("ENV: " + t.Tag.Get("env") + " is found and is: " + env + " Setting...") + if err := setField(fieldValue, env); err != nil { + invalid = true + log.Println("Error setting '" + t.Name + "' to env '" + env + "'. Error: " + err.Error()) + fmt.Println("Error setting '" + t.Name + "' to env '" + env + "'. Error: " + err.Error()) + } else { + continue + } + } else { //env not found + //fmt.Println("ENV: '" + t.Tag.Get("env") + "' is NOT FOUND; checking for default value...") + // Look for user-defined default value + dflt, ok := t.Tag.Lookup("default") + //fmt.Println("DEFAULT is: " + dflt) + if ok { + if err := setField(fieldValue, dflt); err != nil { + log.Println("Error setting '" + t.Name + "' to default '" + dflt + "'. Error: " + err.Error()) + fmt.Println("Error setting '" + t.Name + "' to default '" + dflt + "'. Error: " + err.Error()) + invalid = true + } else { + continue + } + } + } + } + } else { + log.Printf("Config item '%s' of type %s cannot be set using environment variable %s.", s.Type().Field(i).Name, fieldValue.Type(), s.Type().Field(i).Tag.Get("env")) + invalid = true + } + + if !invalid { + log.Println("===========| ERRORS IN '" + configFileName + "' CONFIG FILE: |===========") + fmt.Println("--------------------------------------------------------------------------------") + fmt.Println(" ===========| ERRORS IN '" + configFileName + "' CONFIG FILE: |===========") + fmt.Println("") + } + invalid = true + log.Printf(" - Required config item '%s' and/or environment variable '%s' is missing or invalid.\n", t.Tag.Get("json"), t.Tag.Get("env")) + fmt.Printf(" - Required config item '%s' and/or environment variable '%s' is missing or invalid.\n", t.Tag.Get("json"), t.Tag.Get("env")) + } + } + if invalid { + fmt.Println("--------------------------------------------------------------------------------") + log.Fatal("Exiting!") + } +} + +func setField(fieldValue reflect.Value, value string) (err error) { + switch fieldValue.Kind() { + case reflect.Bool: + var b bool + if b, err = strconv.ParseBool(value); err != nil { + return err + } + fieldValue.SetBool(b) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + var i int64 + if i, err = strconv.ParseInt(value, 0, 64); err != nil { + return err + } + fieldValue.SetInt(int64(i)) + case reflect.Float32, reflect.Float64: + var f float64 + if f, err = strconv.ParseFloat(value, 64); err != nil { + return err + } + fieldValue.SetFloat(f) + case reflect.String: + fieldValue.SetString(value) + default: + return fmt.Errorf("%s is not a supported config type. Please use bool, float64 float32, int64, int, or string.", fieldValue.Kind()) + } + return +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cf69227 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module rabbithunter + +go 1.16 + +require ( + github.com/google/uuid v1.4.0 // indirect + github.com/streadway/amqp v1.0.0 // indirect + github.com/wagslane/go-rabbitmq v0.12.4 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b641eba --- /dev/null +++ b/go.sum @@ -0,0 +1,49 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= +github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/wagslane/go-rabbitmq v0.12.4 h1:dxpmTew/wrBlltcu9kBZNTVftT7tsguF4n4IAawK2d8= +github.com/wagslane/go-rabbitmq v0.12.4/go.mod h1:1sUJ53rrW2AIA7LEp8ymmmebHqqq8ksH/gXIfUP0I0s= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..b806365 --- /dev/null +++ b/main.go @@ -0,0 +1,196 @@ +package main + +import ( + "fmt" + "log" + "os" + "os/signal" + "strings" + "syscall" + "time" + + rabbitmq "github.com/wagslane/go-rabbitmq" +) + +const appnameFull string = "RuhNet RabbitHunter AMQP Utility" +const appname string = "rabbithunter" +const version string = "0.3" +const serverVersion string = "RuhNet " + appname + " v" + version +const website string = "https://ruhnet.co" + +var startupTime time.Time +var myHostname, myHostnameShort string + +const banner = ` + ____ __ _ _ __ + / ___\ __ __/ /_ / \ / /__ __/ /_ + / /_/ // /_/ / _ \/ / \/ //__\_ __/ +/_/ \_\\ ___/_/ /_/_/ \__/ \__,/_/ %s +_____________________________________________________ +` + +var done = make(chan bool, 1) + +var msgCatFilters []string +var msgNameFilters []string +var msgAppNameFilters []string + +type AppConfig struct { + AmqpURI string `json:"amqp_uri" env:"AMQP_URI" default:"amqp://guest:guest@localhost:5672"` + AmqpSubExch string `json:"amqp_sub_exchange" env:"AMQP_SUB_EXCH" default:"callevt"` + AmqpPubExch string `json:"amqp_pub_exchange" env:"AMQP_PUB_EXCH" default:"pushes"` + AmqpExchType string `json:"amqp_exchange_type" env:"AMQP_EXCH_TYPE" default:"topic"` + AmqpSubRoutingKey string `json:"amqp_sub_routing_key" env:"AMQP_SUB_ROUTING_KEY" default:"call.*.*"` + AmqpPubRoutingKey string `json:"amqp_pub_routing_key" env:"AMQP_PUB_ROUTING_KEY" default:"notification.push.customapp.test"` + AmqpWorkers int `json:"amqp_workers" env:"AMQP_WORKERS" default:"2"` + LogFile string `json:"log_file" env:"LOG_FILE" default:"/tmp/rabbithunter.log"` + LogLevel int `json:"log_level" env:"LOG_LEVEL" default:"5"` + PubMessageFile string `json:"pub_message_file" env:"PUB_MSG_FILE" default:"./message.json"` + FilterEvtCat string `json:"filter_event_category" env:"FLT_EVT_CAT" default:"*"` //allow comma separated string for multiple + FilterEvtName string `json:"filter_event_name" env:"FLT_EVT_NAME" default:"*"` //allow comma separated string for multiple + FilterEvtAppName string `json:"filter_event_appname" env:"FLT_EVT_APPNAME" default:"*"` //allow comma separated string for multiple +} + +func main() { + var err error + startupTime = time.Now() + + ///////////////////////////////////////////// + // Setup + initConfig() + + fmt.Println("Configuration OK, starting " + appname + "...") + logit(5, "Configuration OK, starting "+appname+"...") + + ///////////////////////////////////////////// + // Logging + appLogFile, err := os.OpenFile(appconf.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664) + if err != nil { + log.Println("Could not open log file: " + appconf.LogFile + "\n" + err.Error()) + appLogFile, err = os.OpenFile("/tmp/"+appname+".log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0664) + if err != nil { + log.Fatal("Can't open even /tmp log file!\n" + err.Error()) + } + } + defer appLogFile.Close() + //set other logging to same file + log.SetOutput(appLogFile) + + //Startup Banner + fmt.Printf(banner, website) + fmt.Println(serverVersion + "\n") + + logit(5, appname+"v"+version+" starting up... ") + logit(5, fmt.Sprintf("Logging with level: %d", appconf.LogLevel)) + + //Hostname + myHostname, err = os.Hostname() + logit(5, "Detecting my hostname... "+myHostname) + if err != nil { + log.Fatal("Hostname could not be auto-detected from system: " + err.Error()) + } + myHostnameShort = strings.SplitN(myHostname, ".", 2)[0] + + //fmt.Println() + + ///////////////////////////////////////////// + // RabbitMQ Setup Connection + fmt.Println("Connecting to RabbitMQ: " + appconf.AmqpURI) + logit(6, "Connecting to RabbitMQ: "+appconf.AmqpURI) + amqpConn, err := rabbitmq.NewConn( + appconf.AmqpURI, + rabbitmq.WithConnectionOptionsLogging, + ) + if err != nil { + fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error()) + log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error()) + } + defer amqpConn.Close() + + ///////////////////////////////////////////// + // RabbitMQ Setup Consumer + msgConsumer, err = rabbitmq.NewConsumer( + amqpConn, + handleAmqpMsg, //this is the function that we want to call to consume presence messages + "q_rabbithunter_"+myHostnameShort, + rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch), + rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType), + rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey), + rabbitmq.WithConsumerOptionsConsumerName("consumer_rabbithunter_"+myHostnameShort), + rabbitmq.WithConsumerOptionsQueueAutoDelete, + rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers), + //rabbitmq.WithConsumerOptionsQuorum, + //rabbitmq.WithConsumerOptionsQueueDurable, + //rabbitmq.WithConsumerOptionsExchangeDeclare, + //rabbitmq.WithConsumerOptionsBindingExchangeDurable, + ) + + if err != nil { + fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error()) + log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error()) + } + defer msgConsumer.Close() + + logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: 'consumer_rabbithunter_"+myHostnameShort+"'.") + + ///////////////////////////////////////////// + // RabbitMQ Setup Publisher + publisher, err = rabbitmq.NewPublisher( + amqpConn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType), + rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch), + //rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error()) + log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error()) + } + defer publisher.Close() + + logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.") + + publisher.NotifyReturn(func(r rabbitmq.Return) { + logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body))) + }) + publisher.NotifyPublish(func(c rabbitmq.Confirmation) { + logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack)) + }) + + ///////////////////////////////////////////// + // Message Filters + msgCatFilters = strings.Split(appconf.FilterEvtCat, ",") + msgNameFilters = strings.Split(appconf.FilterEvtName, ",") + msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",") + fmt.Println("Filtering event app names matching:", msgAppNameFilters) + fmt.Println("Filtering event categories matching:", msgCatFilters) + fmt.Println("Filtering event names matching:", msgNameFilters) + + ///////////////////////////////////////////// + logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process AMQP messages.") + + log.Println(appnameFull + " system started.\n-----> [READY]") + fmt.Println(appnameFull + " system started.\n-----> [READY]") + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigs + fmt.Printf("Received signal: ") + fmt.Println(sig) + done <- true + }() + + publishHandler() //this func will block the main thread, until receiving on the 'done' channel. +} + +func logit(level int, message string) { + if appconf.LogLevel >= int(level) { + log.Println(message) + } + if appconf.LogLevel >= 8 { + fmt.Println(message) + } +} diff --git a/message.json b/message.json new file mode 100644 index 0000000..daf06ba --- /dev/null +++ b/message.json @@ -0,0 +1,27 @@ +{ + "Account-ID": "52e0c76bfa7a40a1cde6d959921d6218", + "To": "5026", + "Alert-Key": "IC_SIL", + "Alert-Params": [ + "5032 - 5032 5032" + ], + "App-Name": "kamailio", + "App-Version": "5.5.2", + "Call-ID": "48617fa3-55aa-44a0-9465-4606ac0d0c2c", + "Event-Category": "notification", + "Event-Name": "push_req", + "Msg-ID": "1f8a6478-e32e-48a6-b5d1-80416cd4a916", + "Node": "kamailio@ke-app1.domain.com", + "Payload": { + "call-id": "48617fa3-55aa-44a0-9465-4606ac0d0c2c", + "caller-id-name": "John Smith", + "caller-id-number": "4321", + "proxy": "sip:10.150.0.2:7000", + "registration-token": "f3651786-1a48-454d-a049-f03446ebadc8" + }, + "Server-ID": "kamailio@ke-app1.domain.com-default<83.8790.22>", + "Sound": "ring.caf", + "Token-App": "io.domain.com.app", + "Token-ID": "121212", + "Token-Type": "domain.com" +} diff --git a/rabbithunter_config.json.sample b/rabbithunter_config.json.sample new file mode 100644 index 0000000..b878275 --- /dev/null +++ b/rabbithunter_config.json.sample @@ -0,0 +1,20 @@ +//rabbithunter_config.json +//Commented lines represent default values. +//You may also set any config parameters via the environment variables listed. + +{ +// "amqp_uri":"amqp://guest:guest@localhost:5672", // env AMQP_URI +// "amqp_sub_exchange":"callevt", // env AMQP_SUB_EXCH +// "amqp_pub_exchange":"pushes", // env AMQP_PUB_EXCH +// "amqp_exchange_type":"topic", // env AMQP_EXCH_TYPE +// "amqp_sub_routing_key":"call.*.*", // env AMQP_SUB_ROUTING_KEY +// "amqp_pub_routing_key":"notification.push.customapp.test", // env AMQP_PUB_ROUTING_KEY +// "amqp_workers":2, // env AMQP_WORKERS +// "pub_message_file":"./message.json", // env PUB_MSG_FILE -- send signal USR2 to publish +// "filter_event_category":"*", // env FLT_EVT_CAT -- filter event category - comma separate +// "filter_event_name":"*", // env FLT_EVT_NAME -- filter event name - comma separate +// "filter_event_appname":"*", // env FLT_EVT_APPNAME -- filter event app name - comma separate +// "log_file":"/tmp/rabbithunter.log", // env LOG_FILE +// "log_level":5 // env LOG_LEVEL (default 5/Notice) +} +