|
|
//LEAPI - ACME Certificate Renewal Control API - Copyright 2022-2025 Ruel Tmeizeh All Rights Reserved
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
"log"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
"syscall"
|
|
|
"time"
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
rabbitmq "github.com/wagslane/go-rabbitmq"
|
|
|
)
|
|
|
|
|
|
var publisher *rabbitmq.Publisher
|
|
|
var consumer *rabbitmq.Consumer
|
|
|
var amqpExchange string = "nexus"
|
|
|
var routingKey string = "nexus.dns_challenge_request"
|
|
|
var routingKeyConsume string = "nexus.leapi"
|
|
|
|
|
|
// KzMessage is the JSON message body this service marshals and publishes
// over AMQP.
//
// The first six fields are always emitted, even when empty. Every other
// field carries `omitempty` and is dropped from the encoded JSON when it is
// the empty string. Field order is significant: it determines the key order
// of the encoded JSON.
type KzMessage struct {
	// Envelope: always present in the encoded message.
	MsgId         string `json:"Msg-ID"`
	AppName       string `json:"App-Name"`
	AppVersion    string `json:"App-Version"`
	EventName     string `json:"Event-Name"`
	EventCategory string `json:"Event-Category"`
	ServerId      string `json:"Server-ID"`

	// Optional origin identifiers.
	Node   string `json:"Node,omitempty"`
	Server string `json:"Server,omitempty"`

	// DNS challenge details.
	// Domain is the host name itself, e.g. pbx.company.com.
	// ChallengeDomain is usually a subdomain like
	// _acme-challenge.pbx.company.com.
	Domain          string `json:"Domain,omitempty"`
	ChallengeDomain string `json:"Challenge-Domain,omitempty"`
	ChallengePhrase string `json:"Challenge-Phrase,omitempty"`

	// Optional lower-case keys; their exact content is not established in
	// this file.
	Servers string `json:"servers,omitempty"`
	Domains string `json:"domains,omitempty"`
}
|
|
|
|
|
|
func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action {
|
|
|
if appconf.Debug {
|
|
|
log.Println("AMQP message received: " + string(d.Body))
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
var msg KzMessage
|
|
|
err := json.Unmarshal(d.Body, &msg)
|
|
|
if err != nil {
|
|
|
log.Println("handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: " + string(d.Body) + "\nUnmarshalling error: " + err.Error())
|
|
|
return rabbitmq.NackDiscard
|
|
|
}
|
|
|
|
|
|
err := kazooDnsPublish(ta.Domain, input.Data.ChallengeText, "request")
|
|
|
if err != nil {
|
|
|
log.Println(err.Error())
|
|
|
return c.JSON(errorOut(http.StatusInternalServerError, "Error publishing DNS ADD message."+err.Error()))
|
|
|
}
|
|
|
*/
|
|
|
|
|
|
return rabbitmq.Ack
|
|
|
}
|
|
|
|
|
|
func kazooDnsPublish(domain, challenge, eventName string) error {
|
|
|
dnsmsg := KzMessage{
|
|
|
MsgId: uuid.New().String(),
|
|
|
AppName: appname,
|
|
|
AppVersion: version,
|
|
|
EventCategory: "dns_challenge",
|
|
|
EventName: eventName, //"request" or "delete"
|
|
|
ServerId: appname + "@" + appconf.Hostname,
|
|
|
Domain: domain,
|
|
|
ChallengeDomain: "_acme-challenge." + domain,
|
|
|
ChallengePhrase: challenge,
|
|
|
//Node: myHostname,
|
|
|
}
|
|
|
|
|
|
dnsmsg_json, err := json.Marshal(dnsmsg)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
err = publisher.Publish(dnsmsg_json,
|
|
|
[]string{routingKey},
|
|
|
rabbitmq.WithPublishOptionsContentType("application/json"),
|
|
|
rabbitmq.WithPublishOptionsExchange(amqpExchange),
|
|
|
)
|
|
|
//return errors.New("Error publishing to exchange '" + amqpExchange + "' with routing key '" + routingKey + "': \n" + string(dnsmsg_json) + " ERROR: " + err.Error())
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
func amqp() {
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Connection
|
|
|
log.Println("Connecting to RabbitMQ: " + appconf.AmqpURI)
|
|
|
fmt.Println("Connecting to RabbitMQ: " + appconf.AmqpURI)
|
|
|
amqpConn, err := rabbitmq.NewConn(
|
|
|
appconf.AmqpURI,
|
|
|
rabbitmq.WithConnectionOptionsLogging,
|
|
|
)
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error())
|
|
|
log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error())
|
|
|
}
|
|
|
defer amqpConn.Close()
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Consumer
|
|
|
log.Println("Starting AMQP consumer...")
|
|
|
consumer, err = rabbitmq.NewConsumer(
|
|
|
amqpConn,
|
|
|
"q_"+appname+"_"+appconf.Hostname,
|
|
|
rabbitmq.WithConsumerOptionsExchangeName(amqpExchange),
|
|
|
rabbitmq.WithConsumerOptionsExchangeKind("topic"),
|
|
|
rabbitmq.WithConsumerOptionsRoutingKey(routingKeyConsume),
|
|
|
rabbitmq.WithConsumerOptionsConsumerName("consumer_"+appname+"_"+appconf.Hostname),
|
|
|
rabbitmq.WithConsumerOptionsQueueAutoDelete,
|
|
|
rabbitmq.WithConsumerOptionsConcurrency(2),
|
|
|
//rabbitmq.WithConsumerOptionsQuorum,
|
|
|
//rabbitmq.WithConsumerOptionsQueueDurable,
|
|
|
//rabbitmq.WithConsumerOptionsExchangeDeclare,
|
|
|
//rabbitmq.WithConsumerOptionsBindingExchangeDurable,
|
|
|
)
|
|
|
if err != nil {
|
|
|
log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error())
|
|
|
}
|
|
|
defer consumer.Close()
|
|
|
|
|
|
go func() {
|
|
|
err = consumer.Run(handleAmqpMsg) //this is the function that we want to call to consume presence messages
|
|
|
if err != nil {
|
|
|
log.Fatal("Unable to start/run RabbitMQ consumer: " + err.Error())
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
log.Println("Consuming on topic exchange: '" + amqpExchange + "' with routing key: '" + routingKeyConsume + "' using queue: 'consumer_" + appname + "_" + appconf.Hostname + "'.")
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Publisher
|
|
|
log.Println("Starting AMQP publisher...")
|
|
|
publisher, err = rabbitmq.NewPublisher(
|
|
|
amqpConn,
|
|
|
rabbitmq.WithPublisherOptionsLogging,
|
|
|
rabbitmq.WithPublisherOptionsExchangeKind("topic"),
|
|
|
rabbitmq.WithPublisherOptionsExchangeName(amqpExchange),
|
|
|
rabbitmq.WithPublisherOptionsExchangeDeclare,
|
|
|
)
|
|
|
if err != nil {
|
|
|
log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error())
|
|
|
}
|
|
|
defer publisher.Close()
|
|
|
|
|
|
log.Println("AMQP publisher configured on topic exchange: '" + amqpExchange + "' with routing key: '" + routingKey + "'.")
|
|
|
|
|
|
publisher.NotifyReturn(func(r rabbitmq.Return) {
|
|
|
log.Println(fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body)))
|
|
|
})
|
|
|
publisher.NotifyPublish(func(c rabbitmq.Confirmation) {
|
|
|
log.Println(fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack))
|
|
|
})
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Publish Test
|
|
|
sigch := make(chan os.Signal, 1)
|
|
|
signal.Notify(sigch, syscall.SIGUSR2)
|
|
|
|
|
|
//create test message to send
|
|
|
pubtest := KzMessage{
|
|
|
MsgId: uuid.New().String(),
|
|
|
AppName: appname,
|
|
|
AppVersion: version,
|
|
|
EventCategory: "dns_challenge",
|
|
|
EventName: "request",
|
|
|
ServerId: appname + "@" + appconf.Hostname,
|
|
|
Domain: "example.com",
|
|
|
ChallengeDomain: "_acme-challenge.pbx.example.com",
|
|
|
ChallengePhrase: "01234abcdef98765fedcba",
|
|
|
//Node: myHostname,
|
|
|
}
|
|
|
|
|
|
msg_pubtest, err := json.Marshal(pubtest)
|
|
|
if err != nil {
|
|
|
log.Println("Unable to marshal pubtest into JSON! Published test message will be blank.")
|
|
|
}
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
case sig := <-sigch:
|
|
|
log.Println("Received signal " + fmt.Sprintf("%s", sig) + " on pubch channel. Publishing test message... ")
|
|
|
if sig == syscall.SIGUSR2 {
|
|
|
msgFromFile, err := readJsonFile(appconf.PubMessageFile)
|
|
|
if err == nil {
|
|
|
msg_pubtest = msgFromFile
|
|
|
}
|
|
|
log.Println("Publishing test message to exchange '" + amqpExchange + "' with routing key '" + routingKey + "': \n" + string(msg_pubtest))
|
|
|
err = publisher.Publish(msg_pubtest,
|
|
|
[]string{routingKey},
|
|
|
rabbitmq.WithPublishOptionsContentType("application/json"),
|
|
|
rabbitmq.WithPublishOptionsExchange(amqpExchange),
|
|
|
)
|
|
|
if err != nil {
|
|
|
log.Println("Error publishing message: " + err.Error())
|
|
|
} else {
|
|
|
log.Println("Message PUBLISHED.")
|
|
|
}
|
|
|
}
|
|
|
case <-time.After(100 * time.Millisecond):
|
|
|
//case <-time.After(time.Second):
|
|
|
//fmt.Println("tick")
|
|
|
}
|
|
|
}
|
|
|
}
|