Huge refactoring

This commit is contained in:
Piotr Biernat 2024-07-19 21:24:31 +02:00
parent b276fa9e81
commit 0151c19e8f
13 changed files with 495 additions and 333 deletions

View File

@ -1,13 +1,13 @@
package main
import (
"fmt"
"log"
"os"
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
"git.pbiernat.io/egommerce/catalog-service/internal/app"
"git.pbiernat.io/egommerce/catalog-service/internal/config"
"git.pbiernat.io/egommerce/catalog-service/internal/server"
)
@ -16,24 +16,26 @@ func main() {
log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs)
}
c := config.NewServerConfig("catalog")
srv := server.New(c)
app := app.NewApp(c, srv)
c := server.NewConfig("catalog")
cArr := c.GetArray()
app.RegisterHelper("cache", app.WithCache())
app.RegisterHelper("database", app.WithDatabase())
app.RegisterHelper("eventbus", app.WithEventbus())
app.RegisterHelper("logger", app.WithLogger())
app.RegisterHelper("registry", app.WithRegistry())
doer := server.New(c)
a := app.NewApp(doer)
a.RegisterPlugin(app.LoggerPlugin(cArr))
a.RegisterPlugin(app.CachePlugin(cArr))
a.RegisterPlugin(app.DatabasePlugin(cArr))
a.RegisterPlugin(app.EventbusPlugin(cArr))
a.RegisterPlugin(app.RegistryPlugin(cArr))
while := make(chan struct{})
err := app.Start(while)
err := a.Start(while)
<-while
if err != nil {
log.Fatalf("Failed to start application. Reason: %v\n", err)
log.Fatalf("Failed to start server. Reason: %v\n", err)
os.Exit(1)
}
fmt.Println("Gone")
os.Exit(0)
}

View File

@ -1,29 +1,33 @@
package main
import (
"fmt"
"log"
"os"
// "git.pbiernat.io/egommerce/go-api-pkg/config"
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
"git.pbiernat.io/egommerce/catalog-service/internal/app"
"git.pbiernat.io/egommerce/catalog-service/internal/worker"
)
func main() {
// if config.ErrLoadingEnvs != nil {
// log.Fatalln("Error loading .env file.")
// }
if cnf.ErrLoadingEnvs != nil {
log.Fatalln("Error loading .env file.")
}
c := cnf.NewConfig("catalog-worker")
wrk := worker.New(
c,
worker.WithCache(c),
worker.WithDatabase(c),
worker.WithEventbus(c),
worker.WithLogger(c),
)
c := worker.NewConfig("catalog-worker")
cArr := c.GetArray()
doer := worker.New(c)
a := app.NewApp(doer)
a.RegisterPlugin(app.LoggerPlugin(cArr))
a.RegisterPlugin(app.CachePlugin(cArr))
a.RegisterPlugin(app.DatabasePlugin(cArr))
a.RegisterPlugin(app.EventbusPlugin(cArr))
while := make(chan struct{})
err := wrk.Start(while)
err := a.Start(while)
<-while
if err != nil {
@ -31,5 +35,6 @@ func main() {
os.Exit(1)
}
fmt.Println("Gone")
os.Exit(0)
}

View File

@ -4,7 +4,7 @@ go 1.18
require (
git.pbiernat.io/egommerce/api-entities v0.2.3
git.pbiernat.io/egommerce/go-api-pkg v0.2.72
git.pbiernat.io/egommerce/go-api-pkg v0.2.88
github.com/georgysavva/scany/v2 v2.0.0
github.com/go-pg/migrations/v8 v8.1.0
github.com/go-pg/pg/v10 v10.11.1

View File

@ -37,8 +37,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
git.pbiernat.io/egommerce/api-entities v0.2.3 h1:mR6EYfZkAzh4teydb7KXDBWoxwVW3qasnmmH5J3mnas=
git.pbiernat.io/egommerce/api-entities v0.2.3/go.mod h1:INXAG5x4+i+vNwg1NpfPHiDW8nY1kn1K7pgLOtX+/I0=
git.pbiernat.io/egommerce/go-api-pkg v0.2.72 h1:289HdYxa5ZzKEirxZfat1i6B7jThvZwtvO3SZykFTcg=
git.pbiernat.io/egommerce/go-api-pkg v0.2.72/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980=
git.pbiernat.io/egommerce/go-api-pkg v0.2.88 h1:xya/39BnFeha3Oc76ad/ppoQd6AstTGQd87Qszamr1A=
git.pbiernat.io/egommerce/go-api-pkg v0.2.88/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980=
github.com/Azure/azure-sdk-for-go v44.0.0+incompatible h1:e82Yv2HNpS0kuyeCrV29OPKvEiqfs2/uJHic3/3iKdg=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=

View File

@ -6,64 +6,29 @@ import (
"os/signal"
"strconv"
"syscall"
redis "github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
cnf "git.pbiernat.io/egommerce/catalog-service/internal/config"
srv "git.pbiernat.io/egommerce/catalog-service/internal/server"
db "git.pbiernat.io/egommerce/catalog-service/pkg/database"
)
type (
Doer interface {
Start(while chan struct{}) error
OnShutdown(*App)
Start() error
RegisterHandler(string, func() any)
OnShutdown()
}
Application interface {
Start(while chan struct{})
RegisterHelper(OptionFn) error
RegisterPlugin(PluginFn) error
Shutdown()
}
OptionFn func(*App) error
App struct {
cnf *srv.ServerConfig
doer Doer
// helpers
Cache *redis.Client
Database *pgxpool.Pool
Eventbus *amqp.Channel
Logger *fluentd.Logger
Registry *consul.Service
}
)
func NewApp(c *cnf.ServerConfig, d Doer) *App {
app := &App{
cnf: c,
func NewApp(d Doer) *App {
return &App{
doer: d,
}
// while := make(chan struct{})
// // err := srv.Base.Start(while)
// err := app.Doer.Start(while)
// <-while
// if err != nil {
// log.Fatalf("Failed to start application. Reason: %v\n", err)
// os.Exit(1)
// }
// os.Exit(0)
return app
}
func (a *App) Start(while chan struct{}) error {
@ -72,7 +37,6 @@ func (a *App) Start(while chan struct{}) error {
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
a.doer.OnShutdown(a)
a.Shutdown()
close(while)
@ -81,17 +45,24 @@ func (a *App) Start(while chan struct{}) error {
run := a.createRunFile("./app.run") // FIXME path...
defer a.removeRunFile(run)
a.doer.Start(while)
err := a.doer.Start()
if err != nil {
log.Fatalf("Failed to start app. Reason: %v\n", err)
close(while)
}
<-while
return err
}
func (a *App) RegisterHelper(name string, opt OptionFn) error {
if err := opt(a); err != nil {
log.Fatalf("Failed to attach extension to the server. Err: %v\n", err)
}
func (a *App) RegisterPlugin(p Plugin) error {
a.doer.RegisterHandler(p.name, p.fn)
return nil
}
func (a *App) Shutdown() {
// ...
a.doer.OnShutdown()
}
func (a *App) createRunFile(path string) *os.File {
@ -108,108 +79,3 @@ func (a *App) createRunFile(path string) *os.File {
func (a *App) removeRunFile(f *os.File) error {
return f.Close()
}
func WithCache() OptionFn {
return func(a *App) error {
a.Cache = redis.NewClient(&redis.Options{
Addr: a.cnf.CacheAddr,
Password: a.cnf.CachePassword,
DB: 0,
})
return nil
}
}
func WithDatabase() OptionFn {
return func(a *App) error {
dbConn, err := db.Connect(a.cnf.DbURL)
if err != nil {
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", a.cnf.DbURL, err)
os.Exit(1) // TODO: retry in background... for now treat as integraded service...
}
a.Database = dbConn
return nil
}
}
func WithEventbus() OptionFn {
return func(a *App) error {
conn, err := amqp.Dial(a.cnf.EventBusURL)
if err != nil {
log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", a.cnf.EventBusURL, err)
}
chn, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err)
}
a.Eventbus = chn
return nil
}
}
func WithLogger() OptionFn {
return func(a *App) error {
logHost, logPort, err := fluentd.ParseAddr(a.cnf.LoggerAddr)
if err != nil {
log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", a.cnf.LoggerAddr, err)
}
logger, err := fluentd.NewLogger(a.cnf.Base.GetAppFullName(), logHost, logPort)
if err != nil {
log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
}
a.Logger = logger
return nil
}
}
func WithRegistry() OptionFn {
return func(a *App) error {
port, _ := strconv.Atoi(a.cnf.Base.NetAddr[1:]) // FIXME: can be IP:PORT or :PORT
// log.Printf("Consul retrieved port: %v", port)
registry, err := consul.NewService(a.cnf.RegistryAddr, a.cnf.Base.AppID, a.cnf.Base.AppName, a.cnf.Base.RegistryDomainOverIP, a.cnf.Base.GetIP(), a.cnf.Base.AppDomain, a.cnf.Base.PathPrefix, port)
if err != nil {
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
}
err = registry.Register()
if err != nil {
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
}
registry.RegisterHealthChecks()
// a.registerKVUpdater() // FIXME run as goroutine
a.Registry = registry
// svc, _ := registry.Connect()
// tlsCnf := svc.ServerTLSConfig()
// s.Base.App.Server().TLSConfig = tlsCnf
// fmt.Println("Podmiana configa TLS")
// defer svc.Close()
// go func() { // Consul KV updater
// ticker := time.NewTicker(time.Second * 15)
// for range ticker.C {
// fetchKVConfig(s) // FIXME: duplicated in worker
// }
// }()
// go func() { // Server metadata cache updater
// ticker := time.NewTicker(time.Second * 5)
// for range ticker.C {
// s.cacheMetadata()
// }
// }()
return nil
}
}

139
src/internal/app/plugins.go Normal file
View File

@ -0,0 +1,139 @@
package app
import (
"log"
"os"
"strconv"
redis "github.com/go-redis/redis/v8"
amqp "github.com/rabbitmq/amqp091-go"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
db "git.pbiernat.io/egommerce/catalog-service/pkg/database"
)
type (
Plugin struct {
name string
fn PluginFn
}
PluginFn func() any
)
func CachePlugin(cArr map[string]string) Plugin {
return Plugin{
name: "cache",
fn: func() any {
return redis.NewClient(&redis.Options{
Addr: cArr["cacheAddr"],
Password: cArr["cachePassword"],
DB: 0,
})
},
}
}
func DatabasePlugin(cArr map[string]string) Plugin {
return Plugin{
name: "database",
fn: func() any {
dbConn, err := db.Connect(cArr["dbURL"])
if err != nil {
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err)
os.Exit(1) // TODO: retry in background...
}
return dbConn
},
}
}
func EventbusPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "eventbus",
fn: func() any {
conn, err := amqp.Dial(cArr["eventBusURL"])
if err != nil {
log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err)
os.Exit(1) // TODO: retry in background...
}
chn, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open new EventBus channel. Err: %v\n", err)
os.Exit(1) // TODO: retry in background...
}
return chn
},
}
}
func LoggerPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "logger",
fn: func() any {
logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"])
if err != nil {
log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err)
os.Exit(1) // TODO: retry in background...
}
logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort)
if err != nil {
log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err)
os.Exit(1) // TODO: retry in background...
}
return logger
},
}
}
func RegistryPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "registry",
fn: func() any {
port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT
// log.Printf("Consul retrieved port: %v", port)
registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port)
if err != nil {
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err)
os.Exit(1) // TODO: retry in background...
}
err = registry.Register()
if err != nil {
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
os.Exit(1) // TODO: retry in background...
}
registry.RegisterHealthChecks()
// a.registerKVUpdater() // FIXME run as goroutine
return registry
// svc, _ := registry.Connect()
// tlsCnf := svc.ServerTLSConfig()
// s.Base.App.Server().TLSConfig = tlsCnf
// fmt.Println("Podmiana configa TLS")
// defer svc.Close()
// go func() { // Consul KV updater
// ticker := time.NewTicker(time.Second * 15)
// for range ticker.C {
// fetchKVConfig(s) // FIXME: duplicated in worker
// }
// }()
// go func() { // Server metadata cache updater
// ticker := time.NewTicker(time.Second * 5)
// for range ticker.C {
// s.cacheMetadata()
// }
// }()
},
}
}

View File

@ -13,9 +13,9 @@ import (
)
func (s *Server) GetProductListHandler(c *fiber.Ctx) error {
reqID, _ := s.Base.GetRequestID(c)
reqID, _ := s.GetRequestID(c)
req := new(def.GetProductListRequest)
catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger)
catalogSrv := service.NewCatalogService(s.GetDatabase(), s.GetEventBus(), s.GetLogger())
res, err := ui.GetProductList(catalogSrv, req.CategoryID, reqID)
if err != nil {
return err
@ -25,10 +25,10 @@ func (s *Server) GetProductListHandler(c *fiber.Ctx) error {
}
func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error {
reqID, _ := s.Base.GetRequestID(c)
reqID, _ := s.GetRequestID(c)
req := new(def.AddProductToBasketRequest)
if err := c.BodyParser(req); err != nil {
return s.Base.Error(c, 400, err.Error())
return s.Error(c, 400, err.Error())
}
basketID := prepareBasket(c)
@ -37,24 +37,24 @@ func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error {
qty = req.Quantity
}
catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger)
catalogSrv := service.NewCatalogService(s.GetDatabase(), s.GetEventBus(), s.GetLogger())
res, err := ui.AddProductToBasket(catalogSrv, req.ProductID, qty, basketID, reqID)
if err != nil {
s.Logger.Log("AddProductToBasketHandler error: ", err)
s.GetLogger().Log("AddProductToBasketHandler error: ", err)
if res.ProductID == 0 {
return s.Base.Error(c, 404, fmt.Sprintf("Product #%d not exists", req.ProductID))
return s.Error(c, 404, fmt.Sprintf("Product #%d not exists", req.ProductID))
}
return s.Base.Error(c, 400, "Failed to add product to basket")
return s.Error(c, 400, "Failed to add product to basket")
}
return c.JSON(res)
}
func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error {
reqID, _ := s.Base.GetRequestID(c)
reqID, _ := s.GetRequestID(c)
req := new(def.RemoveProductFromBasketRequest)
if err := c.BodyParser(req); err != nil {
return s.Base.Error(c, 400, err.Error())
return s.Error(c, 400, err.Error())
}
basketID := prepareBasket(c)
@ -63,11 +63,11 @@ func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error {
qty = req.Quantity
}
catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger)
catalogSrv := service.NewCatalogService(s.GetDatabase(), s.GetEventBus(), s.GetLogger())
res, err := ui.RemoveProductFromBasket(catalogSrv, req.ProductID, qty, basketID, reqID)
if err != nil {
s.Logger.Log("RemoveProductFromBasketHandler error: ", err)
return s.Base.Error(c, 400, "Failed to remove product from basket")
s.GetLogger().Log("RemoveProductFromBasketHandler error: ", err)
return s.Error(c, 400, "Failed to remove product from basket")
}
return c.JSON(res)

View File

@ -1,4 +1,4 @@
package config
package server
import (
"fmt"
@ -26,7 +26,7 @@ const (
defEbEventsQueue = "catalog-svc"
)
type ServerConfig struct {
type Config struct {
ID string
Name string
Domain string
@ -52,8 +52,8 @@ type ServerConfig struct {
// Fields with JSON mappings are available through Consul KV storage
}
func NewServerConfig(name string) *ServerConfig {
c := new(ServerConfig)
func NewConfig(name string) *Config {
c := new(Config)
c.ID, _ = os.Hostname()
c.Name = name
@ -74,15 +74,11 @@ func NewServerConfig(name string) *ServerConfig {
return c
}
func (c *ServerConfig) GetAppFullName() string {
func (c *Config) GetAppFullName() string {
return fmt.Sprintf("%s_%s", c.Name, c.ID)
}
// func (c *ServerConfig) GetAddressForRegistry() string {
// }
func (c *ServerConfig) GetIP() string {
func (c *Config) GetIP() string {
host, _ := os.Hostname()
ips, _ := net.LookupIP(host)
// for _, ip := range ips {
@ -91,3 +87,25 @@ func (c *ServerConfig) GetIP() string {
return ips[0].String()
}
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
arr := make(map[string]string)
arr["id"] = c.ID
arr["name"] = c.Name
arr["appFullname"] = c.GetAppFullName()
arr["domain"] = c.Domain
arr["ip"] = c.GetIP()
arr["netAddr"] = c.NetAddr
arr["registryDomainOverIP"] = c.RegistryDomainOverIP
arr["pathPrefix"] = c.PathPrefix
arr["cacheAddr"] = c.CacheAddr
arr["cachePassword"] = c.CachePassword
arr["dbURL"] = c.DbURL
arr["eventBusExchange"] = c.EventBusExchange
arr["eventBusURL"] = c.EventBusURL
arr["kvNamespace"] = c.KVNamespace
arr["loggerAddr"] = c.LoggerAddr
arr["registryAddr"] = c.RegistryAddr
return arr
}

View File

@ -12,7 +12,7 @@ import (
// "github.com/gofiber/fiber/v2/middleware/cors"
func SetupMiddleware(s *Server) {
s.Base.Use(LoggingMiddleware(s.Logger))
s.Use(LoggingMiddleware(s.GetLogger()))
}
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {

View File

@ -1,78 +1,74 @@
package server
import (
"fmt"
"log"
"net"
"time"
"github.com/go-redis/redis/v8"
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"git.pbiernat.io/egommerce/api-entities/http"
"git.pbiernat.io/egommerce/catalog-service/app"
cnf "git.pbiernat.io/egommerce/catalog-service/config"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
)
type (
Server struct {
*fiber.App
cnf *cnf.ServerConfig
ID string
addr string // e.g. "127.0.0.1:80"
ShutdownFn
handlers map[string]any
}
HeaderRequestID struct {
RequestID string `reqHeader:"x-request-id"`
}
ShutdownFn func() error
)
func New(conf *cnf.Config) *Server {
func New(c *Config) *Server {
return &Server{
ID: c.ID,
App: fiber.New(fiber.Config{
AppName: conf.ID,
ServerHeader: conf.Name + ":" + conf.ID,
ReadTimeout: conf.ReadTimeout * time.Millisecond,
WriteTimeout: conf.WriteTimeout * time.Millisecond,
IdleTimeout: conf.IdleTimeout * time.Millisecond,
AppName: c.ID,
ServerHeader: c.Name + ":" + c.ID,
ReadTimeout: c.ReadTimeout * time.Millisecond,
WriteTimeout: c.WriteTimeout * time.Millisecond,
IdleTimeout: c.IdleTimeout * time.Millisecond,
}),
cnf: conf,
addr: conf.NetAddr,
addr: c.NetAddr,
handlers: make(map[string]any),
}
}
func (s *Server) Init() {
// svr.Base.ShutdownFn = svr.Shutdown()
func (s *Server) Start() error {
SetupMiddleware(s)
SetupRouter(s)
}
func (s *Server) Start(while chan struct{}) {
// fmt.Println("[DOER] Started server at: " + s.addr)
// fmt.Printf("Starting server at: %s...\n", s.addr)
ln, _ := net.Listen("tcp", s.addr)
// ln = tls.NewListener(ln, s.App.Server().TLSConfig)
err := s.Listener(ln)
if err != nil {
log.Fatalf("Failed to start server: %s. Reason: %v\n", s.cnf.ID, err)
close(while)
}
<-while
// return err
return s.Listener(ln)
}
func (s *Server) OnShutdown(a *app.App) {
a.Logger.Log("Server %s is going down...", a.cnf.ID)
func (s *Server) RegisterHandler(name string, fn func() any) {
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
s.handlers[name] = fn()
}
fmt.Println("Unregistering...")
a.Registry.Unregister()
func (s *Server) OnShutdown() {
// s.GetLogger().Log("Server %s is going down...", s.ID)
s.GetRegistry().Unregister()
// a.clearMetadataCache()
a.Eventbus.Close()
a.Database.Close()
a.Logger.Log("Gone.")
a.Logger.Close()
s.GetEventBus().Close()
s.GetDatabase().Close()
s.GetLogger().Log("Gone.")
s.GetLogger().Close()
s.Shutdown()
}
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
@ -88,6 +84,27 @@ func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
return c.Status(code).JSON(http.ErrorResponse{Error: msg})
}
// Plugin helper funcitons
func (s *Server) GetCache() *redis.Client {
return (s.handlers["cache"]).(*redis.Client)
}
func (s *Server) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
return (s.handlers["database"]).(*pgxpool.Pool)
}
func (s *Server) GetEventBus() *amqp.Channel {
return (s.handlers["eventbus"]).(*amqp.Channel)
}
func (s *Server) GetLogger() *fluentd.Logger {
return (s.handlers["logger"]).(*fluentd.Logger)
}
func (s *Server) GetRegistry() *consul.Service {
return (s.handlers["registry"]).(*consul.Service)
}
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
// func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
// go func() {

View File

@ -5,8 +5,10 @@ import (
"git.pbiernat.io/egommerce/api-entities/model"
"git.pbiernat.io/egommerce/catalog-service/internal/event"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
"git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"

View File

@ -0,0 +1,85 @@
package worker
import (
"fmt"
"net"
"os"
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
)
const (
defName = "catalog-worker"
defDomain = "catalog-worker"
defCacheAddr = "egommerce.local:6379"
defCachePassword = "12345678"
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
defKVNmspc = "dev.egommerce/service/catalog-worker"
defLoggerAddr = "api-logger:24224"
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
defEbEventsExchange = "api-events"
defEbEventsQueue = "catalog-svc"
)
type Config struct {
ID string
Name string
LoggerAddr string `json:"logger_addr"`
DbURL string `json:"db_url"`
CacheAddr string `json:"cache_addr"`
CachePassword string `json:"cache_password"`
MongoDbUrl string `json:"mongodb_url"`
EventBusURL string `json:"eventbus_url"`
EventBusExchange string `json:"eventbus_exchange"`
EventBusQueue string `json:"eventbus_queue"`
KVNamespace string
}
func NewConfig(name string) *Config {
c := new(Config)
c.ID, _ = os.Hostname()
c.Name = name
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
c.EventBusExchange = defEbEventsExchange
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
return c
}
func (c *Config) GetAppFullName() string {
return fmt.Sprintf("%s_%s", c.Name, c.ID)
}
func (c *Config) GetIP() string {
host, _ := os.Hostname()
ips, _ := net.LookupIP(host)
// for _, ip := range ips {
// return ip.String()
// }
return ips[0].String()
}
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
arr := make(map[string]string)
arr["id"] = c.ID
arr["name"] = c.Name
arr["appFullname"] = c.GetAppFullName()
arr["cacheAddr"] = c.CacheAddr
arr["cachePassword"] = c.CachePassword
arr["dbURL"] = c.DbURL
arr["eventBusExchange"] = c.EventBusExchange
arr["eventBusURL"] = c.EventBusURL
arr["kvNamespace"] = c.KVNamespace
arr["loggerAddr"] = c.LoggerAddr
return arr
}

View File

@ -4,104 +4,131 @@ import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
"git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
"git.pbiernat.io/egommerce/catalog-service/internal/event"
cnf "git.pbiernat.io/egommerce/catalog-service/internal/server"
"git.pbiernat.io/egommerce/catalog-service/internal/service"
)
type (
Worker struct {
Config *cnf.Config
Cache *redis.Client
Database *pgxpool.Pool
Eventbus *amqp.Channel
Logger *fluentd.Logger
Registry *consul.Service
ID string
cnf *Config
handlers map[string]any
services map[string]any
doWrkUntil chan struct{}
}
OptionFn func(*Worker) error // FIXME: similar in server/server.go
)
func New(c *cnf.Config, opts ...OptionFn) *Worker {
w := &Worker{
Config: c,
func New(c *Config) *Worker {
return &Worker{
ID: c.ID,
cnf: c,
handlers: make(map[string]any),
services: make(map[string]any),
doWrkUntil: make(chan struct{}),
}
for _, opt := range opts {
if err := opt(w); err != nil {
log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err)
}
}
return w
}
func (w *Worker) Start(while chan struct{}) error {
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
w.Shutdown()
close(while)
}()
run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker)
defer w.removeRunFile(run)
err := w.doWork()
func (w *Worker) Start() error {
// Init
err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange)
if err != nil {
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err)
close(while)
}
w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err)
fmt.Printf("Failed to declare EventBus exchange: %v\n", err)
w.Logger.Log("Waiting for messages...")
return nil
}
func (w *Worker) Shutdown() error {
w.Logger.Log("Worker %s is going down...", w.Config.Base.AppID)
w.Registry.Unregister()
w.Eventbus.Close()
w.Database.Close()
w.Logger.Log("Gone.")
w.Logger.Close()
return nil
}
func (w *Worker) createRunFile(path string) *os.File {
run, err := os.Create(path)
if err != nil {
log.Fatalf("Failed to create run file. Reason: %v\n", err)
os.Exit(1)
}
run.WriteString(strconv.Itoa(os.Getpid()))
return run
_, err = w.GetEventBus().QueueDeclare(
w.cnf.EventBusQueue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err)
fmt.Printf("Failed to declare EventBus queue: %v\n", err)
os.Exit(1)
}
err = w.doWork(w.doWrkUntil)
if err != nil {
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err)
close(w.doWrkUntil)
}
<-w.doWrkUntil
return err
// go func() {
// sigint := make(chan os.Signal, 1)
// signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
// <-sigint
// w.Shutdown()
// close(while)
// }()
// run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker)
// defer w.removeRunFile(run)
// w.Logger.Log("Waiting for messages...")
// return nil
}
func (w *Worker) removeRunFile(f *os.File) error {
return f.Close()
func (w *Worker) RegisterHandler(name string, fn func() any) {
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
w.handlers[name] = fn()
}
func (w *Worker) doWork() error {
msgs, err := w.Eventbus.Consume(
w.Config.EventBusQueue, // queue
func (w *Worker) OnShutdown() {
w.GetLogger().Log("Worker %s is going down...", w.ID)
// fmt.Printf("Worker %s is going down...\n", w.ID)
w.GetEventBus().Close()
w.GetDatabase().Close()
w.GetLogger().Log("Gone.")
w.GetLogger().Close()
close(w.doWrkUntil)
}
// Plugin helper funcitons
func (w *Worker) GetCache() *redis.Client {
return (w.handlers["cache"]).(*redis.Client)
}
func (w *Worker) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
return (w.handlers["database"]).(*pgxpool.Pool)
}
func (w *Worker) GetEventBus() *amqp.Channel {
return (w.handlers["eventbus"]).(*amqp.Channel)
}
func (w *Worker) GetLogger() *fluentd.Logger {
return (w.handlers["logger"]).(*fluentd.Logger)
}
func (w *Worker) doWork(while chan struct{}) error {
w.services["catalog"] =
service.NewCatalogService(w.GetDatabase(), w.GetEventBus(), w.GetLogger())
cSrv := (w.services["catalog"]).(*service.CatalogService) // FIXME simplify maybe...?
msgs, err := w.GetEventBus().Consume(
w.cnf.EventBusQueue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
@ -110,27 +137,28 @@ func (w *Worker) doWork() error {
nil, // args
)
if err != nil {
w.Logger.Log("Failed to register a consumer: %s", err)
w.GetLogger().Log("Failed to register a consumer: %s", err)
fmt.Printf("Failed to register a consumer: %s", err)
os.Exit(1)
// close(while)
}
go func() {
catalogSrvc := service.NewCatalogService(w.Database, w.Eventbus, w.Logger)
for d := range msgs {
go func(d amqp.Delivery) {
w.processMsg(catalogSrvc, d)
}(d)
// go func(d amqp.Delivery) {
w.processMsg(cSrv, d)
// }(d)
}
}()
<-while
return nil
}
func (w *Worker) processMsg(srvc *service.CatalogService, d amqp.Delivery) {
func (w *Worker) processMsg(cSrv *service.CatalogService, d amqp.Delivery) {
msg, err := rabbitmq.Deserialize(d.Body)
if err != nil {
w.Logger.Log("Deserialization error: %v\n", err)
w.GetLogger().Log("Deserialization error: %v\n", err)
d.Reject(false)
return
@ -140,22 +168,22 @@ func (w *Worker) processMsg(srvc *service.CatalogService, d amqp.Delivery) {
data := (msg["data"]).(map[string]interface{})
// reqID := (data["request_id"]).(string) // FIXME Check input params!
w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data)
w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data)
var ok = false
switch true { // Refactor -> use case for polymorphism
case strings.Contains(name, event.EVENT_WAREHOUSE_STOCK_UPDATED):
w.Logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED)
w.GetLogger().Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED)
}
rnr := NewCommandRunner(data, srvc)
rnr := NewCommandRunner(data, cSrv)
ok, _ = rnr.run(data)
if ok {
w.Logger.Log("Successful executed message \"%s\"\n", name)
w.GetLogger().Log("Successful executed message \"%s\"\n", name)
d.Ack(false)
return
}
w.Logger.Log("Error processing \"%s\": %s (%v)", name, err.Error(), err)
w.GetLogger().Log("Error processing \"%s\": %s (%v)", name, err.Error(), err)
d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shoud know...?
}