From 751e1c70e6a730f89010ac398e4013bb76f62564 Mon Sep 17 00:00:00 2001 From: Piotr Biernat Date: Tue, 4 Apr 2023 00:57:11 +0200 Subject: [PATCH] Refactoring - added Functional Option pattern --- src/cmd/migrate/main.go | 33 +- src/cmd/server/main.go | 146 ++++---- src/cmd/worker/main.go | 75 ++-- src/go.mod | 2 +- src/go.sum | 4 + src/internal/app/log.go | 20 -- src/internal/app/server/basket_handler.go | 31 +- src/internal/app/server/config.go | 32 -- src/internal/app/server/health_handler.go | 11 +- src/internal/app/server/middleware.go | 33 ++ src/internal/app/server/router.go | 35 +- src/internal/app/server/server.go | 319 +++++++++++------- src/internal/app/service/basket.go | 6 +- src/internal/common/config.go | 84 +++++ .../config/env.go => pkg/config/config.go} | 2 +- src/{internal/app => pkg}/database/connect.go | 0 src/pkg/server/config.go | 21 ++ src/pkg/server/server.go | 101 ++++++ 18 files changed, 599 insertions(+), 356 deletions(-) delete mode 100644 src/internal/app/log.go delete mode 100644 src/internal/app/server/config.go create mode 100644 src/internal/app/server/middleware.go create mode 100644 src/internal/common/config.go rename src/{internal/app/config/env.go => pkg/config/config.go} (72%) rename src/{internal/app => pkg}/database/connect.go (100%) create mode 100644 src/pkg/server/config.go create mode 100644 src/pkg/server/server.go diff --git a/src/cmd/migrate/main.go b/src/cmd/migrate/main.go index fecaf60..8894ff7 100644 --- a/src/cmd/migrate/main.go +++ b/src/cmd/migrate/main.go @@ -6,10 +6,13 @@ import ( "log" "os" - "git.pbiernat.dev/egommerce/basket-service/internal/app/config" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "github.com/go-pg/migrations/v8" "github.com/go-pg/pg/v10" + + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" + + "git.pbiernat.dev/egommerce/basket-service/internal/common" + cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" ) const ( @@ -32,16 +35,24 @@ Usage: ` func main() { - if config.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", config.ErrLoadingEnvs) + if cnf.ErrLoadingEnvs != nil { + log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs) } - // dbURL := config.GetEnv("DATABASE_URL", defDbURL) - loggerAddr := config.GetEnv("LOGGER_ADDR", defLoggerAddr) - mTblName := config.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName) + c := common.NewConfig() - logHost, logPort := fluentd.ParseAddr(loggerAddr) - logger := fluentd.NewLogger(defAppName, logHost, logPort) + // dbURL := cnf.GetEnv("DATABASE_URL", defDbURL) + mTblName := cnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName) + + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err) + } + + logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) + if err != nil { + log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err) + } defer logger.Close() flag.Usage = usage @@ -56,8 +67,8 @@ func main() { mig := migrations.NewCollection() mig.SetTableName(mTblName) - err := mig.DiscoverSQLMigrations("./migrations") - if err != nil { + + if err := mig.DiscoverSQLMigrations("./migrations"); err != nil { logger.Log("migration dicovery error: %#v", err) } diff --git a/src/cmd/server/main.go b/src/cmd/server/main.go index 83d60ff..c078125 100644 --- a/src/cmd/server/main.go +++ b/src/cmd/server/main.go @@ -1,97 +1,83 @@ package main import ( + "fmt" "log" "os" - "strconv" - "git.pbiernat.dev/egommerce/basket-service/internal/app/config" - "git.pbiernat.dev/egommerce/basket-service/internal/app/database" + cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" + "git.pbiernat.dev/egommerce/basket-service/internal/app/server" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" - amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" - "github.com/go-redis/redis/v8" -) - -const ( - defAppName = "basket-svc" - defAppDomain = "basket-svc" - defPathPrefix = "/basket" - defNetAddr = ":80" - defLoggerAddr = "api-logger:24224" - defRegistryAddr = "api-registry:8500" - defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" - defCacheAddr = "api-cache:6379" - defCachePassword = "12345678" - defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" - defEventBusURL = "amqp://guest:guest@api-eventbus:5672" - ebEventsExchange = "api-events" - ebEventsQueue = "basket-svc" - defKVNmspc = "dev.egommerce/service/basket-svc" + "git.pbiernat.dev/egommerce/basket-service/internal/common" ) func main() { - if config.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", config.ErrLoadingEnvs) + if cnf.ErrLoadingEnvs != nil { + log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs) } - c := new(server.Config) - c.AppID, _ = os.Hostname() - c.AppName = config.GetEnv("APP_NAME", defAppName) - c.AppDomain = config.GetEnv("APP_DOMAIN", defAppDomain) - c.PathPrefix = config.GetEnv("APP_PATH_PREFIX", defPathPrefix) - c.NetAddr = config.GetEnv("SERVER_ADDR", defNetAddr) - c.Port, _ = strconv.Atoi(c.NetAddr[1:]) - c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr) - c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr) - c.DbURL = config.GetEnv("DATABASE_URL", defDbURL) - c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr) - c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword) - c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL) - c.EventBusExchange = ebEventsExchange - c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc) - - logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) - logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) - defer logger.Close() - - // db conn - dbConn, err := database.Connect(c.DbURL) - if err != nil { // fixme: add wait-for-db... - logger.Log("Failed to connect to Database server: %v\n", err) - os.Exit(1) - } - defer dbConn.Close() - - // redis conn - redis := redis.NewClient(&redis.Options{ - Addr: c.CacheAddr, - Password: c.CachePassword, - DB: 0, - }) - defer redis.Close() - - // eventbus conn - ebConn, ebCh, err := amqp.Open(c.EventBusURL) - if err != nil { - logger.Log("Failed to connect to EventBus server: %v\n", err) - os.Exit(1) - } - defer ebCh.Close() - defer amqp.Close(ebConn) - - err = amqp.NewExchange(ebCh, c.EventBusExchange) - if err != nil { - logger.Log("Failed to declare EventBus exchange: %v\n", err) - os.Exit(1) - } - - // start server - srv := server.NewServer(c, logger, dbConn, redis, ebCh) + c := common.NewConfig() + srv := server.New( + c, + server.WithCache(c), + server.WithDatabase(c), + server.WithEventbus(c), + server.WithLogger(c), + server.WithRegistry(c), + ) forever := make(chan struct{}) - srv.StartWithGracefulShutdown(forever) + + err := srv.Base.Start(forever, srv.Shutdown()) + // server.SetupMiddleware(srv) + // server.SetupRouter(srv) + <-forever - // os.Exit(1) + if err != nil { + log.Fatalf("Failed to start server. Reason: %v\n", err) + os.Exit(1) + } + + fmt.Println("Done.") + os.Exit(0) + + // c.AppDomain = cnf.GetEnv("APP_DOMAIN", defAppDomain) + // c.Port, _ = strconv.Atoi(c.NetAddr[1:]) + // c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc) + + // logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) + // logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) + // defer logger.Close() + + // db conn + // dbConn, err := database.Connect(c.DbURL) + // if err != nil { // fixme: add wait-for-db... + // logger.Log("Failed to connect to Database server: %v\n", err) + // os.Exit(1) + // } + // defer dbConn.Close() + + // redis conn + // redis := redis.NewClient(&redis.Options{ + // Addr: c.CacheAddr, + // Password: c.CachePassword, + // DB: 0, + // }) + // defer redis.Close() + + // eventbus conn + // ebConn, ebCh, err := amqp.Open(c.EventBusURL) + // if err != nil { + // logger.Log("Failed to connect to EventBus server: %v\n", err) + // os.Exit(1) + // } + // defer ebCh.Close() + // defer amqp.Close(ebConn) + + // err = amqp.NewExchange(ebCh, c.EventBusExchange) + // if err != nil { + // logger.Log("Failed to declare EventBus exchange: %v\n", err) + // os.Exit(1) + // } } diff --git a/src/cmd/worker/main.go b/src/cmd/worker/main.go index e9d9fd3..8924267 100644 --- a/src/cmd/worker/main.go +++ b/src/cmd/worker/main.go @@ -1,9 +1,6 @@ package main import ( - "bytes" - "encoding/json" - "errors" "fmt" "log" "os" @@ -12,17 +9,19 @@ import ( "syscall" "time" - "git.pbiernat.dev/egommerce/basket-service/internal/app/config" - "git.pbiernat.dev/egommerce/basket-service/internal/app/database" - "git.pbiernat.dev/egommerce/basket-service/internal/app/event" - "git.pbiernat.dev/egommerce/basket-service/internal/app/server" - "git.pbiernat.dev/egommerce/basket-service/internal/app/service" - "git.pbiernat.dev/egommerce/basket-service/internal/app/ui" - discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul" "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" "github.com/go-redis/redis/v8" "github.com/streadway/amqp" + + "git.pbiernat.dev/egommerce/go-api-pkg/consul" + + "git.pbiernat.dev/egommerce/basket-service/internal/app/event" + "git.pbiernat.dev/egommerce/basket-service/internal/app/service" + "git.pbiernat.dev/egommerce/basket-service/internal/app/ui" + "git.pbiernat.dev/egommerce/basket-service/internal/common" + "git.pbiernat.dev/egommerce/basket-service/pkg/config" + "git.pbiernat.dev/egommerce/basket-service/pkg/database" ) const ( @@ -44,34 +43,30 @@ func main() { log.Panicln("Error loading .env file", config.ErrLoadingEnvs) } - c := new(server.Config) - c.AppID, _ = os.Hostname() - c.AppName = config.GetEnv("APP_NAME", defAppName) - c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr) - c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr) - c.DbURL = config.GetEnv("DATABASE_URL", defDbURL) - c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr) - c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword) - c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL) - c.EventBusExchange = ebEventsExchange - c.EventBusQueue = ebEventsQueue - c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc) + c := common.NewConfig() + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err) + } - logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) - logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) + logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) + if err != nil { + log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err) + } defer logger.Close() - consul, err := discovery.NewService(c.RegistryAddr, c.AppID, c.AppName, c.AppID, "", "", 0) + // consul, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0) + registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0) if err != nil { logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) } - go func(consul *discovery.Service) { + go func(consul *consul.Service) { ticker := time.NewTicker(time.Second * 15) for range ticker.C { updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go } - }(consul) + }(registry) // db conn dbConn, err := database.Connect(c.DbURL) @@ -202,21 +197,21 @@ func main() { <-forever } -func updateKVConfig(s *discovery.Service, oldCnf *server.Config) error { // FIXME: duplicated in internal/app/server/server.go - data, _, err := s.KV().Get(oldCnf.KVNamespace, nil) - if err != nil { - return err - } +func updateKVConfig(s *consul.Service, oldCnf *common.Config) error { // FIXME: duplicated in internal/app/server/server.go + // data, _, err := s.KV().Get(oldCnf.KVNamespace, nil) + // if err != nil { + // return err + // } - if data == nil { - return errors.New("empty KV config data") - } + // if data == nil { + // return errors.New("empty KV config data") + // } - buf := bytes.NewBuffer(data.Value) - decoder := json.NewDecoder(buf) - if err := decoder.Decode(oldCnf); err != nil { - return err - } + // buf := bytes.NewBuffer(data.Value) + // decoder := json.NewDecoder(buf) + // if err := decoder.Decode(oldCnf); err != nil { + // return err + // } return nil } diff --git a/src/go.mod b/src/go.mod index 4bb84ca..ee0ac61 100644 --- a/src/go.mod +++ b/src/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( git.pbiernat.dev/egommerce/api-entities v0.0.26 - git.pbiernat.dev/egommerce/go-api-pkg v0.0.136 + git.pbiernat.dev/egommerce/go-api-pkg v0.0.150 github.com/georgysavva/scany/v2 v2.0.0 github.com/go-pg/migrations/v8 v8.1.0 github.com/go-pg/pg/v10 v10.10.7 diff --git a/src/go.sum b/src/go.sum index af6828c..a0201ad 100644 --- a/src/go.sum +++ b/src/go.sum @@ -3,6 +3,10 @@ git.pbiernat.dev/egommerce/api-entities v0.0.26 h1:Avz02GINwuYWOjw1fmZIJ3QgGEIz3 git.pbiernat.dev/egommerce/api-entities v0.0.26/go.mod h1:+BXvUcr6Cr6QNpJsW8BUfe1vVILdWDADNE0e3u0lNvU= git.pbiernat.dev/egommerce/go-api-pkg v0.0.136 h1:SzJRAkqJKdng/3d0V7o/R0yGh7QaZynPBn/P++on9RA= git.pbiernat.dev/egommerce/go-api-pkg v0.0.136/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.149 h1:7K4z/XUMAPrnvOPcFfkeeNCpuc5IHoC1Pd68Ht7q9Ts= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.149/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.150 h1:DMM3Kxb6HNw4BExA7Ss7P9ivs+TIeO9DxjfHPKeWrSg= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.150/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= diff --git a/src/internal/app/log.go b/src/internal/app/log.go deleted file mode 100644 index 1728b1a..0000000 --- a/src/internal/app/log.go +++ /dev/null @@ -1,20 +0,0 @@ -package app - -import ( - "log" -) - -const AppName = "basket-svc" - -func Panic(v ...any) { - log.Panicln(AppName+":", v) -} - -func Panicf(format string, v ...any) { - log.Panicf(AppName+": "+format, v...) -} - -func Panicln(v ...any) { - v = append([]any{AppName + ":"}, v...) - log.Panicln(v...) -} diff --git a/src/internal/app/server/basket_handler.go b/src/internal/app/server/basket_handler.go index f4f3e6a..10905df 100644 --- a/src/internal/app/server/basket_handler.go +++ b/src/internal/app/server/basket_handler.go @@ -1,30 +1,31 @@ package server +// REFACTOR: APP DEDICATED CODE import ( "context" "time" - def "git.pbiernat.dev/egommerce/api-entities/http" + "git.pbiernat.dev/egommerce/api-entities/http" "git.pbiernat.dev/egommerce/basket-service/internal/app/service" "git.pbiernat.dev/egommerce/basket-service/internal/app/ui" "github.com/gofiber/fiber/v2" ) func (s *Server) GetBasketHandler(c *fiber.Ctx) error { - req := new(def.GetBasketRequest) + req := new(http.GetBasketRequest) if err := c.BodyParser(req); err != nil { - return s.Error400(c, err.Error()) + return s.Base.Error400(c, err.Error()) } basketID := req.BasketID - basketSrv := service.NewBasketService(s.db, s.cache, s.ebCh, s.log) + basketSrv := service.NewBasketService(s.Database, s.Cache, s.Eventbus, s.Logger) ctx := context.Background() basket, err := basketSrv.FetchFromDB(ctx, basketID) if err != nil { - return s.Error400(c, "Failed to retrieve basket") + return s.Base.Error400(c, "Failed to retrieve basket") } - res := &def.GetBasketResponse{ + res := &http.GetBasketResponse{ ID: basket.ID, State: basket.State, CreatedAt: time.Duration(basket.CreatedAt.Time.Unix()), @@ -38,16 +39,16 @@ func (s *Server) GetBasketHandler(c *fiber.Ctx) error { func (s *Server) GetBasketItemsHandler(c *fiber.Ctx) error { basketID := c.Params("basketId", "") - basketSrv := service.NewBasketService(s.db, s.cache, s.ebCh, s.log) + basketSrv := service.NewBasketService(s.Database, s.Cache, s.Eventbus, s.Logger) ctx := context.Background() items, err := basketSrv.FetchItems(ctx, basketID) if err != nil { - return s.Error400(c, "Failed to retrieve basket items") + return s.Base.Error400(c, "Failed to retrieve basket items") } - var res []*def.GetBasketItemsResponse // FIXME + var res []*http.GetBasketItemsResponse for _, item := range items { - resItem := &def.GetBasketItemsResponse{ + resItem := &http.GetBasketItemsResponse{ ID: item.ID, BasketID: item.BasketID, ProductID: item.ProductID, @@ -65,17 +66,17 @@ func (s *Server) GetBasketItemsHandler(c *fiber.Ctx) error { } func (s *Server) CheckoutHandler(c *fiber.Ctx) error { - reqID, _ := s.GetRequestID(c) - req := new(def.BasketCheckoutRequest) + reqID, _ := s.Base.GetRequestID(c) + req := new(http.BasketCheckoutRequest) if err := c.BodyParser(req); err != nil { - return s.Error400(c, err.Error()) + return s.Base.Error400(c, err.Error()) } basketID := req.BasketID - basketSrv := service.NewBasketService(s.db, s.cache, s.ebCh, s.log) + basketSrv := service.NewBasketService(s.Database, s.Cache, s.Eventbus, s.Logger) res, err := ui.CheckoutBasket(basketSrv, basketID, reqID) if err != nil { - return s.Error400(c, "Failed to create order") + return s.Base.Error400(c, "Failed to create order") } return c.JSON(res) diff --git a/src/internal/app/server/config.go b/src/internal/app/server/config.go deleted file mode 100644 index 1940c7b..0000000 --- a/src/internal/app/server/config.go +++ /dev/null @@ -1,32 +0,0 @@ -package server - -import "fmt" - -type Config struct { - AppID string - AppName string - AppDomain string - PathPrefix string - NetAddr string - Port int - RegistryAddr string - KVNamespace string - - LoggerAddr string `json:"logger_addr"` - DbURL string `json:"db_url"` - CacheAddr string `json:"cache_addr"` - CachePassword string `json:"cache_password"` - MongoDbUrl string `json:"mongodb_url"` - EventBusURL string `json:"eventbus_url"` - EventBusExchange string `json:"eventbus_exchange"` - EventBusQueue string `json:"eventbus_queue"` - - HttpReadTimeout int `json:"http_read_timeout"` - HttpWriteTimeout int `json:"http_write_timeout"` - HttpIdleTimeout int `json:"http_idle_timeout"` - // Fields with json mapping are available trough ConsulKV -} - -func (c *Config) GetAppFullName() string { - return fmt.Sprintf("%s_%s", c.AppName, c.AppID) -} diff --git a/src/internal/app/server/health_handler.go b/src/internal/app/server/health_handler.go index 73d756e..8c0a64e 100644 --- a/src/internal/app/server/health_handler.go +++ b/src/internal/app/server/health_handler.go @@ -1,17 +1,18 @@ package server +// REFACTOR: UNIVERSAL SERVER CODE import ( "github.com/gofiber/fiber/v2" - def "git.pbiernat.dev/egommerce/api-entities/http" + "git.pbiernat.dev/egommerce/api-entities/http" ) -func (s *Server) HealthHandler(c *fiber.Ctx) error { - return c.JSON(&def.HealthResponse{ - Status: "OK", +func (s *Server) HealthHandler(c *fiber.Ctx) error { // TODO add necessary logic + return c.JSON(&http.HealthResponse{ + Status: "OKa", }) } func (s *Server) ConfigHandler(c *fiber.Ctx) error { - return c.JSON(s.conf) + return c.JSON(s.Config) } diff --git a/src/internal/app/server/middleware.go b/src/internal/app/server/middleware.go new file mode 100644 index 0000000..74e2cda --- /dev/null +++ b/src/internal/app/server/middleware.go @@ -0,0 +1,33 @@ +package server + +import ( + "strings" + + "github.com/gofiber/fiber/v2" + + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" +) + +// "github.com/gofiber/fiber/v2" +// "github.com/gofiber/fiber/v2/middleware/cors" + +func SetupMiddleware(s *Server) { + s.Base.Use(defaultCORS) + s.Base.Use(LoggingMiddleware(s.Logger)) +} + +func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error { + return func(c *fiber.Ctx) error { + path := string(c.Request().URI().Path()) + if strings.Contains(path, "/health") { + return c.Next() + } + + log.Log("Request: %s, remote: %s, via: %s", + c.Request().URI().String(), + c.Context().RemoteIP().String(), + string(c.Context().UserAgent())) + + return c.Next() + } +} diff --git a/src/internal/app/server/router.go b/src/internal/app/server/router.go index 38cb6aa..df12031 100644 --- a/src/internal/app/server/router.go +++ b/src/internal/app/server/router.go @@ -1,10 +1,9 @@ package server +// REFACTOR: APP DEDICATED CODE import ( "net/http" - "strings" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" ) @@ -18,24 +17,19 @@ var ( }) ) -func SetupRoutes(s *Server) { - s.App.Options("*", defaultCORS) +func SetupRouter(s *Server) { + s.Base.Options("*", defaultCORS) - s.App.Get("/health", s.HealthHandler) - s.App.Get("/config", s.ConfigHandler) + s.Base.Get("/health", s.HealthHandler) + s.Base.Get("/config", s.ConfigHandler) - api := s.App.Group("/api") + api := s.Base.Group("/api") v1 := api.Group("/v1") v1.Get("/basket", s.GetBasketHandler) v1.Get("/basket/:basketId/items", s.GetBasketItemsHandler) v1.Post("/checkout", s.CheckoutHandler) } -func SetupMiddlewares(s *Server) { - s.App.Use(defaultCORS) - s.App.Use(LoggingMiddleware(s.log)) -} - func CORSPreflightMiddleware(c *fiber.Ctx) error { if string(c.Request().Header.Method()) == http.MethodOptions { c.Response().SetStatusCode(http.StatusOK) @@ -44,20 +38,3 @@ func CORSPreflightMiddleware(c *fiber.Ctx) error { return c.Next() } - -// Middlewares -func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error { - return func(c *fiber.Ctx) error { - path := string(c.Request().URI().Path()) - if strings.Contains(path, "/health") { - return c.Next() - } - - log.Log("Request: %s, remote: %s, via: %s", - c.Request().URI().String(), - c.Context().RemoteIP().String(), - string(c.Context().UserAgent())) - - return c.Next() - } -} diff --git a/src/internal/app/server/server.go b/src/internal/app/server/server.go index 9fac46c..a7aa3c2 100644 --- a/src/internal/app/server/server.go +++ b/src/internal/app/server/server.go @@ -4,177 +4,258 @@ import ( "bytes" "context" "encoding/json" + "fmt" + "log" "os" - "os/signal" - "syscall" + "strconv" "time" "github.com/go-redis/redis/v8" - "github.com/gofiber/fiber/v2" "github.com/jackc/pgx/v5/pgxpool" "github.com/streadway/amqp" - def "git.pbiernat.dev/egommerce/api-entities/http" - discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul" + "git.pbiernat.dev/egommerce/go-api-pkg/consul" "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" + + db "git.pbiernat.dev/egommerce/basket-service/pkg/database" + srv "git.pbiernat.dev/egommerce/basket-service/pkg/server" + + "git.pbiernat.dev/egommerce/basket-service/internal/common" ) -type Server struct { - *fiber.App - conf *Config - log *fluentd.Logger - db *pgxpool.Pool - cache *redis.Client - ebCh *amqp.Channel - discovery *discovery.Service - name string - addr string - kvNmspc string -} +type ( + Server struct { + Base *srv.Server + Config *common.Config -type Headers struct { - RequestID string `reqHeader:"x-request-id"` -} + Cache *redis.Client + Database *pgxpool.Pool + Eventbus *amqp.Channel + Logger *fluentd.Logger + Registry *consul.Service -func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client, ebCh *amqp.Channel) *Server { - consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.PathPrefix, conf.Port) - if err != nil { - logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err) + kvNmspc string } - logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address) - err = consul.Register() - if err != nil { - logger.Log("register error: %v", err) + OptionFn func(*Server) error +) + +func New(c *common.Config, opts ...OptionFn) *Server { + svr := &Server{ + Base: srv.New(c.Base), } - cnf := fiber.Config{ - AppName: conf.AppName, - ServerHeader: conf.AppName, - ReadTimeout: time.Millisecond * 50, - WriteTimeout: time.Millisecond * 50, - IdleTimeout: time.Millisecond * 50, - } - s := &Server{ - fiber.New(cnf), - conf, - logger, - db, - cache, - ebCh, - consul, - conf.AppName, - conf.NetAddr, - conf.KVNamespace, - } - - go func(s *Server) { // Consul KV updater - interval := time.Second * 15 - ticker := time.NewTicker(interval) - for range ticker.C { - s.updateKVConfig() + for _, opt := range opts { + if err := opt(svr); err != nil { + log.Fatalf("Failed to start HTTP Server. Err: %v\n", err) } - }(s) + } - go func(s *Server) { // Server metadata cache updater - ticker := time.NewTicker(time.Second * 5) - for range ticker.C { - s.cacheMetadata() - } - }(s) + SetupMiddleware(svr) + SetupRouter(svr) - SetupMiddlewares(s) - SetupRoutes(s) - - return s + return svr } -// func (s *Server) Start() { -// err := s.Listen(s.addr) -// s.log.Log("Starting error: %v", err) +func WithCache(c *common.Config) OptionFn { + redis := redis.NewClient(&redis.Options{ + Addr: c.CacheAddr, + Password: c.CachePassword, + DB: 0, + }) + + return func(s *Server) error { + s.Cache = redis + + return nil + } + // defer redis.Close() +} + +func WithDatabase(c *common.Config) OptionFn { + dbConn, err := db.Connect(c.DbURL) + if err != nil { + log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", c.DbURL, err) + os.Exit(1) + } + + return func(s *Server) error { + s.Database = dbConn + + return nil + } + // defer dbConn.Close() +} + +func WithEventbus(c *common.Config) OptionFn { + conn, err := amqp.Dial(c.EventBusURL) + if err != nil { + log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", c.EventBusURL, err) + } + + chn, err := conn.Channel() + if err != nil { + log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err) + } + + return func(s *Server) error { + s.Eventbus = chn + + return nil + } + + // defer ebCh.Close() + // defer amqp.Close(ebConn) +} + +func WithLogger(c *common.Config) OptionFn { + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err) + } + + logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) + if err != nil { + log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) + } + + return func(s *Server) error { + s.Logger = logger + + return nil + } + // defer logger.Close() +} + +func WithRegistry(c *common.Config) OptionFn { + // fmt.Printf("WithRegistry constructor: config: %v", c.Base) + port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which now will cause error + registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port) + if err != nil { + log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err) + } + + err = registry.Register() + if err != nil { + log.Fatalf("Failed to register in the Consul service. Err: %v", err) + } + + return func(s *Server) error { + s.Registry = registry + + go func(s *Server) { // Consul KV updater + ticker := time.NewTicker(time.Second * 15) + for range ticker.C { + s.updateKVConfig() + } + }(s) + + go func(s *Server) { // Server metadata cache updater + ticker := time.NewTicker(time.Second * 5) + for range ticker.C { + s.cacheMetadata() + } + }(s) + + return nil + } +} + +// REFACTOR IN PROGRESS +// func (s *Server) Shutdown() error { +// s.Logger.Log("Server is going down... Unregistering service: %s", s.Discovery.GetID()) +// s.Discovery.Unregister() +// s.clearMetadataCache() + +// s.Cache.Close() +// s.Database.Close() +// s.Eventbus.Close() +// s.Logger.Close() + +// return nil // } -func (s *Server) StartWithGracefulShutdown(forever chan struct{}) { - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-sigint +func (s *Server) Shutdown() srv.PurgeFn { + return func(srv *srv.Server) error { + fmt.Printf("%v", s.Base) + // s.Logger.Log("Server is going down... Unregistering service: %s", s.Base.AppID) + s.Logger.Log("Server is going down... Unregistering service...") - if err := s.gracefulShutdown(); err != nil { - s.log.Log("Server is not shutting down! Reason: %v", err) - } + s.Registry.Unregister() + s.clearMetadataCache() + s.Eventbus.Close() + s.Database.Close() + s.Logger.Close() - close(forever) - }() - - if err := s.Listen(s.addr); err != nil { - s.log.Log("Server is not running! Reason: %v", err) + return s.Base.Shutdown() } - - <-forever } -func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) { - var hdr = new(Headers) - if err := c.ReqHeaderParser(hdr); err != nil { - return "", err - } +// END: REFACTOR IN PROGRESS - return hdr.RequestID, nil -} - -func (s *Server) Error400(c *fiber.Ctx, msg string) error { - return c.Status(fiber.StatusBadRequest).JSON(&def.ErrorResponse{Error: msg}) -} - -func (s *Server) Error404(c *fiber.Ctx, msg string) error { - return c.Status(fiber.StatusNotFound).JSON(&def.ErrorResponse{Error: msg}) -} - -func (s *Server) updateKVConfig() { // FIXME: duplicated in cmd/worker/main.go - config, _, err := s.discovery.KV().Get(s.kvNmspc, nil) +// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map +func (s *Server) updateKVConfig() { + config, _, err := s.Registry.KV().Get(s.kvNmspc, nil) if err != nil || config == nil { return } kvCnf := bytes.NewBuffer(config.Value) decoder := json.NewDecoder(kvCnf) - if err := decoder.Decode(&s.conf); err != nil { + if err := decoder.Decode(&s.Config); err != nil { return } } func (s *Server) cacheMetadata() { - ctx := context.Background() - key, address := s.getMetadataIPsKey(), s.conf.AppID + ctx := context.TODO() + key, address := s.getMetadataIPsKey(), s.Base.Config.AppID - pos := s.cache.LPos(ctx, key, address, redis.LPosArgs{}).Val() + pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val() if pos >= 0 { - s.cache.LRem(ctx, key, 0, address) + s.Cache.LRem(ctx, key, 0, address) } - s.cache.LPush(ctx, key, address).Err() + s.Cache.LPush(ctx, key, address).Err() } func (s *Server) clearMetadataCache() { - ctx := context.Background() - key, address := s.getMetadataIPsKey(), s.conf.AppID + ctx := context.TODO() + fmt.Printf("metadata: %v", s.Config.Base) + key, address := s.getMetadataIPsKey(), s.Config.Base.AppID - s.cache.LRem(ctx, key, 0, address) + s.Cache.LRem(ctx, key, 0, address) } func (s *Server) getMetadataIPsKey() string { - return "internal__" + s.conf.AppName + "__ips" + return "internal__" + s.Base.Config.AppName + "__ips" } -func (s *Server) gracefulShutdown() error { - s.log.Log("Server is going down... Unregistering service: %s", s.discovery.GetID()) - s.discovery.Unregister() - s.clearMetadataCache() +// +// +// +//// OLD CODE TO BE REMOVED +// func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client, ebCh *amqp.Channel) *Server { - s.ebCh.Close() - s.db.Close() - s.log.Close() +// cnf := fiber.Config{ +// AppName: conf.AppName, +// ServerHeader: conf.AppName, +// ReadTimeout: conf.ReadTimeout * time.Millisecond, +// WriteTimeout: conf.WriteTimeout * time.Millisecond, +// IdleTimeout: conf.IdleTimeout * time.Millisecond, +// } +// s := &Server{ +// fiber.New(cnf), +// conf, +// logger, +// db, +// cache, +// ebCh, +// consul, +// conf.AppName, +// conf.NetAddr, +// conf.KVNamespace, +// } - return s.Shutdown() -} +// return s +// } diff --git a/src/internal/app/service/basket.go b/src/internal/app/service/basket.go index 4c26160..12a88fb 100644 --- a/src/internal/app/service/basket.go +++ b/src/internal/app/service/basket.go @@ -15,7 +15,7 @@ import ( ) const ( - SERVICE_USER_AGENT = "basket-httpclient" + ServiceUserAgent = "basket-httpclient" ) type BasketService struct { @@ -82,7 +82,7 @@ func (s *BasketService) FetchItem(ctx context.Context, basketID string, productI func (s *BasketService) AddItem(ctx context.Context, itemID int, basketID string, qty int) error { var price float64 = 0 - pricingAPI := api.NewPricingAPI(SERVICE_USER_AGENT, s.redis) + pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis) productPrice, err := pricingAPI.GetProductPrice(itemID) if err == nil { @@ -112,7 +112,7 @@ func (s *BasketService) RemoveItem(ctx context.Context, itemID int, basketID str func (s *BasketService) UpdateItem(ctx context.Context, item *model.BasketItemModel, qty int) error { var price float64 = 0 - pricingAPI := api.NewPricingAPI(SERVICE_USER_AGENT, s.redis) + pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis) productPrice, err := pricingAPI.GetProductPrice(item.ProductID) if err == nil { diff --git a/src/internal/common/config.go b/src/internal/common/config.go new file mode 100644 index 0000000..ca27f70 --- /dev/null +++ b/src/internal/common/config.go @@ -0,0 +1,84 @@ +package common + +import ( + "fmt" + "os" + + cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" + srv "git.pbiernat.dev/egommerce/basket-service/pkg/server" +) + +const ( + // defAppDomain = "basket-svc" + defAppName = "basket-svc" + defCacheAddr = "api-cache:6379" + defCachePassword = "12345678" + defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" + defEventBusURL = "amqp://guest:guest@api-eventbus:56721" + defKVNmspc = "dev.egommerce/service/basket-svc" + defLoggerAddr = "api-logger:24224" + defNetAddr = ":80" + defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" + defPathPrefix = "/basket" + defRegistryAddr = "api-registry:8500" + defEbEventsExchange = "api-events" + defEbEventsQueue = "basket-svc" +) + +type Config struct { + Base *srv.Config + // AppID string + // AppName string + // NetAddr string + // PathPrefix string + + // IdleTimeout time.Duration `json:"idle_timeout"` // miliseconds + // ReadTimeout time.Duration `json:"read_timeout"` // miliseconds + // WriteTimeout time.Duration `json:"write_timeout"` // miliseconds + + DbURL string `json:"db_url"` + CacheAddr string `json:"cache_addr"` + CachePassword string `json:"cache_password"` + EventBusExchange string `json:"eventbus_exchange"` + EventBusQueue string `json:"eventbus_queue"` + EventBusURL string `json:"eventbus_url"` + LoggerAddr string `json:"logger_addr"` + RegistryAddr string + + // Fields with JSON mappings are available through Consul KV storage + + // Port int + // KVNamespace string + // MongoDbUrl string `json:"mongodb_url"` + // HttpReadTimeout int `json:"http_read_timeout"` + // HttpWriteTimeout int `json:"http_write_timeout"` + // HttpIdleTimeout int `json:"http_idle_timeout"` +} + +func NewConfig() *Config { + c := new(Config) + c.Base = new(srv.Config) + + c.Base.AppID, _ = os.Hostname() + c.Base.AppName = cnf.GetEnv("APP_NAME", defAppName) + c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr) + c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix) + + c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr) + c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword) + c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL) + c.EventBusExchange = defEbEventsExchange + c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL) + c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr) + c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr) + + return c +} + +func (c *Config) GetAppFullName() string { + return fmt.Sprintf("%s_%s", c.Base.AppName, c.Base.AppID) // @TODO check if Base prop can be private +} + +func (c *Config) GetListenAddr() string { + return "" // @TODO: Implement me! +} diff --git a/src/internal/app/config/env.go b/src/pkg/config/config.go similarity index 72% rename from src/internal/app/config/env.go rename to src/pkg/config/config.go index 9dbe349..67c6b43 100644 --- a/src/internal/app/config/env.go +++ b/src/pkg/config/config.go @@ -12,7 +12,7 @@ func init() { ErrLoadingEnvs = godotenv.Load() } -func GetEnv(name string, defVal string) string { // FIXME defVal and return types +func GetEnv(name string, defVal string) string { env := os.Getenv(name) if env == "" { return defVal diff --git a/src/internal/app/database/connect.go b/src/pkg/database/connect.go similarity index 100% rename from src/internal/app/database/connect.go rename to src/pkg/database/connect.go diff --git a/src/pkg/server/config.go b/src/pkg/server/config.go new file mode 100644 index 0000000..fca21d3 --- /dev/null +++ b/src/pkg/server/config.go @@ -0,0 +1,21 @@ +package server + +import ( + "fmt" + "time" +) + +type Config struct { + AppID string + AppName string + NetAddr string + PathPrefix string + + IdleTimeout time.Duration // miliseconds + ReadTimeout time.Duration // miliseconds + WriteTimeout time.Duration // miliseconds +} + +func (c *Config) GetAppFullName() string { + return fmt.Sprintf("%s_%s", c.AppName, c.AppID) +} diff --git a/src/pkg/server/server.go b/src/pkg/server/server.go new file mode 100644 index 0000000..10c7029 --- /dev/null +++ b/src/pkg/server/server.go @@ -0,0 +1,101 @@ +package server + +import ( + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/gofiber/fiber/v2" + + "git.pbiernat.dev/egommerce/api-entities/http" +) + +type ( + HeaderRequestID struct { + RequestID string `reqHeader:"x-request-id"` + } + Server struct { + *fiber.App + *Config + + addr string // e.g. "127.0.0.1:8080" + + // name string // e.g. "awesome-rest-api" + // kvNmspc string + // cache *redis.Client + // db *pgxpool.Pool + // discovery *discovery.Service + // ebCh *amqp.Channel + // log *fluentd.Logger + } + PurgeFn func(*Server) error +) + +func New(conf *Config) *Server { + return &Server{ + App: fiber.New(fiber.Config{ + AppName: conf.AppID, + ServerHeader: conf.AppName, + ReadTimeout: conf.ReadTimeout * time.Millisecond, + WriteTimeout: conf.WriteTimeout * time.Millisecond, + IdleTimeout: conf.IdleTimeout * time.Millisecond, + }), + addr: conf.NetAddr, + } +} + +func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) { + var hdr = new(HeaderRequestID) + if err := c.ReqHeaderParser(hdr); err != nil { + return "", err + } + + return hdr.RequestID, nil +} + +// @Refactor make single func with error message and optional http status code... +func (s *Server) Error400(c *fiber.Ctx, msg string) error { + return c.Status(fiber.StatusBadRequest).JSON(http.ErrorResponse{msg}) + // test with &(reference) before http.ErrorMessage, but probably it's gonna be erroneous +} + +func (s *Server) Error401(c *fiber.Ctx, msg string) error { + return c.Status(fiber.StatusUnauthorized).JSON(http.ErrorResponse{msg}) +} + +func (s *Server) Error403(c *fiber.Ctx, msg string) error { + return c.Status(fiber.StatusForbidden).JSON(http.ErrorResponse{msg}) +} + +func (s *Server) Error404(c *fiber.Ctx, msg string) error { + return c.Status(fiber.StatusNotFound).JSON(http.ErrorResponse{msg}) +} + +// @EndRefactor + +func (s *Server) Start(forever chan struct{}, prgFn PurgeFn) error { + go func() { + fmt.Println("Starting...") + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-sigint + + fmt.Println("shutting down: after term signal.") + if err := prgFn(s); err != nil { + log.Fatalf("Failed to shutdown server. Reason: %v\n", err) + } + + close(forever) + }() + + err := s.Listen(s.addr) + <-forever + // if err := s.Listen(s.addr); err != nil { + // s.logger.Log("Failed to start server! Reason: %v\n", err) + // } + + return err +}