From 8fac6f7e61863251880acfbe29ff9b6b94d1e28b Mon Sep 17 00:00:00 2001 From: Piotr Biernat Date: Mon, 10 Apr 2023 03:57:01 +0200 Subject: [PATCH] Refactor --- src/cmd/migrate/main.go | 39 ++- src/cmd/server/main.go | 103 ++----- src/cmd/worker/main.go | 198 +------------ src/go.mod | 2 +- src/go.sum | 2 + src/internal/app/server/router.go | 55 ---- src/internal/app/server/server.go | 179 ------------ src/internal/config/config.go | 62 ++++ src/internal/{app => }/database/connect.go | 0 src/internal/{app => }/event/basket.go | 8 +- src/internal/{app => }/event/event.go | 4 +- src/internal/{app => }/event/warehouse.go | 0 src/internal/{app => }/log.go | 0 .../{app => }/server/catalog_handler.go | 30 +- src/internal/{app => }/server/config.go | 0 .../{app => }/server/health_handler.go | 2 +- src/internal/server/middleware.go | 33 +++ src/internal/server/router.go | 40 +++ src/internal/server/server.go | 205 +++++++++++++ src/internal/{app => }/service/catalog.go | 16 +- src/internal/{app => }/ui/catalog.go | 2 +- src/internal/worker/command.go | 35 +++ src/internal/worker/worker.go | 272 ++++++++++++++++++ .../config/env.go => pkg/config/config.go} | 0 src/pkg/database/connect.go | 16 ++ src/pkg/server/config.go | 21 ++ src/pkg/server/server.go | 79 +++++ 27 files changed, 862 insertions(+), 541 deletions(-) delete mode 100644 src/internal/app/server/router.go delete mode 100644 src/internal/app/server/server.go create mode 100644 src/internal/config/config.go rename src/internal/{app => }/database/connect.go (100%) rename src/internal/{app => }/event/basket.go (79%) rename src/internal/{app => }/event/event.go (55%) rename src/internal/{app => }/event/warehouse.go (100%) rename src/internal/{app => }/log.go (100%) rename src/internal/{app => }/server/catalog_handler.go (61%) rename src/internal/{app => }/server/config.go (100%) rename src/internal/{app => }/server/health_handler.go (91%) create mode 100644 src/internal/server/middleware.go create mode 100644 src/internal/server/router.go create mode 100644 src/internal/server/server.go rename src/internal/{app => }/service/catalog.go (84%) rename src/internal/{app => }/ui/catalog.go (95%) create mode 100644 src/internal/worker/command.go create mode 100644 src/internal/worker/worker.go rename src/{internal/app/config/env.go => pkg/config/config.go} (100%) create mode 100644 src/pkg/database/connect.go create mode 100644 src/pkg/server/config.go create mode 100644 src/pkg/server/server.go diff --git a/src/cmd/migrate/main.go b/src/cmd/migrate/main.go index cce6028..8f15d9e 100644 --- a/src/cmd/migrate/main.go +++ b/src/cmd/migrate/main.go @@ -6,10 +6,14 @@ import ( "log" "os" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/config" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "github.com/go-pg/migrations/v8" "github.com/go-pg/pg/v10" + + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" + + baseCnf "git.pbiernat.dev/egommerce/catalog-service/pkg/config" + + cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config" ) const ( @@ -32,19 +36,26 @@ Usage: ` func main() { - if config.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", config.ErrLoadingEnvs) + if baseCnf.ErrLoadingEnvs != nil { + log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) } - // dbURL := config.GetEnv("DATABASE_URL", defDbURL) - loggerAddr := config.GetEnv("LOGGER_ADDR", defLoggerAddr) - mTblName := config.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName) + c := cnf.NewConfig("catalog-migrator") - logHost, logPort := fluentd.ParseAddr(loggerAddr) - logger := fluentd.NewLogger(defAppName, logHost, logPort) - defer logger.Close() + // dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL) - flag.Usage = usage // FIXME + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err) + } + + logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) + if err != nil { + log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err) + } + // defer logger.Close() + + flag.Usage = usage flag.Parse() db := pg.Connect(&pg.Options{ // FIXME @@ -54,10 +65,10 @@ func main() { Database: "egommerce", }) + mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName) mig := migrations.NewCollection() - mig.SetTableName(mTblName) - err := mig.DiscoverSQLMigrations("./migrations") - if err != nil { + mig.SetTableName(mTbl) + if err := mig.DiscoverSQLMigrations("./migrations"); err != nil { logger.Log("migration dicovery error: %#v", err) } diff --git a/src/cmd/server/main.go b/src/cmd/server/main.go index d3fec65..87f2090 100644 --- a/src/cmd/server/main.go +++ b/src/cmd/server/main.go @@ -2,102 +2,37 @@ package main import ( "log" - "net/http" "os" - "strconv" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/config" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/database" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/server" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" - amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" - "github.com/go-redis/redis/v8" - "github.com/prometheus/client_golang/prometheus/promhttp" -) + baseCnf "git.pbiernat.dev/egommerce/catalog-service/pkg/config" -const ( - defAppName = "catalog-svc" - defAppDomain = "catalog-svc" - defPathPrefix = "/catalog" - defNetAddr = ":80" - defLoggerAddr = "api-logger:24224" - defRegistryAddr = "api-registry:8500" - // defMetricsAddr = "api-prometheus:9090" - defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" - defCacheAddr = "api-cache:6379" - defCachePassword = "12345678" - defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" - defEventBusURL = "amqp://guest:guest@api-eventbus:5672" - ebEventsExchange = "api-events" - ebEventsQueue = "catalog-svc" - defKVNmspc = "dev.egommerce/service/catalog-svc" + cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config" + svr "git.pbiernat.dev/egommerce/catalog-service/internal/server" ) func main() { - if config.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", config.ErrLoadingEnvs) + if baseCnf.ErrLoadingEnvs != nil { + log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) } - c := new(server.Config) - c.AppID, _ = os.Hostname() - c.AppName = config.GetEnv("APP_NAME", defAppName) - c.AppDomain = config.GetEnv("APP_DOMAIN", defAppDomain) - c.PathPrefix = config.GetEnv("APP_PATH_PREFIX", defPathPrefix) - c.NetAddr = config.GetEnv("SERVER_ADDR", defNetAddr) - c.Port, _ = strconv.Atoi(c.NetAddr[1:]) - c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr) - c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr) - c.DbURL = config.GetEnv("DATABASE_URL", defDbURL) - c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr) - c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword) - c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL) - c.EventBusExchange = ebEventsExchange - c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc) + c := cnf.NewConfig("catalog-server") + srv := svr.New( + c, + svr.WithCache(c), + svr.WithDatabase(c), + svr.WithEventbus(c), + svr.WithLogger(c), + svr.WithRegistry(c), + ) - logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) - logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) - defer logger.Close() + while := make(chan struct{}) + err := srv.Base.Start(while, srv.Shutdown()) + <-while - // db conn - dbConn, err := database.Connect(c.DbURL) - if err != nil { // fixme: add wait-for-db... - logger.Log("Failed to connect to Database server: %v\n", err) - os.Exit(1) - } - defer dbConn.Close() - - // redis conn - redis := redis.NewClient(&redis.Options{ - Addr: c.CacheAddr, - Password: c.CachePassword, - DB: 0, - }) - defer redis.Close() - - // eventbus conn - ebConn, ebCh, err := amqp.Open(c.EventBusURL) if err != nil { - logger.Log("Failed to connect to EventBus server: %v\n", err) - os.Exit(1) - } - defer ebCh.Close() - defer amqp.Close(ebConn) - - err = amqp.NewExchange(ebCh, c.EventBusExchange) - if err != nil { - logger.Log("Failed to declare EventBus exchange: %v\n", err) + log.Fatalf("Failed to start server. Reason: %v\n", err) os.Exit(1) } - // start server - srv := server.NewServer(c, logger, dbConn, redis, ebCh) - - // start metrics - go http.ListenAndServe(":8084", promhttp.Handler()) - - srvChan := make(chan struct{}) - srv.StartWithGracefulShutdown(srvChan) - <-srvChan - - // os.Exit(1) + os.Exit(0) } diff --git a/src/cmd/worker/main.go b/src/cmd/worker/main.go index fcdc042..691d1f5 100644 --- a/src/cmd/worker/main.go +++ b/src/cmd/worker/main.go @@ -1,198 +1,30 @@ package main import ( - "bytes" - "encoding/json" - "errors" - "fmt" "log" - "os" - "os/signal" - "strings" - "syscall" - "time" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/config" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/database" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/event" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/server" - discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" - "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" - "github.com/go-redis/redis/v8" - "github.com/streadway/amqp" -) + "git.pbiernat.dev/egommerce/catalog-service/pkg/config" -const ( - defAppName = "catalog-worker" - defLoggerAddr = "api-logger:24224" - defRegistryAddr = "api-registry:8500" - defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" - defCacheAddr = "api-cache:6379" - defCachePassword = "12345678" - defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" - defEventBusURL = "amqp://guest:guest@api-eventbus:5672" - ebEventsExchange = "api-events" - ebEventsQueue = "catalog-worker" - defKVNmspc = "dev.egommerce/service/catalog-worker" + cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config" + "git.pbiernat.dev/egommerce/catalog-service/internal/worker" ) func main() { if config.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", config.ErrLoadingEnvs) + log.Fatalln("Error loading .env file.") } - c := new(server.Config) - c.AppID, _ = os.Hostname() - c.AppName = config.GetEnv("APP_NAME", defAppName) - c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr) - c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr) - c.DbURL = config.GetEnv("DATABASE_URL", defDbURL) - c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr) - c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword) - c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL) - c.EventBusExchange = ebEventsExchange - c.EventBusQueue = ebEventsQueue - c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc) - - logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) - logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) - defer logger.Close() - - consul, err := discovery.NewService(c.RegistryAddr, c.AppID, c.AppName, c.AppID, "", "", 0) - if err != nil { - logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) - } - - go func(consul *discovery.Service) { - interval := time.Second * 30 - ticker := time.NewTicker(interval) - for range ticker.C { - updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go - } - }(consul) - - // db conn - dbConn, err := database.Connect(c.DbURL) - if err != nil { // fixme: add wait-for-db... - logger.Log("Failed to connect to Database server: %v\n", err) - os.Exit(1) - } - defer dbConn.Close() - - // redis conn - redis := redis.NewClient(&redis.Options{ - Addr: c.CacheAddr, - Password: c.CachePassword, - DB: 0, - }) - defer redis.Close() - - // eventbus conn - ebConn, ebCh, err := rabbitmq.Open(c.EventBusURL) - if err != nil { - logger.Log("Failed to connect to EventBus server: %v\n", err) - os.Exit(1) - } - defer ebCh.Close() - defer rabbitmq.Close(ebConn) - - err = rabbitmq.NewExchange(ebCh, c.EventBusExchange) - if err != nil { - logger.Log("Failed to declare EventBus exchange: %v\n", err) - os.Exit(1) - } - - // create and bind queues - _, err = ebCh.QueueDeclare( - c.EventBusQueue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + c := cnf.NewConfig("catalog-worker") + wrk := worker.New( + c, + worker.WithCache(c), + worker.WithDatabase(c), + worker.WithEventbus(c), + worker.WithLogger(c), + worker.WithRegistry(c), ) - if err != nil { - logger.Log("Failed to declare EventBus queue: %v\n", err) - os.Exit(1) - } - rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "warehouse.catalog.stockUpdated") - // if err != nil { - // logger.Log("Failed to bind EventBus queue: %v\n", err) - // os.Exit(1) - // } - - // event consume - msgs, err := ebCh.Consume( - c.EventBusQueue, // queue - "", // consumer - false, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - logger.Log("Failed to register a EventBus consumer: %s", err) - os.Exit(1) - } - - forever := make(chan struct{}) - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-sigint - - logger.Log("Worker %s stopped working...\n", c.GetAppFullName()) - - close(forever) - }() - - go func() { - for d := range msgs { - go func(d amqp.Delivery) { - msg, err := rabbitmq.Deserialize(d.Body) - if err != nil { - logger.Log("json error: %v\n", err) - d.Reject(false) // FIXME: how to handle erros in queue...???? - return - } - - eName := fmt.Sprintf("%s", msg["event"]) - data := (msg["data"]).(map[string]interface{}) - logger.Log("Message<%s>: %s\n", eName, data) - - switch true { - case strings.Contains(eName, event.EVENT_WAREHOUSE_STOCK_UPDATED): // todo: demo consumer... - logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED) - } - - logger.Log("ACK: %s", eName) - d.Ack(false) - }(d) - } - }() - - logger.Log("Waiting for messages...") - <-forever -} - -func updateKVConfig(s *discovery.Service, oldCnf *server.Config) error { // FIXME: duplicated in internal/app/server/server.go - data, _, err := s.KV().Get(oldCnf.KVNamespace, nil) - if err != nil { - return err - } - - if data == nil { - return errors.New("empty KV config data. Skipping") - } - - buf := bytes.NewBuffer(data.Value) - decoder := json.NewDecoder(buf) - if err := decoder.Decode(oldCnf); err != nil { - return err - } - - return nil + while := make(chan struct{}) + wrk.Start(while) + <-while } diff --git a/src/go.mod b/src/go.mod index 6af2ace..55e5103 100644 --- a/src/go.mod +++ b/src/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( git.pbiernat.dev/egommerce/api-entities v0.0.26 - git.pbiernat.dev/egommerce/go-api-pkg v0.0.148 + git.pbiernat.dev/egommerce/go-api-pkg v0.0.151 github.com/georgysavva/scany/v2 v2.0.0 github.com/go-pg/migrations/v8 v8.1.0 github.com/go-pg/pg/v10 v10.10.7 diff --git a/src/go.sum b/src/go.sum index 6ba026e..6310fca 100644 --- a/src/go.sum +++ b/src/go.sum @@ -59,6 +59,8 @@ git.pbiernat.dev/egommerce/go-api-pkg v0.0.147 h1:wJ1D88iRnO6BHSiqtO3m7onFPPDBJ9 git.pbiernat.dev/egommerce/go-api-pkg v0.0.147/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= git.pbiernat.dev/egommerce/go-api-pkg v0.0.148 h1:FyT0tfUUxMPeOEz44oYgMV13BgCU1i/TYH2NysgINws= git.pbiernat.dev/egommerce/go-api-pkg v0.0.148/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.151 h1:MKf+tka3Bhh4Zbn5cLqO6H39gsf7el/GUT8ittaIujM= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.151/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= diff --git a/src/internal/app/server/router.go b/src/internal/app/server/router.go deleted file mode 100644 index 577e355..0000000 --- a/src/internal/app/server/router.go +++ /dev/null @@ -1,55 +0,0 @@ -package server - -import ( - "strings" - - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" - "github.com/gofiber/fiber/v2" - "github.com/gofiber/fiber/v2/middleware/cors" -) - -var ( - defaultCORS = cors.New(cors.Config{ - AllowOrigins: "*", - AllowCredentials: true, - AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS", - AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id", - }) -) - -func SetupRoutes(s *Server) { - s.App.Options("*", defaultCORS) - - s.App.Get("/health", s.HealthHandler) - s.App.Get("/config", s.ConfigHandler) - - api := s.App.Group("/api") - v1 := api.Group("/v1") - v1.Get("/product", s.GetProductListHandler) - // v1.Get("/product/:productId", s.GetProductHandler) - v1.Post("/product", s.AddProductToBasketHandler) - v1.Delete("/product", s.RemoveProductFromBasketHandler) -} - -func SetupMiddlewares(s *Server) { - s.App.Use(defaultCORS) - s.App.Use(LoggingMiddleware(s.log)) -} - -// Middlewares -func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error { - return func(c *fiber.Ctx) error { - path := string(c.Request().URI().Path()) - if strings.Contains(path, "/health") { - return c.Next() - } - - log.Log("Request: %s, ReqHeaders: %v, remote: %s, via: %s", - c.Request().URI().String(), - c.GetReqHeaders(), - c.Context().RemoteIP().String(), - string(c.Context().UserAgent())) - - return c.Next() - } -} diff --git a/src/internal/app/server/server.go b/src/internal/app/server/server.go deleted file mode 100644 index 282f0cf..0000000 --- a/src/internal/app/server/server.go +++ /dev/null @@ -1,179 +0,0 @@ -package server - -import ( - "bytes" - "context" - "encoding/json" - "os" - "os/signal" - "syscall" - "time" - - "github.com/go-redis/redis/v8" - "github.com/gofiber/fiber/v2" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/streadway/amqp" - - def "git.pbiernat.dev/egommerce/api-entities/http" - discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" -) - -type Server struct { - *fiber.App - conf *Config - log *fluentd.Logger - db *pgxpool.Pool - cache *redis.Client - ebCh *amqp.Channel - discovery *discovery.Service - name string - addr string - kvNmspc string -} - -type Headers struct { - RequestID string `reqHeader:"x-request-id"` -} - -func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client, ebCh *amqp.Channel) *Server { - consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.PathPrefix, conf.Port) - if err != nil { - logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err) - } - - logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address) - err = consul.Register() - if err != nil { - logger.Log("register error: %v", err) - } - - cnf := fiber.Config{ - AppName: conf.AppName, - ServerHeader: conf.AppName, - ReadTimeout: time.Millisecond * 50, - WriteTimeout: time.Millisecond * 50, - IdleTimeout: time.Millisecond * 50, - } - s := &Server{ - fiber.New(cnf), - conf, - logger, - db, - cache, - ebCh, - consul, - conf.AppName, - conf.NetAddr, - conf.KVNamespace, - } - - go func(s *Server) { // Consul KV config updater - interval := time.Second * 15 - ticker := time.NewTicker(interval) - for range ticker.C { - s.updateKVConfig() - } - }(s) - - go func(s *Server) { // Server metadata cache updater - interval := time.Second * 5 - ticker := time.NewTicker(interval) - for range ticker.C { - s.cacheMetadata() - } - }(s) - - SetupMiddlewares(s) - SetupRoutes(s) - - return s -} - -func (s *Server) Start() { - err := s.Listen(s.addr) - s.log.Log("Starting error: %v", err) -} - -func (s *Server) StartWithGracefulShutdown(forever chan struct{}) { - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-sigint - - if err := s.gracefulShutdown(); err != nil { - s.log.Log("Server is not shutting down! Reason: %v", err) - } - - close(forever) - }() - - if err := s.Listen(s.addr); err != nil { - s.log.Log("Server is not running! Reason: %v", err) - } - - <-forever -} - -// GetRequestID Return current requets ID - works only when fiber context are running -func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) { - var hdr = new(Headers) - if err := c.ReqHeaderParser(hdr); err != nil { - return "", err - } - - return hdr.RequestID, nil -} - -func (s *Server) Error400(c *fiber.Ctx, msg string) error { - return c.Status(fiber.StatusBadRequest).JSON(&def.ErrorResponse{Error: msg}) -} - -func (s *Server) Error404(c *fiber.Ctx, msg string) error { - return c.Status(fiber.StatusNotFound).JSON(&def.ErrorResponse{Error: msg}) -} - -func (s *Server) updateKVConfig() { // FIXME: duplicated in cmd/worker/main.go - config, _, err := s.discovery.KV().Get(s.kvNmspc, nil) - if err != nil || config == nil { - return - } - - kvCnf := bytes.NewBuffer(config.Value) - decoder := json.NewDecoder(kvCnf) - if err := decoder.Decode(&s.conf); err != nil { - return - } -} - -func (s *Server) cacheMetadata() { - ctx := context.Background() - key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name - - pos := s.cache.LPos(ctx, key, address, redis.LPosArgs{}).Val() - if pos >= 0 { - s.cache.LRem(ctx, key, 0, address) - } - - s.cache.LPush(ctx, key, address).Err() -} - -func (s *Server) clearMetadataCache() { - ctx := context.Background() - key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name - - s.cache.LRem(ctx, key, 0, address) -} - -func (s *Server) gracefulShutdown() error { - s.log.Log("Server is going down...") - s.log.Log("Unregistering service: %s", s.discovery.GetID()) - s.discovery.Unregister() - s.clearMetadataCache() - - s.ebCh.Close() - s.db.Close() - s.log.Close() - - return s.Shutdown() -} diff --git a/src/internal/config/config.go b/src/internal/config/config.go new file mode 100644 index 0000000..badb206 --- /dev/null +++ b/src/internal/config/config.go @@ -0,0 +1,62 @@ +package common + +import ( + "os" + + cnf "git.pbiernat.dev/egommerce/catalog-service/pkg/config" + srv "git.pbiernat.dev/egommerce/catalog-service/pkg/server" +) + +const ( + // defAppDomain = "catalog-svc" + defAppName = "catalog-svc" + defCacheAddr = "api-cache:6379" + defCachePassword = "12345678" + defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" + defEventBusURL = "amqp://guest:guest@api-eventbus:56721" + defKVNmspc = "dev.egommerce/service/catalog" + defLoggerAddr = "api-logger:24224" + defNetAddr = ":80" + defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" + defPathPrefix = "/catalog" + defRegistryAddr = "api-registry:8500" + defEbEventsExchange = "api-events" + defEbEventsQueue = "catalog-svc" +) + +type Config struct { + Base *srv.Config + + DbURL string `json:"db_url"` + CacheAddr string `json:"cache_addr"` + CachePassword string `json:"cache_password"` + EventBusExchange string `json:"eventbus_exchange"` + EventBusQueue string `json:"eventbus_queue"` + EventBusURL string `json:"eventbus_url"` + LoggerAddr string `json:"logger_addr"` + KVNamespace string + RegistryAddr string + + // Fields with JSON mappings are available through Consul KV storage +} + +func NewConfig(name string) *Config { + c := new(Config) + c.Base = new(srv.Config) + + c.Base.AppID, _ = os.Hostname() + c.Base.AppName = name + c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr) + c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix) + + c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr) + c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword) + c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL) + c.EventBusExchange = defEbEventsExchange + c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL) + c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc) + c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr) + c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr) + + return c +} diff --git a/src/internal/app/database/connect.go b/src/internal/database/connect.go similarity index 100% rename from src/internal/app/database/connect.go rename to src/internal/database/connect.go diff --git a/src/internal/app/event/basket.go b/src/internal/event/basket.go similarity index 79% rename from src/internal/app/event/basket.go rename to src/internal/event/basket.go index 7eaa747..ee4c6c0 100644 --- a/src/internal/app/event/basket.go +++ b/src/internal/event/basket.go @@ -1,18 +1,18 @@ package event const ( - EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasketEvent" - EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasketEvent" + EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasket" + EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasket" ) -type ProductAddedToBasketEvent struct { +type ProductAddedToBasket struct { *Event ProductID int `json:"product_id"` Quantity int `json:"quantity"` BasketID string `json:"basket_id"` } -type ProductRemovedFromBasketEvent struct { +type ProductRemovedFromBasket struct { *Event ProductID int `json:"product_id"` Quantity int `json:"quantity"` diff --git a/src/internal/app/event/event.go b/src/internal/event/event.go similarity index 55% rename from src/internal/app/event/event.go rename to src/internal/event/event.go index 3dd7a42..ac529c4 100644 --- a/src/internal/app/event/event.go +++ b/src/internal/event/event.go @@ -1,11 +1,13 @@ package event type Event struct { + Command string `json:"command"` RequestID string `json:"request_id"` } -func NewEvent(reqID string) *Event { +func NewEvent(command, reqID string) *Event { em := new(Event) + em.Command = command em.RequestID = reqID return em diff --git a/src/internal/app/event/warehouse.go b/src/internal/event/warehouse.go similarity index 100% rename from src/internal/app/event/warehouse.go rename to src/internal/event/warehouse.go diff --git a/src/internal/app/log.go b/src/internal/log.go similarity index 100% rename from src/internal/app/log.go rename to src/internal/log.go diff --git a/src/internal/app/server/catalog_handler.go b/src/internal/server/catalog_handler.go similarity index 61% rename from src/internal/app/server/catalog_handler.go rename to src/internal/server/catalog_handler.go index 3de2527..378fb6c 100644 --- a/src/internal/app/server/catalog_handler.go +++ b/src/internal/server/catalog_handler.go @@ -7,14 +7,14 @@ import ( "github.com/google/uuid" def "git.pbiernat.dev/egommerce/api-entities/http" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/service" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/ui" + "git.pbiernat.dev/egommerce/catalog-service/internal/service" + "git.pbiernat.dev/egommerce/catalog-service/internal/ui" ) func (s *Server) GetProductListHandler(c *fiber.Ctx) error { - reqID, _ := s.GetRequestID(c) + reqID, _ := s.Base.GetRequestID(c) req := new(def.GetProductListRequest) - catalogSrv := service.NewCatalogService(s.db, s.ebCh, s.log) + catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger) res, err := ui.GetProductList(catalogSrv, req.CategoryID, reqID) if err != nil { return err @@ -24,10 +24,10 @@ func (s *Server) GetProductListHandler(c *fiber.Ctx) error { } func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error { - reqID, _ := s.GetRequestID(c) + reqID, _ := s.Base.GetRequestID(c) req := new(def.AddProductToBasketRequest) if err := c.BodyParser(req); err != nil { - return s.Error400(c, err.Error()) + return s.Base.Error(c, 400, err.Error()) } basketID := prepareBasket(c) @@ -36,24 +36,24 @@ func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error { qty = req.Quantity } - catalogSrv := service.NewCatalogService(s.db, s.ebCh, s.log) + catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger) res, err := ui.AddProductToBasket(catalogSrv, req.ProductID, qty, basketID, reqID) if err != nil { - s.log.Log("AddProductToBasketHandler error: ", err) + s.Logger.Log("AddProductToBasketHandler error: ", err) if res.ProductID == 0 { - return s.Error404(c, fmt.Sprintf("Product #%d not exists", req.ProductID)) + return s.Base.Error(c, 404, fmt.Sprintf("Product #%d not exists", req.ProductID)) } - return s.Error400(c, "Failed to add product to basket") + return s.Base.Error(c, 400, "Failed to add product to basket") } return c.JSON(res) } func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error { - reqID, _ := s.GetRequestID(c) + reqID, _ := s.Base.GetRequestID(c) req := new(def.RemoveProductFromBasketRequest) if err := c.BodyParser(req); err != nil { - return s.Error400(c, err.Error()) + return s.Base.Error(c, 400, err.Error()) } basketID := prepareBasket(c) @@ -62,11 +62,11 @@ func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error { qty = req.Quantity } - catalogSrv := service.NewCatalogService(s.db, s.ebCh, s.log) + catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger) res, err := ui.RemoveProductFromBasket(catalogSrv, req.ProductID, qty, basketID, reqID) if err != nil { - s.log.Log("RemoveProductFromBasketHandler error: ", err) - return s.Error400(c, "Failed to remove product from basket") + s.Logger.Log("RemoveProductFromBasketHandler error: ", err) + return s.Base.Error(c, 400, "Failed to remove product from basket") } return c.JSON(res) diff --git a/src/internal/app/server/config.go b/src/internal/server/config.go similarity index 100% rename from src/internal/app/server/config.go rename to src/internal/server/config.go diff --git a/src/internal/app/server/health_handler.go b/src/internal/server/health_handler.go similarity index 91% rename from src/internal/app/server/health_handler.go rename to src/internal/server/health_handler.go index 05b27b3..9ee28b1 100644 --- a/src/internal/app/server/health_handler.go +++ b/src/internal/server/health_handler.go @@ -12,5 +12,5 @@ func (s *Server) HealthHandler(c *fiber.Ctx) error { } func (s *Server) ConfigHandler(c *fiber.Ctx) error { - return c.JSON(s.conf) + return c.JSON(s.Config) } diff --git a/src/internal/server/middleware.go b/src/internal/server/middleware.go new file mode 100644 index 0000000..74e2cda --- /dev/null +++ b/src/internal/server/middleware.go @@ -0,0 +1,33 @@ +package server + +import ( + "strings" + + "github.com/gofiber/fiber/v2" + + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" +) + +// "github.com/gofiber/fiber/v2" +// "github.com/gofiber/fiber/v2/middleware/cors" + +func SetupMiddleware(s *Server) { + s.Base.Use(defaultCORS) + s.Base.Use(LoggingMiddleware(s.Logger)) +} + +func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error { + return func(c *fiber.Ctx) error { + path := string(c.Request().URI().Path()) + if strings.Contains(path, "/health") { + return c.Next() + } + + log.Log("Request: %s, remote: %s, via: %s", + c.Request().URI().String(), + c.Context().RemoteIP().String(), + string(c.Context().UserAgent())) + + return c.Next() + } +} diff --git a/src/internal/server/router.go b/src/internal/server/router.go new file mode 100644 index 0000000..75e966c --- /dev/null +++ b/src/internal/server/router.go @@ -0,0 +1,40 @@ +package server + +import ( + "net/http" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/cors" +) + +var ( + defaultCORS = cors.New(cors.Config{ + AllowOrigins: "*", + AllowCredentials: true, + AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS", + AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id", + }) +) + +func SetupRouter(s *Server) { + s.Base.Options("*", defaultCORS) + + s.Base.Get("/health", s.HealthHandler) + s.Base.Get("/config", s.ConfigHandler) + + api := s.Base.Group("/api") + v1 := api.Group("/v1") + v1.Get("/product", s.GetProductListHandler) + // v1.Get("/product/:productId", s.GetProductHandler) + v1.Post("/product", s.AddProductToBasketHandler) + v1.Delete("/product", s.RemoveProductFromBasketHandler) +} + +func CORSPreflightMiddleware(c *fiber.Ctx) error { + if string(c.Request().Header.Method()) == http.MethodOptions { + c.Response().SetStatusCode(http.StatusOK) + c.Next() + } + + return c.Next() +} diff --git a/src/internal/server/server.go b/src/internal/server/server.go new file mode 100644 index 0000000..630553f --- /dev/null +++ b/src/internal/server/server.go @@ -0,0 +1,205 @@ +package server + +import ( + "bytes" + "context" + "encoding/json" + "log" + "os" + "strconv" + "time" + + "github.com/go-redis/redis/v8" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/streadway/amqp" + + "git.pbiernat.dev/egommerce/go-api-pkg/consul" + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" + + db "git.pbiernat.dev/egommerce/catalog-service/pkg/database" + srv "git.pbiernat.dev/egommerce/catalog-service/pkg/server" + + cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config" +) + +type ( + Server struct { + Base *srv.Server + Config *cnf.Config + + Cache *redis.Client + Database *pgxpool.Pool + Eventbus *amqp.Channel + Logger *fluentd.Logger + Registry *consul.Service + } + + OptionFn func(*Server) error // FIXME: similar in worker +) + +func New(c *cnf.Config, opts ...OptionFn) *Server { + svr := &Server{ + Base: srv.New(c.Base), + Config: c, + } + + for _, opt := range opts { + if err := opt(svr); err != nil { + log.Fatalf("Failed to attach extension to the server. Err: %v\n", err) + } + } + + SetupMiddleware(svr) + SetupRouter(svr) + + return svr +} + +func WithCache(c *cnf.Config) OptionFn { + redis := redis.NewClient(&redis.Options{ + Addr: c.CacheAddr, + Password: c.CachePassword, + DB: 0, + }) + + return func(s *Server) error { + s.Cache = redis + + return nil + } +} + +func WithDatabase(c *cnf.Config) OptionFn { + dbConn, err := db.Connect(c.DbURL) + if err != nil { + log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", c.DbURL, err) + os.Exit(1) + } + + return func(s *Server) error { + s.Database = dbConn + + return nil + } +} + +func WithEventbus(c *cnf.Config) OptionFn { + conn, err := amqp.Dial(c.EventBusURL) + if err != nil { + log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", c.EventBusURL, err) + } + + chn, err := conn.Channel() + if err != nil { + log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err) + } + + return func(s *Server) error { + s.Eventbus = chn + + return nil + } +} + +func WithLogger(c *cnf.Config) OptionFn { + return func(s *Server) error { + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err) + } + + logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) + if err != nil { + log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) + } + + s.Logger = logger + + return nil + } +} + +func WithRegistry(c *cnf.Config) OptionFn { + return func(s *Server) error { + port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error + registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port) + if err != nil { + log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err) + } + + err = registry.Register() + if err != nil { + log.Fatalf("Failed to register in the Consul service. Err: %v", err) + } + + s.Registry = registry + + go func() { // Consul KV updater + ticker := time.NewTicker(time.Second * 15) + for range ticker.C { + updateKVConfig(s) + } + }() + + go func() { // Server metadata cache updater + ticker := time.NewTicker(time.Second * 5) + for range ticker.C { + s.cacheMetadata() + } + }() + + return nil + } +} + +func (s *Server) Shutdown() srv.PurgeFn { + return func(srv *srv.Server) error { + s.Logger.Log("Server %s is going down...", s.Base.AppID) + + s.Registry.Unregister() + s.clearMetadataCache() + s.Eventbus.Close() + s.Database.Close() + s.Logger.Log("Gone.") + s.Logger.Close() + + return s.Base.Shutdown() + } +} + +// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map +func updateKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go + config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil) + if err != nil || config == nil { + return + } + + kvCnf := bytes.NewBuffer(config.Value) + decoder := json.NewDecoder(kvCnf) + if err := decoder.Decode(&s.Config); err != nil { + return + } +} + +func (s *Server) cacheMetadata() { + ctx := context.Background() + key, address := s.getMetadataIPsKey(), s.Base.Config.AppID + + pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val() + if pos >= 0 { + s.Cache.LRem(ctx, key, 0, address) + } + + s.Cache.LPush(ctx, key, address).Err() +} + +func (s *Server) clearMetadataCache() { + ctx := context.Background() + key, address := s.getMetadataIPsKey(), s.Config.Base.AppID + + s.Cache.LRem(ctx, key, 0, address) +} + +func (s *Server) getMetadataIPsKey() string { + return "internal__" + s.Base.Config.AppName + "__ips" +} diff --git a/src/internal/app/service/catalog.go b/src/internal/service/catalog.go similarity index 84% rename from src/internal/app/service/catalog.go rename to src/internal/service/catalog.go index 4cb784d..bab428b 100644 --- a/src/internal/app/service/catalog.go +++ b/src/internal/service/catalog.go @@ -4,7 +4,7 @@ import ( "context" "git.pbiernat.dev/egommerce/api-entities/model" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/event" + "git.pbiernat.dev/egommerce/catalog-service/internal/event" "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" "github.com/georgysavva/scany/v2/pgxscan" @@ -55,7 +55,12 @@ func (s *CatalogService) GetProductList(ctx context.Context, reqID string, categ func (s *CatalogService) AddProductToBasket(reqID, basketID string, productID, qty int) error { s.log.Log("Adding product #%d to the basket #%s", productID, basketID) - msg := &event.ProductAddedToBasketEvent{Event: event.NewEvent(reqID), BasketID: basketID, ProductID: productID, Quantity: qty} + msg := &event.ProductAddedToBasket{ + Event: event.NewEvent("AddProductToBasket", reqID), + BasketID: basketID, + ProductID: productID, + Quantity: qty, + } rabbitmq.Publish(s.ebCh, "api-events", "catalog.basket.productAddedToBasket", msg) rabbitmq.Publish(s.ebCh, "api-events", "catalog.warehouse.productAddedToBasket", msg) @@ -65,7 +70,12 @@ func (s *CatalogService) AddProductToBasket(reqID, basketID string, productID, q func (s *CatalogService) RemoveProductFromBasket(reqID, basketID string, productID, qty int) error { s.log.Log("Removed product#%s from basket#%s", productID, basketID) - msg := &event.ProductRemovedFromBasketEvent{Event: event.NewEvent(reqID), BasketID: basketID, ProductID: productID, Quantity: qty} + msg := &event.ProductRemovedFromBasket{ + Event: event.NewEvent("RemoveProductFromBasket", reqID), + BasketID: basketID, + ProductID: productID, + Quantity: qty, + } rabbitmq.Publish(s.ebCh, "api-events", "catalog.basket.productRemovedFromBasket", msg) rabbitmq.Publish(s.ebCh, "api-events", "catalog.warehouse.productRemovedFromBasket", msg) diff --git a/src/internal/app/ui/catalog.go b/src/internal/ui/catalog.go similarity index 95% rename from src/internal/app/ui/catalog.go rename to src/internal/ui/catalog.go index e8742c6..3faad9d 100644 --- a/src/internal/app/ui/catalog.go +++ b/src/internal/ui/catalog.go @@ -5,7 +5,7 @@ import ( "time" def "git.pbiernat.dev/egommerce/api-entities/http" - "git.pbiernat.dev/egommerce/catalog-service/internal/app/service" + "git.pbiernat.dev/egommerce/catalog-service/internal/service" ) func GetProductList(srv *service.CatalogService, categoryID int, reqID string) ([]def.GetProductResponse, error) { diff --git a/src/internal/worker/command.go b/src/internal/worker/command.go new file mode 100644 index 0000000..c675491 --- /dev/null +++ b/src/internal/worker/command.go @@ -0,0 +1,35 @@ +package worker + +import ( + "git.pbiernat.dev/egommerce/catalog-service/internal/service" +) + +var ( + StockUpdated = "event.WarehouseStockUpdatedEvent" +) + +type Command interface { + run(CommandData) (bool, any) +} + +type CommandData map[string]interface{} + +type CommandRunner struct { + cmd Command +} + +func (r *CommandRunner) run(data CommandData) (bool, any) { + return r.cmd.run(data) +} + +type StockUpdatedCommand struct { + srvc *service.CatalogService +} + +func (c *StockUpdatedCommand) run(data CommandData) (bool, any) { + // reqID := data["request_id"].(string) // FIXME Check input params! + // productID := int(data["product_id"].(float64)) // FIXME Check input params! + + // stock, err := ui.StockUpdated(c.srvc, productID, reqID) + return true, nil //err == nil, basket +} diff --git a/src/internal/worker/worker.go b/src/internal/worker/worker.go new file mode 100644 index 0000000..775640e --- /dev/null +++ b/src/internal/worker/worker.go @@ -0,0 +1,272 @@ +package worker + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/go-redis/redis/v8" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/streadway/amqp" + + "git.pbiernat.dev/egommerce/go-api-pkg/consul" + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" + "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" + + "git.pbiernat.dev/egommerce/catalog-service/pkg/database" + + cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config" + "git.pbiernat.dev/egommerce/catalog-service/internal/event" + "git.pbiernat.dev/egommerce/catalog-service/internal/service" +) + +type ( + Worker struct { + Config *cnf.Config + Cache *redis.Client + Database *pgxpool.Pool + Eventbus *amqp.Channel + Logger *fluentd.Logger + Registry *consul.Service + } + OptionFn func(*Worker) error // FIXME: similar in server/server.go +) + +func New(c *cnf.Config, opts ...OptionFn) *Worker { + wrk := &Worker{ + Config: c, + } + + for _, opt := range opts { + if err := opt(wrk); err != nil { + log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err) + } + } + + return wrk +} + +func WithCache(c *cnf.Config) OptionFn { + return func(w *Worker) error { + conn := redis.NewClient(&redis.Options{ + Addr: c.CacheAddr, + Password: c.CachePassword, + DB: 0, + }) + + w.Cache = conn + + return nil + } +} + +func WithDatabase(c *cnf.Config) OptionFn { + return func(w *Worker) error { + conn, err := database.Connect(c.DbURL) + if err != nil { + w.Logger.Log("Failed to connect to Database server: %v\n", err) + os.Exit(1) + } + + w.Database = conn + + return nil + } +} + +func WithEventbus(c *cnf.Config) OptionFn { + return func(w *Worker) error { + _, chn, err := rabbitmq.Open(c.EventBusURL) + if err != nil { + w.Logger.Log("Failed to connect to EventBus server: %v\n", err) + os.Exit(1) + } + + err = rabbitmq.NewExchange(chn, c.EventBusExchange) + if err != nil { + w.Logger.Log("Failed to declare EventBus exchange: %v\n", err) + os.Exit(1) + } + + _, err = chn.QueueDeclare( + c.EventBusQueue, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + w.Logger.Log("Failed to declare EventBus queue: %v\n", err) + os.Exit(1) + } + + // w.bindQueues() + rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "warehouse.catalog.stockUpdated") + + w.Eventbus = chn + + return nil + } +} + +func WithLogger(c *cnf.Config) OptionFn { + return func(w *Worker) error { + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err) + os.Exit(1) + } + + logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) + if err != nil { + w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) + os.Exit(1) + } + + w.Logger = logger + + return nil + } +} + +func WithRegistry(c *cnf.Config) OptionFn { + return func(w *Worker) error { + registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0) + if err != nil { + w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) + } + + w.Registry = registry + + go func(w *Worker) { // Fetch Consul KV config and store it in app config + ticker := time.NewTicker(time.Second * 15) + for range ticker.C { + fetchKVConfig(w) + } + }(w) + + return nil + } +} + +// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map +func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go + config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil) + if err != nil || config == nil { + return + } + + kvCnf := bytes.NewBuffer(config.Value) + decoder := json.NewDecoder(kvCnf) + if err := decoder.Decode(&w.Config); err != nil { + return + } +} + +func (w *Worker) Start(while chan struct{}) error { + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-sigint + + w.Shutdown() + + close(while) + }() + + err := w.doWork() + if err != nil { + log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err) + close(while) + } + + w.Logger.Log("Waiting for messages...") + + return nil +} + +func (w *Worker) Shutdown() error { + w.Logger.Log("Worker %s is going down...", w.Config.Base.AppID) + + w.Registry.Unregister() + w.Eventbus.Close() + w.Database.Close() + w.Logger.Log("Gone.") + w.Logger.Close() + + return nil +} + +func (w *Worker) doWork() error { + msgs, err := w.Eventbus.Consume( + w.Config.EventBusQueue, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + w.Logger.Log("Failed to register a consumer: %s", err) + os.Exit(1) + } + + go func() { + catalogSrvc := service.NewCatalogService(w.Database, w.Eventbus, w.Logger) + + for d := range msgs { + go func(d amqp.Delivery) { + w.processMsg(catalogSrvc, d) + }(d) + } + }() + + return nil +} + +func (w *Worker) processMsg(srvc *service.CatalogService, d amqp.Delivery) { + msg, err := rabbitmq.Deserialize(d.Body) + if err != nil { + w.Logger.Log("Deserialization error: %v\n", err) + d.Reject(false) + + return + } + + rnr := &CommandRunner{} + name := fmt.Sprintf("%s", msg["event"]) + data := (msg["data"]).(map[string]interface{}) + // reqID := (data["request_id"]).(string) // FIXME Check input params! + + w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data) + + var ok = false + switch true { // Refactor -> use case for polymorphism + case strings.Contains(name, event.EVENT_WAREHOUSE_STOCK_UPDATED): + w.Logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED) + } + + ok, _ = rnr.run(data) + if ok { + w.Logger.Log("Successful executed message \"%s\"\n", name) + return + } + + if !ok { + w.Logger.Log("Error processing \"%s\": %s (%v)", name, err.Error(), err) + d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...? + + return + } + + w.Logger.Log("Finalized processing: %s", name) + d.Ack(false) +} diff --git a/src/internal/app/config/env.go b/src/pkg/config/config.go similarity index 100% rename from src/internal/app/config/env.go rename to src/pkg/config/config.go diff --git a/src/pkg/database/connect.go b/src/pkg/database/connect.go new file mode 100644 index 0000000..e8c889c --- /dev/null +++ b/src/pkg/database/connect.go @@ -0,0 +1,16 @@ +package database + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" +) + +func Connect(connStr string) (*pgxpool.Pool, error) { + pool, err := pgxpool.New(context.Background(), connStr) + if err != nil { + return nil, err + } + + return pool, nil +} diff --git a/src/pkg/server/config.go b/src/pkg/server/config.go new file mode 100644 index 0000000..fca21d3 --- /dev/null +++ b/src/pkg/server/config.go @@ -0,0 +1,21 @@ +package server + +import ( + "fmt" + "time" +) + +type Config struct { + AppID string + AppName string + NetAddr string + PathPrefix string + + IdleTimeout time.Duration // miliseconds + ReadTimeout time.Duration // miliseconds + WriteTimeout time.Duration // miliseconds +} + +func (c *Config) GetAppFullName() string { + return fmt.Sprintf("%s_%s", c.AppName, c.AppID) +} diff --git a/src/pkg/server/server.go b/src/pkg/server/server.go new file mode 100644 index 0000000..9f76a01 --- /dev/null +++ b/src/pkg/server/server.go @@ -0,0 +1,79 @@ +package server + +import ( + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/gofiber/fiber/v2" + + "git.pbiernat.dev/egommerce/api-entities/http" +) + +type ( + Server struct { + *fiber.App + *Config + + addr string // e.g. "127.0.0.1:8080" + // name string // e.g. "awesome-rest-api" + // kvNmspc string + } + HeaderRequestID struct { + RequestID string `reqHeader:"x-request-id"` + } + OptionFn func(*Server) error + PurgeFn func(*Server) error +) + +func New(conf *Config) *Server { + return &Server{ + App: fiber.New(fiber.Config{ + AppName: conf.AppID, + ServerHeader: conf.AppName, + ReadTimeout: conf.ReadTimeout * time.Millisecond, + WriteTimeout: conf.WriteTimeout * time.Millisecond, + IdleTimeout: conf.IdleTimeout * time.Millisecond, + }), + Config: conf, + addr: conf.NetAddr, + } +} + +func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error { + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-sigint + + if err := prgFn(s); err != nil { + log.Fatalf("Failed to shutdown server. Reason: %v\n", err) + } + + close(while) + }() + + err := s.Listen(s.addr) + if err != nil { + log.Fatalf("Failed to start server: %s. Reason: %v\n", s.Config.AppID, err) + close(while) + } + <-while + + return err +} + +func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) { + var hdr = new(HeaderRequestID) + if err := c.ReqHeaderParser(hdr); err != nil { + return "", err + } + + return hdr.RequestID, nil +} + +func (s *Server) Error(c *fiber.Ctx, code int, msg string) error { + return c.Status(code).JSON(http.ErrorResponse{Error: msg}) +}