diff --git a/src/cmd/migrate/main.go b/src/cmd/migrate/main.go index 30388ee..8f647d8 100644 --- a/src/cmd/migrate/main.go +++ b/src/cmd/migrate/main.go @@ -40,7 +40,7 @@ func main() { log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) } - c := cnf.NewConfig() + c := cnf.NewConfig("basket-migrator") // dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL) diff --git a/src/cmd/server/main.go b/src/cmd/server/main.go index e1cfe44..33dc83b 100644 --- a/src/cmd/server/main.go +++ b/src/cmd/server/main.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "log" "os" @@ -16,7 +15,7 @@ func main() { log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) } - c := cnf.NewConfig() + c := cnf.NewConfig("basket-server") srv := svr.New( c, svr.WithCache(c), @@ -35,6 +34,5 @@ func main() { os.Exit(1) } - fmt.Println("Done.") os.Exit(0) } diff --git a/src/cmd/worker/main.go b/src/cmd/worker/main.go index 74281d3..529808e 100644 --- a/src/cmd/worker/main.go +++ b/src/cmd/worker/main.go @@ -11,10 +11,10 @@ import ( func main() { if config.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file.") + log.Fatalln("Error loading .env file.") } - c := cnf.NewConfig() + c := cnf.NewConfig("basket-worker") wrk := worker.New( c, worker.WithCache(c), @@ -24,7 +24,7 @@ func main() { worker.WithRegistry(c), ) - forever := make(chan struct{}) - wrk.Start() - <-forever + while := make(chan struct{}) + wrk.Start(while) + <-while } diff --git a/src/internal/config/config.go b/src/internal/config/config.go index ca2a8e4..43253ab 100644 --- a/src/internal/config/config.go +++ b/src/internal/config/config.go @@ -14,7 +14,7 @@ const ( defCachePassword = "12345678" defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" defEventBusURL = "amqp://guest:guest@api-eventbus:56721" - defKVNmspc = "dev.egommerce/service/basket-svc" + defKVNmspc = "dev.egommerce/service/basket" defLoggerAddr = "api-logger:24224" defNetAddr = ":80" defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" @@ -34,17 +34,18 @@ type Config struct { EventBusQueue string `json:"eventbus_queue"` EventBusURL string `json:"eventbus_url"` LoggerAddr string `json:"logger_addr"` + KVNamespace string RegistryAddr string // Fields with JSON mappings are available through Consul KV storage } -func NewConfig() *Config { +func NewConfig(name string) *Config { c := new(Config) c.Base = new(srv.Config) c.Base.AppID, _ = os.Hostname() - c.Base.AppName = cnf.GetEnv("APP_NAME", defAppName) + c.Base.AppName = name c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr) c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix) @@ -53,6 +54,7 @@ func NewConfig() *Config { c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL) c.EventBusExchange = defEbEventsExchange c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL) + c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc) c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr) c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr) diff --git a/src/internal/event/basket.go b/src/internal/event/basket.go index 93f91df..fa5abdc 100644 --- a/src/internal/event/basket.go +++ b/src/internal/event/basket.go @@ -1,17 +1,17 @@ package event const ( - EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasketEvent" - EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasketEvent" + EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasket" + EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasket" ) -type ProductAddedToBasketEvent struct { +type ProductAddedToBasket struct { *Event ProductID string `json:"product_id"` BasketID string `json:"basket_id"` } -type ProductRemovedFromBasketEvent struct { +type ProductRemovedFromBasket struct { *Event ProductID string `json:"product_id"` BasketID string `json:"basket_id"` diff --git a/src/internal/server/server.go b/src/internal/server/server.go index 25b2fec..397c9a5 100644 --- a/src/internal/server/server.go +++ b/src/internal/server/server.go @@ -32,8 +32,6 @@ type ( Eventbus *amqp.Channel Logger *fluentd.Logger Registry *consul.Service - - kvNmspc string } OptionFn func(*Server) error // FIXME: similar in worker @@ -139,7 +137,7 @@ func WithRegistry(c *cnf.Config) OptionFn { go func() { // Consul KV updater ticker := time.NewTicker(time.Second * 15) for range ticker.C { - updateKVConfig(s) + fetchKVConfig(s) // FIXME: duplicated in worker } }() @@ -156,13 +154,13 @@ func WithRegistry(c *cnf.Config) OptionFn { func (s *Server) Shutdown() srv.PurgeFn { return func(srv *srv.Server) error { - // s.Logger.Log("Server is going down... Unregistering service: %s", s.Base.AppID) - s.Logger.Log("Server is going down... Unregistering service...") + s.Logger.Log("Server %s is going down...", s.Base.AppID) s.Registry.Unregister() s.clearMetadataCache() s.Eventbus.Close() s.Database.Close() + s.Logger.Log("Gone.") s.Logger.Close() return s.Base.Shutdown() @@ -170,8 +168,8 @@ func (s *Server) Shutdown() srv.PurgeFn { } // @CHECK: merge s.Config and s.Base.Config to display all config as one array/map -func updateKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go - config, _, err := s.Registry.KV().Get(s.kvNmspc, nil) +func fetchKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go + config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil) if err != nil || config == nil { return } diff --git a/src/internal/ui/basket.go b/src/internal/ui/basket.go index 22c00e8..28ad839 100644 --- a/src/internal/ui/basket.go +++ b/src/internal/ui/basket.go @@ -10,6 +10,7 @@ import ( ) func AddProductToBasket(srv *service.BasketService, productID, qty int, basketID, reqID string) (*model.BasketModel, error) { + // FIXME: error occurs when 0 is passed ctx := context.Background() basket, err := srv.FetchFromDB(ctx, basketID) if err != nil { diff --git a/src/internal/worker/command.go b/src/internal/worker/command.go new file mode 100644 index 0000000..6b0067c --- /dev/null +++ b/src/internal/worker/command.go @@ -0,0 +1,55 @@ +package worker + +import ( + "git.pbiernat.dev/egommerce/basket-service/internal/service" + "git.pbiernat.dev/egommerce/basket-service/internal/ui" +) + +var ( + AddProductToBasket = "event.ProductAddedToBasket" + RemoveProductFromBasket = "event.ProductRemovedFromBasket" +) + +type Command interface { + run(CommandData) (bool, any) +} + +type CommandData map[string]interface{} + +type CommandRunner struct { + cmd Command +} + +func (r *CommandRunner) run(data CommandData) (bool, any) { + return r.cmd.run(data) +} + +type AddProductToBasketCommand struct { + srvc *service.BasketService +} + +func (c *AddProductToBasketCommand) run(data CommandData) (bool, any) { + reqID := data["request_id"].(string) // FIXME Check input params! + productID := int(data["product_id"].(float64)) // FIXME Check input params! + basketID := data["basket_id"].(string) // FIXME Check input params! + qty := int(data["quantity"].(float64)) // FIXME Check input params! + + basket, err := ui.AddProductToBasket(c.srvc, productID, qty, basketID, reqID) + + return err == nil, basket +} + +type RemoveProductFromBasketCommand struct { + srvc *service.BasketService +} + +func (c *RemoveProductFromBasketCommand) run(data CommandData) (bool, any) { + reqID := data["request_id"].(string) // FIXME Check input params! + productID := int(data["product_id"].(float64)) // FIXME Check input params! + basketID := data["basket_id"].(string) // FIXME Check input params! + qty := int(data["quantity"].(float64)) // FIXME Check input params! + + basket, err := ui.RemoveProductFromBasket(c.srvc, productID, qty, basketID, reqID) + + return err == nil, basket +} diff --git a/src/internal/worker/worker.go b/src/internal/worker/worker.go index 8d9c588..9502417 100644 --- a/src/internal/worker/worker.go +++ b/src/internal/worker/worker.go @@ -24,20 +24,16 @@ import ( cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" "git.pbiernat.dev/egommerce/basket-service/internal/event" "git.pbiernat.dev/egommerce/basket-service/internal/service" - "git.pbiernat.dev/egommerce/basket-service/internal/ui" ) type ( Worker struct { - Config *cnf.Config - + Config *cnf.Config Cache *redis.Client Database *pgxpool.Pool Eventbus *amqp.Channel Logger *fluentd.Logger Registry *consul.Service - - kvNmspc string } OptionFn func(*Worker) error // FIXME: similar in server/server.go ) @@ -56,101 +52,6 @@ func New(c *cnf.Config, opts ...OptionFn) *Worker { return wrk } -func (w *Worker) Start() error { - // event consume - msgs, err := w.Eventbus.Consume( - w.Config.EventBusQueue, // queue - "", // consumer - false, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - w.Logger.Log("Failed to register a consumer: %s", err) - os.Exit(1) - } - - forever := make(chan struct{}) - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-sigint - - w.Logger.Log("Shutting down %s worker...\n", w.Config.Base.GetAppFullName()) - w.Shutdown() - - close(forever) - }() - - go func() { - basketSrv := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger) - // basketSrv := service.NewBasketService(w) - - for d := range msgs { - go func(d amqp.Delivery) { - msg, err := rabbitmq.Deserialize(d.Body) - if err != nil { - w.Logger.Log("deserialize error: %v\n", err) - d.Reject(false) // FIXME: or Nack? how to handle erros in queue... - return - } - - eName := fmt.Sprintf("%s", msg["event"]) - data := (msg["data"]).(map[string]interface{}) - w.Logger.Log("Message<%s>: %v\n", eName, data) - - reqID := data["request_id"].(string) // FIXME Check input params! - basketID := data["basket_id"].(string) // FIXME Check input params! - - switch true { - case strings.Contains(eName, event.EVENT_PRODUCT_ADDED_TO_BASKET): - productID := int(data["product_id"].(float64)) - qty := int(data["quantity"].(float64)) - - basket, err := ui.AddProductToBasket(basketSrv, productID, qty, basketID, reqID) - if err == nil { - w.Logger.Log("Product #%d added to basket #%s. ReqID: #%s", productID, basket.ID, reqID) - } - case strings.Contains(eName, event.EVENT_PRODUCT_REMOVED_FROM_BASKET): - productID := int(data["product_id"].(float64)) - qty := int(data["quantity"].(float64)) - - basket, err := ui.RemoveProductFromBasket(basketSrv, productID, qty, basketID, reqID) - if err == nil { - w.Logger.Log("Product #%d removed from basket #%s. ReqID: #%s", productID, basket.ID, reqID) - } - } - - if err != nil { - w.Logger.Log("%s error: %s", eName, err.Error()) - d.Reject(false) // FIXME: or Nack? how to handle erros in queue... - return - } - - w.Logger.Log("ACK: %s", eName) - d.Ack(false) - }(d) - } - }() - - w.Logger.Log("Waiting for messages...2") - - return nil -} - -func (w *Worker) Shutdown() error { - w.Logger.Log("Worker is going down...") - - w.Registry.Unregister() - w.Eventbus.Close() - w.Database.Close() - w.Logger.Close() - - return nil -} - func WithCache(c *cnf.Config) OptionFn { return func(w *Worker) error { conn := redis.NewClient(&redis.Options{ @@ -206,7 +107,7 @@ func WithEventbus(c *cnf.Config) OptionFn { os.Exit(1) } - // w.BindQueues() + // w.bindQueues() rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket") rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket") rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity") @@ -244,22 +145,22 @@ func WithRegistry(c *cnf.Config) OptionFn { w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) } - go func() { // Consul KV updater + w.Registry = registry + + go func(w *Worker) { // Fetch Consul KV config and store it in app config ticker := time.NewTicker(time.Second * 15) for range ticker.C { - updateKVConfig(w) + fetchKVConfig(w) // FIXME: duplicated in server } - }() - - w.Registry = registry + }(w) return nil } + } -// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map -func updateKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go - config, _, err := w.Registry.KV().Get(w.kvNmspc, nil) +func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go + config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil) if err != nil || config == nil { return } @@ -270,3 +171,114 @@ func updateKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and w return } } + +func (w *Worker) Start(while chan struct{}) error { + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-sigint + + w.Shutdown() + + close(while) + }() + + err := w.doWork() + if err != nil { + log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err) + close(while) + } + + w.Logger.Log("Waiting for messages...") + + return nil +} + +func (w *Worker) Shutdown() error { + w.Logger.Log("Worker %s is going down...", w.Config.Base.AppID) + + w.Registry.Unregister() + w.Eventbus.Close() + w.Database.Close() + w.Logger.Log("Gone.") + w.Logger.Close() + + return nil +} + +func (w *Worker) doWork() error { + msgs, err := w.Eventbus.Consume( + w.Config.EventBusQueue, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + w.Logger.Log("Failed to register a consumer: %s", err) + os.Exit(1) + } + + go func() { + basketSrvc := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger) + + for d := range msgs { + go func(d amqp.Delivery) { + w.processMsg(basketSrvc, d) + }(d) + } + }() + + return nil +} + +func (w *Worker) processMsg(srvc *service.BasketService, d amqp.Delivery) { + msg, err := rabbitmq.Deserialize(d.Body) + if err != nil { + w.Logger.Log("Deserialization error: %v\n", err) + d.Reject(false) + + return + } + + rnr := &CommandRunner{} + name := fmt.Sprintf("%s", msg["event"]) + data := (msg["data"]).(map[string]interface{}) + reqID := (data["request_id"]).(string) // FIXME Check input params! + + w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data) + + var ok = false + switch true { // Refactor -> use case for polymorphism + case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET): + basketID := data["basket_id"].(string) // FIXME Check input params! + productID := data["product_id"] // FIXME Check input params! + + rnr.cmd = &AddProductToBasketCommand{srvc} + w.Logger.Log("Adding product #%d to basket #%s. ReqID: #%s", productID, basketID, reqID) + case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET): + basketID := data["basket_id"].(string) + productID := data["product_id"].(float64) + + rnr.cmd = &RemoveProductFromBasketCommand{srvc} + w.Logger.Log("Removing product #%d from basket #%s. ReqID: #%s", productID, basketID, reqID) + } + + ok, _ = rnr.run(data) + if ok { + w.Logger.Log("Successful executed message \"%s\"\n", name) + return + } + + if !ok { + w.Logger.Log("Error processing \"%s\": %s (%v)", name, err.Error(), err) + d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...? + + return + } + + w.Logger.Log("Finalized processing: %s", name) + d.Ack(false) +} diff --git a/src/pkg/server/server.go b/src/pkg/server/server.go index eec40c2..9f76a01 100644 --- a/src/pkg/server/server.go +++ b/src/pkg/server/server.go @@ -1,7 +1,6 @@ package server import ( - "fmt" "log" "os" "os/signal" @@ -49,7 +48,6 @@ func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error { signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigint - fmt.Print("Shutting down... ") if err := prgFn(s); err != nil { log.Fatalf("Failed to shutdown server. Reason: %v\n", err) } @@ -59,7 +57,7 @@ func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error { err := s.Listen(s.addr) if err != nil { - log.Fatalf("Failed to start server. Reason: %v\n", err) + log.Fatalf("Failed to start server: %s. Reason: %v\n", s.Config.AppID, err) close(while) } <-while