catalog-service/src/internal/worker/worker.go
Piotr Biernat 13c3c386a5 Refactor
2024-12-24 14:22:49 +01:00

208 lines
5.0 KiB
Go

package worker
import (
"crypto/tls"
"fmt"
"log"
"os"
"strings"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"git.ego.cloudns.be/egommerce/go-api-pkg/fluentd"
"git.ego.cloudns.be/egommerce/go-api-pkg/rabbitmq"
"git.ego.cloudns.be/egommerce/catalog-service/internal/event"
"git.ego.cloudns.be/egommerce/catalog-service/internal/service"
)
type (
Worker struct {
ID string
cnf *Config
handlers map[string]any
services map[string]any
doWrkUntil chan struct{}
}
)
func New(c *Config) *Worker {
return &Worker{
ID: c.ID,
cnf: c,
handlers: make(map[string]any),
services: make(map[string]any),
doWrkUntil: make(chan struct{}),
}
}
func (w *Worker) Start() error {
setupQueues(w)
err := w.doWork(w.doWrkUntil)
if err != nil {
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err)
close(w.doWrkUntil)
}
<-w.doWrkUntil
return err
// go func() {
// sigint := make(chan os.Signal, 1)
// signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
// <-sigint
// w.Shutdown()
// close(while)
// }()
// run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker)
// defer w.removeRunFile(run)
// w.Logger.Log("Waiting for messages...")
// return nil
}
func (w *Worker) RegisterHandler(name string, fn func() any) {
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
w.handlers[name] = fn()
}
func (w *Worker) UpdateTLSConfig(tls *tls.Config) {
}
func (w *Worker) OnShutdown() {
w.GetLogger().Log("Worker %s is going down...", w.ID)
// fmt.Printf("Worker %s is going down...\n", w.ID)
unbindQueues(w)
w.GetEventBus().Close()
w.GetDatabase().Close()
w.GetLogger().Log("Gone.")
w.GetLogger().Close()
close(w.doWrkUntil)
}
// Plugin helper funcitons
func (w *Worker) GetCache() *redis.Client {
return (w.handlers["cache"]).(*redis.Client)
}
func (w *Worker) GetDatabase() *pgxpool.Pool {
return (w.handlers["database"]).(*pgxpool.Pool)
}
func (w *Worker) GetEventBus() *amqp.Channel {
return (w.handlers["eventbus"]).(*amqp.Channel)
}
func (w *Worker) GetLogger() *fluentd.Logger {
return (w.handlers["logger"]).(*fluentd.Logger)
}
func (w *Worker) doWork(while chan struct{}) error {
w.services["catalog"] =
service.NewCatalogService(w.GetDatabase(), w.GetEventBus(), w.GetLogger())
cSrv := (w.services["catalog"]).(*service.CatalogService)
msgs, err := w.GetEventBus().Consume(
w.cnf.EventBusQueue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
w.GetLogger().Log("Failed to register a consumer: %s", err)
fmt.Printf("Failed to register a consumer: %s", err)
os.Exit(1)
// close(while)
}
go func() {
for d := range msgs {
// go func(d amqp.Delivery) {
w.processMsg(cSrv, d)
// }(d)
}
}()
<-while
return nil
}
func (w *Worker) processMsg(cSrv *service.CatalogService, d amqp.Delivery) {
msg, err := rabbitmq.Deserialize(d.Body)
if err != nil {
w.GetLogger().Log("Deserialization error: %v\n", err)
fmt.Printf("Deserialization error: %v\n", err)
d.Reject(false)
return
}
name := fmt.Sprintf("%s", msg["event"])
data := (msg["data"]).(map[string]interface{})
// reqID := (data["request_id"]).(string) // FIXME Check input params!
w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data)
fmt.Printf("Processing message \"%s\" with data: %v\n", name, data)
var ok = false
switch true { // Refactor -> use case for polymorphism
case strings.Contains(name, event.EVENT_WAREHOUSE_STOCK_UPDATED):
w.GetLogger().Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED)
}
r := NewCommandRunner(data, cSrv)
ok, _ = r.run(data)
if ok {
w.GetLogger().Log("Successful executed message \"%s\"\n", name)
fmt.Printf("Successful executed message \"%s\"\n", name)
d.Ack(false)
return
}
w.GetLogger().Log("Error processing \"%s\": %v\n", name, err)
fmt.Printf("Error processing \"%s\": %v\n", name, err)
d.Reject(true) // FIXME: or Nack(repeat until success - maybe message shoud know...?
}
func setupQueues(w *Worker) {
err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange)
if err != nil {
w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err)
fmt.Printf("Failed to declare EventBus exchange: %v\n", err)
os.Exit(1)
}
args := amqp.Table{}
args["x-message-ttl"] = 5
_, err = w.GetEventBus().QueueDeclare(
w.cnf.EventBusQueue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
if err != nil {
w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err)
fmt.Printf("Failed to declare EventBus queue: %v\n", err)
os.Exit(1)
}
}
func unbindQueues(w *Worker) {
}