package main import ( "bytes" "encoding/json" "errors" "fmt" "log" "os" "os/signal" "strings" "syscall" "time" "git.pbiernat.dev/egommerce/basket-service/internal/app/config" "git.pbiernat.dev/egommerce/basket-service/internal/app/database" "git.pbiernat.dev/egommerce/basket-service/internal/app/event" "git.pbiernat.dev/egommerce/basket-service/internal/app/server" "git.pbiernat.dev/egommerce/basket-service/internal/app/service" "git.pbiernat.dev/egommerce/basket-service/internal/app/ui" discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul" "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" "github.com/go-redis/redis/v8" "github.com/streadway/amqp" ) const ( defAppName = "basket-worker" defLoggerAddr = "api-logger:24224" defRegistryAddr = "api-registry:8500" defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" defCacheAddr = "api-cache:6379" defCachePassword = "12345678" defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" defEventBusURL = "amqp://guest:guest@api-eventbus:5672" ebEventsExchange = "api-events" ebEventsQueue = "basket-worker" defKVNmspc = "dev.egommerce/service/basket-worker" ) func main() { if config.ErrLoadingEnvs != nil { log.Panicln("Error loading .env file", config.ErrLoadingEnvs) } c := new(server.Config) c.AppID, _ = os.Hostname() c.AppName = config.GetEnv("APP_NAME", defAppName) c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr) c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr) c.DbURL = config.GetEnv("DATABASE_URL", defDbURL) c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr) c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword) c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL) c.EventBusExchange = ebEventsExchange c.EventBusQueue = ebEventsQueue c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc) logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) defer logger.Close() consul, err := discovery.NewService(c.RegistryAddr, c.AppID, c.AppName, c.AppID, "", "", 0) if err != nil { logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) } go func(consul *discovery.Service) { ticker := time.NewTicker(time.Second * 15) for range ticker.C { updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go } }(consul) // db conn dbConn, err := database.Connect(c.DbURL) if err != nil { logger.Log("Failed to connect to Database server: %v\n", err) os.Exit(1) } defer dbConn.Close() // redis conn redis := redis.NewClient(&redis.Options{ Addr: c.CacheAddr, Password: c.CachePassword, DB: 0, }) defer redis.Close() // eventbus conn ebConn, ebCh, err := rabbitmq.Open(c.EventBusURL) if err != nil { logger.Log("Failed to connect to EventBus server: %v\n", err) os.Exit(1) } defer ebCh.Close() defer rabbitmq.Close(ebConn) err = rabbitmq.NewExchange(ebCh, c.EventBusExchange) if err != nil { logger.Log("Failed to declare EventBus exchange: %v\n", err) os.Exit(1) } // create and bind queues _, err = ebCh.QueueDeclare( c.EventBusQueue, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { logger.Log("Failed to declare EventBus queue: %v\n", err) os.Exit(1) } rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket") rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket") rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity") // event consume msgs, err := ebCh.Consume( c.EventBusQueue, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { logger.Log("Failed to register a consumer: %s", err) os.Exit(1) } forever := make(chan struct{}) go func() { sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigint logger.Log("Worker %s stopped working...\n", c.GetAppFullName()) close(forever) }() go func() { basketSrv := service.NewBasketService(dbConn, redis, ebCh, logger) for d := range msgs { go func(d amqp.Delivery) { msg, err := rabbitmq.Deserialize(d.Body) if err != nil { logger.Log("deserialize error: %v\n", err) d.Reject(false) // FIXME: or Nack? how to handle erros in queue... return } eName := fmt.Sprintf("%s", msg["event"]) data := (msg["data"]).(map[string]interface{}) logger.Log("Message<%s>: %v\n", eName, data) reqID := data["request_id"].(string) // FIXME Check input params! basketID := data["basket_id"].(string) // FIXME Check input params! switch true { case strings.Contains(eName, event.EVENT_PRODUCT_ADDED_TO_BASKET): productID := int(data["product_id"].(float64)) qty := int(data["quantity"].(float64)) basket, err := ui.AddProductToBasket(basketSrv, productID, qty, basketID, reqID) if err == nil { logger.Log("Product #%s added to basket #%s. ReqID: #%s", productID, basket.ID, reqID) } case strings.Contains(eName, event.EVENT_PRODUCT_REMOVED_FROM_BASKET): productID := int(data["product_id"].(float64)) qty := int(data["quantity"].(float64)) basket, err := ui.RemoveProductFromBasket(basketSrv, productID, qty, basketID, reqID) if err == nil { logger.Log("Product #%s removed from basket #%s. ReqID: #%s", productID, basket.ID, reqID) } } if err != nil { logger.Log("%s error: %s", eName, err.Error()) d.Reject(false) // FIXME: or Nack? how to handle erros in queue... return } logger.Log("ACK: %s", eName) d.Ack(false) }(d) } }() logger.Log("Waiting for messages...") <-forever } func updateKVConfig(s *discovery.Service, oldCnf *server.Config) error { // FIXME: duplicated in internal/app/server/server.go data, _, err := s.KV().Get(oldCnf.KVNamespace, nil) if err != nil { return err } if data == nil { return errors.New("empty KV config data") } buf := bytes.NewBuffer(data.Value) decoder := json.NewDecoder(buf) if err := decoder.Decode(oldCnf); err != nil { return err } return nil }