package main import ( "bytes" "encoding/json" "errors" "fmt" "log" "os" "os/signal" "strings" "syscall" "time" "git.pbiernat.dev/egommerce/catalog-service/internal/app/config" "git.pbiernat.dev/egommerce/catalog-service/internal/app/database" "git.pbiernat.dev/egommerce/catalog-service/internal/app/event" "git.pbiernat.dev/egommerce/catalog-service/internal/app/server" discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul" "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" ) const ( defAppName = "catalog-worker" defLoggerAddr = "api-logger:24224" defRegistryAddr = "api-registry:8500" defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" defEventBusURL = "amqp://guest:guest@api-eventbus:5672" ebEventsExchange = "api-events" ebEventsQueue = "catalog-worker" defKVNmspc = "dev.egommerce/service/catalog-worker" ) func main() { if config.ErrLoadingEnvs != nil { log.Panicln("Error loading .env file", config.ErrLoadingEnvs) } c := new(server.Config) c.AppID, _ = os.Hostname() c.AppName = config.GetEnv("APP_NAME", defAppName) c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr) c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr) c.DbURL = config.GetEnv("DATABASE_URL", defDbURL) c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL) c.EventBusExchange = ebEventsExchange c.EventBusQueue = ebEventsQueue c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc) logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) defer logger.Close() consul, err := discovery.NewService(c.RegistryAddr, c.AppID, c.AppName, c.AppID, "", 0) if err != nil { logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) } go func(consul *discovery.Service) { interval := time.Second * 30 ticker := time.NewTicker(interval) for range ticker.C { err := updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go if err != nil { logger.Log("KV config update error (skipping): %v\n", err) } } }(consul) // db conn dbConn, err := database.Connect(c.DbURL) if err != nil { // fixme: add wait-for-db... logger.Log("Failed to connect to Database server: %v\n", err) os.Exit(1) } defer dbConn.Close() // eventbus conn ebConn, ebCh, err := amqp.Open(c.EventBusURL) if err != nil { logger.Log("Failed to connect to EventBus server: %v\n", err) os.Exit(1) } defer ebCh.Close() defer amqp.Close(ebConn) err = amqp.NewExchange(ebCh, c.EventBusExchange) if err != nil { logger.Log("Failed to declare EventBus exchange: %v\n", err) os.Exit(1) } // create and bind queues _, err = ebCh.QueueDeclare( c.EventBusQueue, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { logger.Log("Failed to declare EventBus queue: %v\n", err) os.Exit(1) } amqp.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "warehouse.catalog.stockUpdated") // if err != nil { // logger.Log("Failed to bind EventBus queue: %v\n", err) // os.Exit(1) // } // event consume msgs, err := ebCh.Consume( c.EventBusQueue, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { logger.Log("Failed to register a EventBus consumer: %s", err) os.Exit(1) } forever := make(chan struct{}) go func() { sigint := make(chan os.Signal, 1) signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigint logger.Log("Worker %s stopped working...\n", c.GetAppFullName()) close(forever) }() go func() { for d := range msgs { msg, err := amqp.Deserialize(d.Body) if err != nil { logger.Log("json error: %v\n", err) d.Reject(false) // FIXME: how to handle erros in queue...???? continue } eName := fmt.Sprintf("%s", msg["event"]) data := (msg["data"]).(map[string]interface{}) logger.Log("Message<%s>: %s\n", eName, data) switch true { case strings.Contains(eName, event.EVENT_WAREHOUSE_STOCK_UPDATED): // todo: first create such service ;) // update product available qty logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED) } logger.Log("ACK: %s", eName) d.Ack(false) } }() logger.Log("Waiting for messages...") <-forever } func updateKVConfig(s *discovery.Service, oldCnf *server.Config) error { // FIXME: duplicated in internal/app/server/server.go data, _, err := s.KV().Get(oldCnf.KVNamespace, nil) if err != nil { return err } if data == nil { return errors.New("empty KV config data. Skipping") } buf := bytes.NewBuffer(data.Value) decoder := json.NewDecoder(buf) if err := decoder.Decode(oldCnf); err != nil { return err } return nil }