package rabbitmq import ( "bytes" "encoding/json" "fmt" "log" amqp "github.com/rabbitmq/amqp091-go" ) type Message map[string]interface{} func Serialize(msg any) (string, error) { // FIXME move to separate service var b bytes.Buffer encoder := json.NewEncoder(&b) err := encoder.Encode(msg) return b.String(), err } func Deserialize(b []byte) (Message, error) { // FIXME move to separate service var msg Message buf := bytes.NewBuffer(b) decoder := json.NewDecoder(buf) err := decoder.Decode(&msg) return msg, err } func NewExchange(chn *amqp.Channel, name string) error { err := chn.ExchangeDeclare( name, "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err != nil { return err } return nil } func Publish(chn *amqp.Channel, name, routingKey string, msg any) error { jsonData, err := Serialize(msg) if err != nil { return err } msgBody := fmt.Sprintf(`{"event":"%T","data":%s}`, msg, jsonData) chn.Publish( name, // exchange name routingKey, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", Body: []byte(msgBody), }, ) return nil } func BindQueueToExchange(chn *amqp.Channel, queueName, exchName, routingKey string) error { err := chn.QueueBind( queueName, // queue name routingKey, // routing key exchName, // exchange name false, nil, ) if err != nil { log.Printf("Failed to bind a queue: %s\n", queueName) return err } return nil }