package worker import ( "fmt" "log" "os" "os/signal" "strconv" "strings" "syscall" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v5/pgxpool" amqp "github.com/rabbitmq/amqp091-go" "git.pbiernat.io/egommerce/go-api-pkg/consul" "git.pbiernat.io/egommerce/go-api-pkg/fluentd" "git.pbiernat.io/egommerce/go-api-pkg/rabbitmq" "git.pbiernat.io/egommerce/catalog-service/internal/event" cnf "git.pbiernat.io/egommerce/catalog-service/internal/server" "git.pbiernat.io/egommerce/catalog-service/internal/service" ) type ( Worker struct { Config *cnf.Config Cache *redis.Client Database *pgxpool.Pool Eventbus *amqp.Channel Logger *fluentd.Logger Registry *consul.Service } OptionFn func(*Worker) error // FIXME: similar in server/server.go ) func New(c *cnf.Config, opts ...OptionFn) *Worker { w := &Worker{ Config: c, } for _, opt := range opts { if err := opt(w); err != nil { log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err) } } return w } func (w *Worker) Start(while chan struct{}) error { go func() { sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigint w.Shutdown() close(while) }() run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker) defer w.removeRunFile(run) err := w.doWork() if err != nil { log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err) close(while) } w.Logger.Log("Waiting for messages...") return nil } func (w *Worker) Shutdown() error { w.Logger.Log("Worker %s is going down...", w.Config.Base.AppID) w.Registry.Unregister() w.Eventbus.Close() w.Database.Close() w.Logger.Log("Gone.") w.Logger.Close() return nil } func (w *Worker) createRunFile(path string) *os.File { run, err := os.Create(path) if err != nil { log.Fatalf("Failed to create run file. Reason: %v\n", err) os.Exit(1) } run.WriteString(strconv.Itoa(os.Getpid())) return run } func (w *Worker) removeRunFile(f *os.File) error { return f.Close() } func (w *Worker) doWork() error { msgs, err := w.Eventbus.Consume( w.Config.EventBusQueue, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { w.Logger.Log("Failed to register a consumer: %s", err) os.Exit(1) } go func() { catalogSrvc := service.NewCatalogService(w.Database, w.Eventbus, w.Logger) for d := range msgs { go func(d amqp.Delivery) { w.processMsg(catalogSrvc, d) }(d) } }() return nil } func (w *Worker) processMsg(srvc *service.CatalogService, d amqp.Delivery) { msg, err := rabbitmq.Deserialize(d.Body) if err != nil { w.Logger.Log("Deserialization error: %v\n", err) d.Reject(false) return } name := fmt.Sprintf("%s", msg["event"]) data := (msg["data"]).(map[string]interface{}) // reqID := (data["request_id"]).(string) // FIXME Check input params! w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data) var ok = false switch true { // Refactor -> use case for polymorphism case strings.Contains(name, event.EVENT_WAREHOUSE_STOCK_UPDATED): w.Logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED) } rnr := NewCommandRunner(data, srvc) ok, _ = rnr.run(data) if ok { w.Logger.Log("Successful executed message \"%s\"\n", name) d.Ack(false) return } w.Logger.Log("Error processing \"%s\": %s (%v)", name, err.Error(), err) d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shoud know...? }