package worker import ( "fmt" "log" "os" "strings" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v5/pgxpool" amqp "github.com/rabbitmq/amqp091-go" "git.pbiernat.io/egommerce/go-api-pkg/fluentd" "git.pbiernat.io/egommerce/go-api-pkg/rabbitmq" "git.pbiernat.io/egommerce/order-service/internal/event" "git.pbiernat.io/egommerce/order-service/internal/service" ) type ( Worker struct { ID string cnf *Config handlers map[string]any services map[string]any doWrkUntil chan struct{} } ) func New(c *Config) *Worker { return &Worker{ ID: c.ID, cnf: c, handlers: make(map[string]any), services: make(map[string]any), doWrkUntil: make(chan struct{}), } } func (w *Worker) Start() error { setupQueues(w) err := w.doWork(w.doWrkUntil) if err != nil { log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err) close(w.doWrkUntil) } <-w.doWrkUntil return err // go func() { // sigint := make(chan os.Signal, 1) // signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) // <-sigint // w.Shutdown() // close(while) // }() // run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker) // defer w.removeRunFile(run) // w.Logger.Log("Waiting for messages...") // return nil } func (w *Worker) RegisterHandler(name string, fn func() any) { // fmt.Printf("Registering plugin( with handler): %s... OK\n", name) w.handlers[name] = fn() } func (w *Worker) OnShutdown() { w.GetLogger().Log("Worker %s is going down...", w.ID) // fmt.Printf("Worker %s is going down...\n", w.ID) unbindQueues(w) w.GetEventBus().Close() w.GetDatabase().Close() w.GetLogger().Log("Gone.") w.GetLogger().Close() close(w.doWrkUntil) } // Plugin helper funcitons func (w *Worker) GetCache() *redis.Client { return (w.handlers["cache"]).(*redis.Client) } func (w *Worker) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue return (w.handlers["database"]).(*pgxpool.Pool) } func (w *Worker) GetEventBus() *amqp.Channel { return (w.handlers["eventbus"]).(*amqp.Channel) } func (w *Worker) GetLogger() *fluentd.Logger { return (w.handlers["logger"]).(*fluentd.Logger) } func (w *Worker) doWork(while chan struct{}) error { w.services["order"] = service.NewOrderService(w.GetDatabase(), w.GetCache(), w.GetEventBus(), w.GetLogger()) oSrv := (w.services["order"]).(*service.OrderService) msgs, err := w.GetEventBus().Consume( w.cnf.EventBusQueue, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { w.GetLogger().Log("Failed to register a consumer: %s", err) os.Exit(1) //close(while) } go func() { for d := range msgs { // go func(d amqp.Delivery) { w.processMsg(oSrv, d) // }(d) } }() <-while return nil } func (w *Worker) processMsg(srvc *service.OrderService, m amqp.Delivery) { msg, err := rabbitmq.Deserialize(m.Body) if err != nil { w.GetLogger().Log("Deserialization error: %v\n", err) fmt.Printf("Deserialization error: %v\n", err) m.Reject(false) return } name := fmt.Sprintf("%s", msg["event"]) data := (msg["data"]).(map[string]interface{}) // reqID := (data["request_id"]).(string) // FIXME Check input params! w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data) fmt.Printf("Processing message \"%s\" with data: %v\n", name, data) var ok = false switch true { // Refactor -> use case for polymorphism case strings.Contains(name, event.EVENT_BASKET_CHECKOUT): w.GetLogger().Log("Event: %s", event.EVENT_BASKET_CHECKOUT) } rnr := NewCommandRunner(data, srvc) // case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET): // basketID := data["basket_id"].(string) // FIXME Check input params! // productID := data["product_id"] // FIXME Check input params! // rnr.cmd = &AddProductToBasketCommand{srvc} // w.Logger.Log("Adding product #%d to basket #%s. ReqID: #%s", productID, basketID, reqID) // case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET): // basketID := data["basket_id"].(string) // productID := data["product_id"].(float64) // rnr.cmd = &RemoveProductFromBasketCommand{srvc} // w.Logger.Log("Removing product #%d from basket #%s. ReqID: #%s", productID, basketID, reqID) // } ok, _ = rnr.run(data) if ok { w.GetLogger().Log("Successful executed message \"%s\"\n", name) fmt.Printf("Successful executed message \"%s\"\n", name) m.Ack(false) return } w.GetLogger().Log("Error processing \"%s\": %v", name, err) fmt.Printf("Error processing \"%s\": %v\n", name, err) m.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...? } func setupQueues(w *Worker) { err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange) if err != nil { w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err) fmt.Printf("Failed to declare EventBus exchange: %v\n", err) os.Exit(1) } args := amqp.Table{} args["x-message-ttl"] = 5 _, err = w.GetEventBus().QueueDeclare( w.cnf.EventBusQueue, // name true, // durable false, // delete when unused false, // exclusive false, // no-wait args, // arguments ) if err != nil { w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err) fmt.Printf("Failed to declare EventBus queue: %v\n", err) os.Exit(1) } rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "basket.order.basketCheckout") } func unbindQueues(w *Worker) { w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "basket.order.basketCheckout", w.cnf.EventBusExchange, nil) }