This commit is contained in:
parent
11cc3e5f4c
commit
8fac6f7e61
@ -6,10 +6,14 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/config"
|
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
|
||||||
"github.com/go-pg/migrations/v8"
|
"github.com/go-pg/migrations/v8"
|
||||||
"github.com/go-pg/pg/v10"
|
"github.com/go-pg/pg/v10"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
|
||||||
|
baseCnf "git.pbiernat.dev/egommerce/catalog-service/pkg/config"
|
||||||
|
|
||||||
|
cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -32,19 +36,26 @@ Usage:
|
|||||||
`
|
`
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
if config.ErrLoadingEnvs != nil {
|
if baseCnf.ErrLoadingEnvs != nil {
|
||||||
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// dbURL := config.GetEnv("DATABASE_URL", defDbURL)
|
c := cnf.NewConfig("catalog-migrator")
|
||||||
loggerAddr := config.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
|
||||||
mTblName := config.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
|
|
||||||
|
|
||||||
logHost, logPort := fluentd.ParseAddr(loggerAddr)
|
// dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL)
|
||||||
logger := fluentd.NewLogger(defAppName, logHost, logPort)
|
|
||||||
defer logger.Close()
|
|
||||||
|
|
||||||
flag.Usage = usage // FIXME
|
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err)
|
||||||
|
}
|
||||||
|
// defer logger.Close()
|
||||||
|
|
||||||
|
flag.Usage = usage
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
db := pg.Connect(&pg.Options{ // FIXME
|
db := pg.Connect(&pg.Options{ // FIXME
|
||||||
@ -54,10 +65,10 @@ func main() {
|
|||||||
Database: "egommerce",
|
Database: "egommerce",
|
||||||
})
|
})
|
||||||
|
|
||||||
|
mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
|
||||||
mig := migrations.NewCollection()
|
mig := migrations.NewCollection()
|
||||||
mig.SetTableName(mTblName)
|
mig.SetTableName(mTbl)
|
||||||
err := mig.DiscoverSQLMigrations("./migrations")
|
if err := mig.DiscoverSQLMigrations("./migrations"); err != nil {
|
||||||
if err != nil {
|
|
||||||
logger.Log("migration dicovery error: %#v", err)
|
logger.Log("migration dicovery error: %#v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,102 +2,37 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
|
||||||
|
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/config"
|
baseCnf "git.pbiernat.dev/egommerce/catalog-service/pkg/config"
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/database"
|
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/server"
|
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
|
||||||
amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config"
|
||||||
defAppName = "catalog-svc"
|
svr "git.pbiernat.dev/egommerce/catalog-service/internal/server"
|
||||||
defAppDomain = "catalog-svc"
|
|
||||||
defPathPrefix = "/catalog"
|
|
||||||
defNetAddr = ":80"
|
|
||||||
defLoggerAddr = "api-logger:24224"
|
|
||||||
defRegistryAddr = "api-registry:8500"
|
|
||||||
// defMetricsAddr = "api-prometheus:9090"
|
|
||||||
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
|
||||||
defCacheAddr = "api-cache:6379"
|
|
||||||
defCachePassword = "12345678"
|
|
||||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
|
||||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
|
||||||
ebEventsExchange = "api-events"
|
|
||||||
ebEventsQueue = "catalog-svc"
|
|
||||||
defKVNmspc = "dev.egommerce/service/catalog-svc"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
if config.ErrLoadingEnvs != nil {
|
if baseCnf.ErrLoadingEnvs != nil {
|
||||||
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
|
||||||
}
|
}
|
||||||
|
|
||||||
c := new(server.Config)
|
c := cnf.NewConfig("catalog-server")
|
||||||
c.AppID, _ = os.Hostname()
|
srv := svr.New(
|
||||||
c.AppName = config.GetEnv("APP_NAME", defAppName)
|
c,
|
||||||
c.AppDomain = config.GetEnv("APP_DOMAIN", defAppDomain)
|
svr.WithCache(c),
|
||||||
c.PathPrefix = config.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
svr.WithDatabase(c),
|
||||||
c.NetAddr = config.GetEnv("SERVER_ADDR", defNetAddr)
|
svr.WithEventbus(c),
|
||||||
c.Port, _ = strconv.Atoi(c.NetAddr[1:])
|
svr.WithLogger(c),
|
||||||
c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
svr.WithRegistry(c),
|
||||||
c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
)
|
||||||
c.DbURL = config.GetEnv("DATABASE_URL", defDbURL)
|
|
||||||
c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr)
|
|
||||||
c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword)
|
|
||||||
c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL)
|
|
||||||
c.EventBusExchange = ebEventsExchange
|
|
||||||
c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
|
||||||
|
|
||||||
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr)
|
while := make(chan struct{})
|
||||||
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
err := srv.Base.Start(while, srv.Shutdown())
|
||||||
defer logger.Close()
|
<-while
|
||||||
|
|
||||||
// db conn
|
|
||||||
dbConn, err := database.Connect(c.DbURL)
|
|
||||||
if err != nil { // fixme: add wait-for-db...
|
|
||||||
logger.Log("Failed to connect to Database server: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
defer dbConn.Close()
|
|
||||||
|
|
||||||
// redis conn
|
|
||||||
redis := redis.NewClient(&redis.Options{
|
|
||||||
Addr: c.CacheAddr,
|
|
||||||
Password: c.CachePassword,
|
|
||||||
DB: 0,
|
|
||||||
})
|
|
||||||
defer redis.Close()
|
|
||||||
|
|
||||||
// eventbus conn
|
|
||||||
ebConn, ebCh, err := amqp.Open(c.EventBusURL)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log("Failed to connect to EventBus server: %v\n", err)
|
log.Fatalf("Failed to start server. Reason: %v\n", err)
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
defer ebCh.Close()
|
|
||||||
defer amqp.Close(ebConn)
|
|
||||||
|
|
||||||
err = amqp.NewExchange(ebCh, c.EventBusExchange)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// start server
|
os.Exit(0)
|
||||||
srv := server.NewServer(c, logger, dbConn, redis, ebCh)
|
|
||||||
|
|
||||||
// start metrics
|
|
||||||
go http.ListenAndServe(":8084", promhttp.Handler())
|
|
||||||
|
|
||||||
srvChan := make(chan struct{})
|
|
||||||
srv.StartWithGracefulShutdown(srvChan)
|
|
||||||
<-srvChan
|
|
||||||
|
|
||||||
// os.Exit(1)
|
|
||||||
}
|
}
|
||||||
|
@ -1,198 +1,30 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strings"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/config"
|
"git.pbiernat.dev/egommerce/catalog-service/pkg/config"
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/database"
|
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/event"
|
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/server"
|
|
||||||
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"github.com/streadway/amqp"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config"
|
||||||
defAppName = "catalog-worker"
|
"git.pbiernat.dev/egommerce/catalog-service/internal/worker"
|
||||||
defLoggerAddr = "api-logger:24224"
|
|
||||||
defRegistryAddr = "api-registry:8500"
|
|
||||||
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
|
||||||
defCacheAddr = "api-cache:6379"
|
|
||||||
defCachePassword = "12345678"
|
|
||||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
|
||||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
|
||||||
ebEventsExchange = "api-events"
|
|
||||||
ebEventsQueue = "catalog-worker"
|
|
||||||
defKVNmspc = "dev.egommerce/service/catalog-worker"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
if config.ErrLoadingEnvs != nil {
|
if config.ErrLoadingEnvs != nil {
|
||||||
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
log.Fatalln("Error loading .env file.")
|
||||||
}
|
}
|
||||||
|
|
||||||
c := new(server.Config)
|
c := cnf.NewConfig("catalog-worker")
|
||||||
c.AppID, _ = os.Hostname()
|
wrk := worker.New(
|
||||||
c.AppName = config.GetEnv("APP_NAME", defAppName)
|
c,
|
||||||
c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
worker.WithCache(c),
|
||||||
c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
worker.WithDatabase(c),
|
||||||
c.DbURL = config.GetEnv("DATABASE_URL", defDbURL)
|
worker.WithEventbus(c),
|
||||||
c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr)
|
worker.WithLogger(c),
|
||||||
c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword)
|
worker.WithRegistry(c),
|
||||||
c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL)
|
|
||||||
c.EventBusExchange = ebEventsExchange
|
|
||||||
c.EventBusQueue = ebEventsQueue
|
|
||||||
c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
|
||||||
|
|
||||||
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr)
|
|
||||||
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
|
||||||
defer logger.Close()
|
|
||||||
|
|
||||||
consul, err := discovery.NewService(c.RegistryAddr, c.AppID, c.AppName, c.AppID, "", "", 0)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(consul *discovery.Service) {
|
|
||||||
interval := time.Second * 30
|
|
||||||
ticker := time.NewTicker(interval)
|
|
||||||
for range ticker.C {
|
|
||||||
updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go
|
|
||||||
}
|
|
||||||
}(consul)
|
|
||||||
|
|
||||||
// db conn
|
|
||||||
dbConn, err := database.Connect(c.DbURL)
|
|
||||||
if err != nil { // fixme: add wait-for-db...
|
|
||||||
logger.Log("Failed to connect to Database server: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
defer dbConn.Close()
|
|
||||||
|
|
||||||
// redis conn
|
|
||||||
redis := redis.NewClient(&redis.Options{
|
|
||||||
Addr: c.CacheAddr,
|
|
||||||
Password: c.CachePassword,
|
|
||||||
DB: 0,
|
|
||||||
})
|
|
||||||
defer redis.Close()
|
|
||||||
|
|
||||||
// eventbus conn
|
|
||||||
ebConn, ebCh, err := rabbitmq.Open(c.EventBusURL)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("Failed to connect to EventBus server: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
defer ebCh.Close()
|
|
||||||
defer rabbitmq.Close(ebConn)
|
|
||||||
|
|
||||||
err = rabbitmq.NewExchange(ebCh, c.EventBusExchange)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// create and bind queues
|
|
||||||
_, err = ebCh.QueueDeclare(
|
|
||||||
c.EventBusQueue, // name
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
|
||||||
logger.Log("Failed to declare EventBus queue: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "warehouse.catalog.stockUpdated")
|
while := make(chan struct{})
|
||||||
// if err != nil {
|
wrk.Start(while)
|
||||||
// logger.Log("Failed to bind EventBus queue: %v\n", err)
|
<-while
|
||||||
// os.Exit(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// event consume
|
|
||||||
msgs, err := ebCh.Consume(
|
|
||||||
c.EventBusQueue, // queue
|
|
||||||
"", // consumer
|
|
||||||
false, // auto-ack
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // args
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("Failed to register a EventBus consumer: %s", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
forever := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
sigint := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
<-sigint
|
|
||||||
|
|
||||||
logger.Log("Worker %s stopped working...\n", c.GetAppFullName())
|
|
||||||
|
|
||||||
close(forever)
|
|
||||||
}()
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for d := range msgs {
|
|
||||||
go func(d amqp.Delivery) {
|
|
||||||
msg, err := rabbitmq.Deserialize(d.Body)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("json error: %v\n", err)
|
|
||||||
d.Reject(false) // FIXME: how to handle erros in queue...????
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
eName := fmt.Sprintf("%s", msg["event"])
|
|
||||||
data := (msg["data"]).(map[string]interface{})
|
|
||||||
logger.Log("Message<%s>: %s\n", eName, data)
|
|
||||||
|
|
||||||
switch true {
|
|
||||||
case strings.Contains(eName, event.EVENT_WAREHOUSE_STOCK_UPDATED): // todo: demo consumer...
|
|
||||||
logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Log("ACK: %s", eName)
|
|
||||||
d.Ack(false)
|
|
||||||
}(d)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
logger.Log("Waiting for messages...")
|
|
||||||
<-forever
|
|
||||||
}
|
|
||||||
|
|
||||||
func updateKVConfig(s *discovery.Service, oldCnf *server.Config) error { // FIXME: duplicated in internal/app/server/server.go
|
|
||||||
data, _, err := s.KV().Get(oldCnf.KVNamespace, nil)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if data == nil {
|
|
||||||
return errors.New("empty KV config data. Skipping")
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := bytes.NewBuffer(data.Value)
|
|
||||||
decoder := json.NewDecoder(buf)
|
|
||||||
if err := decoder.Decode(oldCnf); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,7 @@ go 1.18
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
git.pbiernat.dev/egommerce/api-entities v0.0.26
|
git.pbiernat.dev/egommerce/api-entities v0.0.26
|
||||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.148
|
git.pbiernat.dev/egommerce/go-api-pkg v0.0.151
|
||||||
github.com/georgysavva/scany/v2 v2.0.0
|
github.com/georgysavva/scany/v2 v2.0.0
|
||||||
github.com/go-pg/migrations/v8 v8.1.0
|
github.com/go-pg/migrations/v8 v8.1.0
|
||||||
github.com/go-pg/pg/v10 v10.10.7
|
github.com/go-pg/pg/v10 v10.10.7
|
||||||
|
@ -59,6 +59,8 @@ git.pbiernat.dev/egommerce/go-api-pkg v0.0.147 h1:wJ1D88iRnO6BHSiqtO3m7onFPPDBJ9
|
|||||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.147/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0=
|
git.pbiernat.dev/egommerce/go-api-pkg v0.0.147/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0=
|
||||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.148 h1:FyT0tfUUxMPeOEz44oYgMV13BgCU1i/TYH2NysgINws=
|
git.pbiernat.dev/egommerce/go-api-pkg v0.0.148 h1:FyT0tfUUxMPeOEz44oYgMV13BgCU1i/TYH2NysgINws=
|
||||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.148/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0=
|
git.pbiernat.dev/egommerce/go-api-pkg v0.0.148/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0=
|
||||||
|
git.pbiernat.dev/egommerce/go-api-pkg v0.0.151 h1:MKf+tka3Bhh4Zbn5cLqO6H39gsf7el/GUT8ittaIujM=
|
||||||
|
git.pbiernat.dev/egommerce/go-api-pkg v0.0.151/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||||
|
@ -1,55 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
|
||||||
"github.com/gofiber/fiber/v2"
|
|
||||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
defaultCORS = cors.New(cors.Config{
|
|
||||||
AllowOrigins: "*",
|
|
||||||
AllowCredentials: true,
|
|
||||||
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
|
||||||
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
func SetupRoutes(s *Server) {
|
|
||||||
s.App.Options("*", defaultCORS)
|
|
||||||
|
|
||||||
s.App.Get("/health", s.HealthHandler)
|
|
||||||
s.App.Get("/config", s.ConfigHandler)
|
|
||||||
|
|
||||||
api := s.App.Group("/api")
|
|
||||||
v1 := api.Group("/v1")
|
|
||||||
v1.Get("/product", s.GetProductListHandler)
|
|
||||||
// v1.Get("/product/:productId", s.GetProductHandler)
|
|
||||||
v1.Post("/product", s.AddProductToBasketHandler)
|
|
||||||
v1.Delete("/product", s.RemoveProductFromBasketHandler)
|
|
||||||
}
|
|
||||||
|
|
||||||
func SetupMiddlewares(s *Server) {
|
|
||||||
s.App.Use(defaultCORS)
|
|
||||||
s.App.Use(LoggingMiddleware(s.log))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Middlewares
|
|
||||||
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
|
|
||||||
return func(c *fiber.Ctx) error {
|
|
||||||
path := string(c.Request().URI().Path())
|
|
||||||
if strings.Contains(path, "/health") {
|
|
||||||
return c.Next()
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Log("Request: %s, ReqHeaders: %v, remote: %s, via: %s",
|
|
||||||
c.Request().URI().String(),
|
|
||||||
c.GetReqHeaders(),
|
|
||||||
c.Context().RemoteIP().String(),
|
|
||||||
string(c.Context().UserAgent()))
|
|
||||||
|
|
||||||
return c.Next()
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,179 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"github.com/gofiber/fiber/v2"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
|
||||||
"github.com/streadway/amqp"
|
|
||||||
|
|
||||||
def "git.pbiernat.dev/egommerce/api-entities/http"
|
|
||||||
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Server struct {
|
|
||||||
*fiber.App
|
|
||||||
conf *Config
|
|
||||||
log *fluentd.Logger
|
|
||||||
db *pgxpool.Pool
|
|
||||||
cache *redis.Client
|
|
||||||
ebCh *amqp.Channel
|
|
||||||
discovery *discovery.Service
|
|
||||||
name string
|
|
||||||
addr string
|
|
||||||
kvNmspc string
|
|
||||||
}
|
|
||||||
|
|
||||||
type Headers struct {
|
|
||||||
RequestID string `reqHeader:"x-request-id"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client, ebCh *amqp.Channel) *Server {
|
|
||||||
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.PathPrefix, conf.Port)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address)
|
|
||||||
err = consul.Register()
|
|
||||||
if err != nil {
|
|
||||||
logger.Log("register error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
cnf := fiber.Config{
|
|
||||||
AppName: conf.AppName,
|
|
||||||
ServerHeader: conf.AppName,
|
|
||||||
ReadTimeout: time.Millisecond * 50,
|
|
||||||
WriteTimeout: time.Millisecond * 50,
|
|
||||||
IdleTimeout: time.Millisecond * 50,
|
|
||||||
}
|
|
||||||
s := &Server{
|
|
||||||
fiber.New(cnf),
|
|
||||||
conf,
|
|
||||||
logger,
|
|
||||||
db,
|
|
||||||
cache,
|
|
||||||
ebCh,
|
|
||||||
consul,
|
|
||||||
conf.AppName,
|
|
||||||
conf.NetAddr,
|
|
||||||
conf.KVNamespace,
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(s *Server) { // Consul KV config updater
|
|
||||||
interval := time.Second * 15
|
|
||||||
ticker := time.NewTicker(interval)
|
|
||||||
for range ticker.C {
|
|
||||||
s.updateKVConfig()
|
|
||||||
}
|
|
||||||
}(s)
|
|
||||||
|
|
||||||
go func(s *Server) { // Server metadata cache updater
|
|
||||||
interval := time.Second * 5
|
|
||||||
ticker := time.NewTicker(interval)
|
|
||||||
for range ticker.C {
|
|
||||||
s.cacheMetadata()
|
|
||||||
}
|
|
||||||
}(s)
|
|
||||||
|
|
||||||
SetupMiddlewares(s)
|
|
||||||
SetupRoutes(s)
|
|
||||||
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) Start() {
|
|
||||||
err := s.Listen(s.addr)
|
|
||||||
s.log.Log("Starting error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) StartWithGracefulShutdown(forever chan struct{}) {
|
|
||||||
go func() {
|
|
||||||
sigint := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
<-sigint
|
|
||||||
|
|
||||||
if err := s.gracefulShutdown(); err != nil {
|
|
||||||
s.log.Log("Server is not shutting down! Reason: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
close(forever)
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err := s.Listen(s.addr); err != nil {
|
|
||||||
s.log.Log("Server is not running! Reason: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
<-forever
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetRequestID Return current requets ID - works only when fiber context are running
|
|
||||||
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
|
||||||
var hdr = new(Headers)
|
|
||||||
if err := c.ReqHeaderParser(hdr); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return hdr.RequestID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) Error400(c *fiber.Ctx, msg string) error {
|
|
||||||
return c.Status(fiber.StatusBadRequest).JSON(&def.ErrorResponse{Error: msg})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) Error404(c *fiber.Ctx, msg string) error {
|
|
||||||
return c.Status(fiber.StatusNotFound).JSON(&def.ErrorResponse{Error: msg})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) updateKVConfig() { // FIXME: duplicated in cmd/worker/main.go
|
|
||||||
config, _, err := s.discovery.KV().Get(s.kvNmspc, nil)
|
|
||||||
if err != nil || config == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
kvCnf := bytes.NewBuffer(config.Value)
|
|
||||||
decoder := json.NewDecoder(kvCnf)
|
|
||||||
if err := decoder.Decode(&s.conf); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) cacheMetadata() {
|
|
||||||
ctx := context.Background()
|
|
||||||
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
|
|
||||||
|
|
||||||
pos := s.cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
|
||||||
if pos >= 0 {
|
|
||||||
s.cache.LRem(ctx, key, 0, address)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.cache.LPush(ctx, key, address).Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) clearMetadataCache() {
|
|
||||||
ctx := context.Background()
|
|
||||||
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
|
|
||||||
|
|
||||||
s.cache.LRem(ctx, key, 0, address)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) gracefulShutdown() error {
|
|
||||||
s.log.Log("Server is going down...")
|
|
||||||
s.log.Log("Unregistering service: %s", s.discovery.GetID())
|
|
||||||
s.discovery.Unregister()
|
|
||||||
s.clearMetadataCache()
|
|
||||||
|
|
||||||
s.ebCh.Close()
|
|
||||||
s.db.Close()
|
|
||||||
s.log.Close()
|
|
||||||
|
|
||||||
return s.Shutdown()
|
|
||||||
}
|
|
62
src/internal/config/config.go
Normal file
62
src/internal/config/config.go
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
cnf "git.pbiernat.dev/egommerce/catalog-service/pkg/config"
|
||||||
|
srv "git.pbiernat.dev/egommerce/catalog-service/pkg/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defAppDomain = "catalog-svc"
|
||||||
|
defAppName = "catalog-svc"
|
||||||
|
defCacheAddr = "api-cache:6379"
|
||||||
|
defCachePassword = "12345678"
|
||||||
|
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
||||||
|
defEventBusURL = "amqp://guest:guest@api-eventbus:56721"
|
||||||
|
defKVNmspc = "dev.egommerce/service/catalog"
|
||||||
|
defLoggerAddr = "api-logger:24224"
|
||||||
|
defNetAddr = ":80"
|
||||||
|
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||||
|
defPathPrefix = "/catalog"
|
||||||
|
defRegistryAddr = "api-registry:8500"
|
||||||
|
defEbEventsExchange = "api-events"
|
||||||
|
defEbEventsQueue = "catalog-svc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Base *srv.Config
|
||||||
|
|
||||||
|
DbURL string `json:"db_url"`
|
||||||
|
CacheAddr string `json:"cache_addr"`
|
||||||
|
CachePassword string `json:"cache_password"`
|
||||||
|
EventBusExchange string `json:"eventbus_exchange"`
|
||||||
|
EventBusQueue string `json:"eventbus_queue"`
|
||||||
|
EventBusURL string `json:"eventbus_url"`
|
||||||
|
LoggerAddr string `json:"logger_addr"`
|
||||||
|
KVNamespace string
|
||||||
|
RegistryAddr string
|
||||||
|
|
||||||
|
// Fields with JSON mappings are available through Consul KV storage
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConfig(name string) *Config {
|
||||||
|
c := new(Config)
|
||||||
|
c.Base = new(srv.Config)
|
||||||
|
|
||||||
|
c.Base.AppID, _ = os.Hostname()
|
||||||
|
c.Base.AppName = name
|
||||||
|
c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
|
||||||
|
c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
||||||
|
|
||||||
|
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||||
|
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||||
|
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||||
|
c.EventBusExchange = defEbEventsExchange
|
||||||
|
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||||
|
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||||
|
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||||
|
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
@ -1,18 +1,18 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasketEvent"
|
EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasket"
|
||||||
EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasketEvent"
|
EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProductAddedToBasketEvent struct {
|
type ProductAddedToBasket struct {
|
||||||
*Event
|
*Event
|
||||||
ProductID int `json:"product_id"`
|
ProductID int `json:"product_id"`
|
||||||
Quantity int `json:"quantity"`
|
Quantity int `json:"quantity"`
|
||||||
BasketID string `json:"basket_id"`
|
BasketID string `json:"basket_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ProductRemovedFromBasketEvent struct {
|
type ProductRemovedFromBasket struct {
|
||||||
*Event
|
*Event
|
||||||
ProductID int `json:"product_id"`
|
ProductID int `json:"product_id"`
|
||||||
Quantity int `json:"quantity"`
|
Quantity int `json:"quantity"`
|
@ -1,11 +1,13 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
|
Command string `json:"command"`
|
||||||
RequestID string `json:"request_id"`
|
RequestID string `json:"request_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEvent(reqID string) *Event {
|
func NewEvent(command, reqID string) *Event {
|
||||||
em := new(Event)
|
em := new(Event)
|
||||||
|
em.Command = command
|
||||||
em.RequestID = reqID
|
em.RequestID = reqID
|
||||||
|
|
||||||
return em
|
return em
|
@ -7,14 +7,14 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
|
||||||
def "git.pbiernat.dev/egommerce/api-entities/http"
|
def "git.pbiernat.dev/egommerce/api-entities/http"
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/service"
|
"git.pbiernat.dev/egommerce/catalog-service/internal/service"
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/ui"
|
"git.pbiernat.dev/egommerce/catalog-service/internal/ui"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) GetProductListHandler(c *fiber.Ctx) error {
|
func (s *Server) GetProductListHandler(c *fiber.Ctx) error {
|
||||||
reqID, _ := s.GetRequestID(c)
|
reqID, _ := s.Base.GetRequestID(c)
|
||||||
req := new(def.GetProductListRequest)
|
req := new(def.GetProductListRequest)
|
||||||
catalogSrv := service.NewCatalogService(s.db, s.ebCh, s.log)
|
catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger)
|
||||||
res, err := ui.GetProductList(catalogSrv, req.CategoryID, reqID)
|
res, err := ui.GetProductList(catalogSrv, req.CategoryID, reqID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -24,10 +24,10 @@ func (s *Server) GetProductListHandler(c *fiber.Ctx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error {
|
func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error {
|
||||||
reqID, _ := s.GetRequestID(c)
|
reqID, _ := s.Base.GetRequestID(c)
|
||||||
req := new(def.AddProductToBasketRequest)
|
req := new(def.AddProductToBasketRequest)
|
||||||
if err := c.BodyParser(req); err != nil {
|
if err := c.BodyParser(req); err != nil {
|
||||||
return s.Error400(c, err.Error())
|
return s.Base.Error(c, 400, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
basketID := prepareBasket(c)
|
basketID := prepareBasket(c)
|
||||||
@ -36,24 +36,24 @@ func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error {
|
|||||||
qty = req.Quantity
|
qty = req.Quantity
|
||||||
}
|
}
|
||||||
|
|
||||||
catalogSrv := service.NewCatalogService(s.db, s.ebCh, s.log)
|
catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger)
|
||||||
res, err := ui.AddProductToBasket(catalogSrv, req.ProductID, qty, basketID, reqID)
|
res, err := ui.AddProductToBasket(catalogSrv, req.ProductID, qty, basketID, reqID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Log("AddProductToBasketHandler error: ", err)
|
s.Logger.Log("AddProductToBasketHandler error: ", err)
|
||||||
if res.ProductID == 0 {
|
if res.ProductID == 0 {
|
||||||
return s.Error404(c, fmt.Sprintf("Product #%d not exists", req.ProductID))
|
return s.Base.Error(c, 404, fmt.Sprintf("Product #%d not exists", req.ProductID))
|
||||||
}
|
}
|
||||||
return s.Error400(c, "Failed to add product to basket")
|
return s.Base.Error(c, 400, "Failed to add product to basket")
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.JSON(res)
|
return c.JSON(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error {
|
func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error {
|
||||||
reqID, _ := s.GetRequestID(c)
|
reqID, _ := s.Base.GetRequestID(c)
|
||||||
req := new(def.RemoveProductFromBasketRequest)
|
req := new(def.RemoveProductFromBasketRequest)
|
||||||
if err := c.BodyParser(req); err != nil {
|
if err := c.BodyParser(req); err != nil {
|
||||||
return s.Error400(c, err.Error())
|
return s.Base.Error(c, 400, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
basketID := prepareBasket(c)
|
basketID := prepareBasket(c)
|
||||||
@ -62,11 +62,11 @@ func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error {
|
|||||||
qty = req.Quantity
|
qty = req.Quantity
|
||||||
}
|
}
|
||||||
|
|
||||||
catalogSrv := service.NewCatalogService(s.db, s.ebCh, s.log)
|
catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger)
|
||||||
res, err := ui.RemoveProductFromBasket(catalogSrv, req.ProductID, qty, basketID, reqID)
|
res, err := ui.RemoveProductFromBasket(catalogSrv, req.ProductID, qty, basketID, reqID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.log.Log("RemoveProductFromBasketHandler error: ", err)
|
s.Logger.Log("RemoveProductFromBasketHandler error: ", err)
|
||||||
return s.Error400(c, "Failed to remove product from basket")
|
return s.Base.Error(c, 400, "Failed to remove product from basket")
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.JSON(res)
|
return c.JSON(res)
|
@ -12,5 +12,5 @@ func (s *Server) HealthHandler(c *fiber.Ctx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
|
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
|
||||||
return c.JSON(s.conf)
|
return c.JSON(s.Config)
|
||||||
}
|
}
|
33
src/internal/server/middleware.go
Normal file
33
src/internal/server/middleware.go
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/gofiber/fiber/v2"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
)
|
||||||
|
|
||||||
|
// "github.com/gofiber/fiber/v2"
|
||||||
|
// "github.com/gofiber/fiber/v2/middleware/cors"
|
||||||
|
|
||||||
|
func SetupMiddleware(s *Server) {
|
||||||
|
s.Base.Use(defaultCORS)
|
||||||
|
s.Base.Use(LoggingMiddleware(s.Logger))
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
|
||||||
|
return func(c *fiber.Ctx) error {
|
||||||
|
path := string(c.Request().URI().Path())
|
||||||
|
if strings.Contains(path, "/health") {
|
||||||
|
return c.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Log("Request: %s, remote: %s, via: %s",
|
||||||
|
c.Request().URI().String(),
|
||||||
|
c.Context().RemoteIP().String(),
|
||||||
|
string(c.Context().UserAgent()))
|
||||||
|
|
||||||
|
return c.Next()
|
||||||
|
}
|
||||||
|
}
|
40
src/internal/server/router.go
Normal file
40
src/internal/server/router.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/gofiber/fiber/v2"
|
||||||
|
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultCORS = cors.New(cors.Config{
|
||||||
|
AllowOrigins: "*",
|
||||||
|
AllowCredentials: true,
|
||||||
|
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
||||||
|
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
func SetupRouter(s *Server) {
|
||||||
|
s.Base.Options("*", defaultCORS)
|
||||||
|
|
||||||
|
s.Base.Get("/health", s.HealthHandler)
|
||||||
|
s.Base.Get("/config", s.ConfigHandler)
|
||||||
|
|
||||||
|
api := s.Base.Group("/api")
|
||||||
|
v1 := api.Group("/v1")
|
||||||
|
v1.Get("/product", s.GetProductListHandler)
|
||||||
|
// v1.Get("/product/:productId", s.GetProductHandler)
|
||||||
|
v1.Post("/product", s.AddProductToBasketHandler)
|
||||||
|
v1.Delete("/product", s.RemoveProductFromBasketHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func CORSPreflightMiddleware(c *fiber.Ctx) error {
|
||||||
|
if string(c.Request().Header.Method()) == http.MethodOptions {
|
||||||
|
c.Response().SetStatusCode(http.StatusOK)
|
||||||
|
c.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Next()
|
||||||
|
}
|
205
src/internal/server/server.go
Normal file
205
src/internal/server/server.go
Normal file
@ -0,0 +1,205 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
|
||||||
|
db "git.pbiernat.dev/egommerce/catalog-service/pkg/database"
|
||||||
|
srv "git.pbiernat.dev/egommerce/catalog-service/pkg/server"
|
||||||
|
|
||||||
|
cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
Server struct {
|
||||||
|
Base *srv.Server
|
||||||
|
Config *cnf.Config
|
||||||
|
|
||||||
|
Cache *redis.Client
|
||||||
|
Database *pgxpool.Pool
|
||||||
|
Eventbus *amqp.Channel
|
||||||
|
Logger *fluentd.Logger
|
||||||
|
Registry *consul.Service
|
||||||
|
}
|
||||||
|
|
||||||
|
OptionFn func(*Server) error // FIXME: similar in worker
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(c *cnf.Config, opts ...OptionFn) *Server {
|
||||||
|
svr := &Server{
|
||||||
|
Base: srv.New(c.Base),
|
||||||
|
Config: c,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
if err := opt(svr); err != nil {
|
||||||
|
log.Fatalf("Failed to attach extension to the server. Err: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SetupMiddleware(svr)
|
||||||
|
SetupRouter(svr)
|
||||||
|
|
||||||
|
return svr
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithCache(c *cnf.Config) OptionFn {
|
||||||
|
redis := redis.NewClient(&redis.Options{
|
||||||
|
Addr: c.CacheAddr,
|
||||||
|
Password: c.CachePassword,
|
||||||
|
DB: 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Cache = redis
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDatabase(c *cnf.Config) OptionFn {
|
||||||
|
dbConn, err := db.Connect(c.DbURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", c.DbURL, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Database = dbConn
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithEventbus(c *cnf.Config) OptionFn {
|
||||||
|
conn, err := amqp.Dial(c.EventBusURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", c.EventBusURL, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
chn, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Eventbus = chn
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithLogger(c *cnf.Config) OptionFn {
|
||||||
|
return func(s *Server) error {
|
||||||
|
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Logger = logger
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithRegistry(c *cnf.Config) OptionFn {
|
||||||
|
return func(s *Server) error {
|
||||||
|
port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error
|
||||||
|
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = registry.Register()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Registry = registry
|
||||||
|
|
||||||
|
go func() { // Consul KV updater
|
||||||
|
ticker := time.NewTicker(time.Second * 15)
|
||||||
|
for range ticker.C {
|
||||||
|
updateKVConfig(s)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() { // Server metadata cache updater
|
||||||
|
ticker := time.NewTicker(time.Second * 5)
|
||||||
|
for range ticker.C {
|
||||||
|
s.cacheMetadata()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Shutdown() srv.PurgeFn {
|
||||||
|
return func(srv *srv.Server) error {
|
||||||
|
s.Logger.Log("Server %s is going down...", s.Base.AppID)
|
||||||
|
|
||||||
|
s.Registry.Unregister()
|
||||||
|
s.clearMetadataCache()
|
||||||
|
s.Eventbus.Close()
|
||||||
|
s.Database.Close()
|
||||||
|
s.Logger.Log("Gone.")
|
||||||
|
s.Logger.Close()
|
||||||
|
|
||||||
|
return s.Base.Shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
||||||
|
func updateKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
|
||||||
|
config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
|
||||||
|
if err != nil || config == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
kvCnf := bytes.NewBuffer(config.Value)
|
||||||
|
decoder := json.NewDecoder(kvCnf)
|
||||||
|
if err := decoder.Decode(&s.Config); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) cacheMetadata() {
|
||||||
|
ctx := context.Background()
|
||||||
|
key, address := s.getMetadataIPsKey(), s.Base.Config.AppID
|
||||||
|
|
||||||
|
pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
||||||
|
if pos >= 0 {
|
||||||
|
s.Cache.LRem(ctx, key, 0, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Cache.LPush(ctx, key, address).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) clearMetadataCache() {
|
||||||
|
ctx := context.Background()
|
||||||
|
key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
|
||||||
|
|
||||||
|
s.Cache.LRem(ctx, key, 0, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) getMetadataIPsKey() string {
|
||||||
|
return "internal__" + s.Base.Config.AppName + "__ips"
|
||||||
|
}
|
@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.pbiernat.dev/egommerce/api-entities/model"
|
"git.pbiernat.dev/egommerce/api-entities/model"
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/event"
|
"git.pbiernat.dev/egommerce/catalog-service/internal/event"
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||||
"github.com/georgysavva/scany/v2/pgxscan"
|
"github.com/georgysavva/scany/v2/pgxscan"
|
||||||
@ -55,7 +55,12 @@ func (s *CatalogService) GetProductList(ctx context.Context, reqID string, categ
|
|||||||
func (s *CatalogService) AddProductToBasket(reqID, basketID string, productID, qty int) error {
|
func (s *CatalogService) AddProductToBasket(reqID, basketID string, productID, qty int) error {
|
||||||
s.log.Log("Adding product #%d to the basket #%s", productID, basketID)
|
s.log.Log("Adding product #%d to the basket #%s", productID, basketID)
|
||||||
|
|
||||||
msg := &event.ProductAddedToBasketEvent{Event: event.NewEvent(reqID), BasketID: basketID, ProductID: productID, Quantity: qty}
|
msg := &event.ProductAddedToBasket{
|
||||||
|
Event: event.NewEvent("AddProductToBasket", reqID),
|
||||||
|
BasketID: basketID,
|
||||||
|
ProductID: productID,
|
||||||
|
Quantity: qty,
|
||||||
|
}
|
||||||
rabbitmq.Publish(s.ebCh, "api-events", "catalog.basket.productAddedToBasket", msg)
|
rabbitmq.Publish(s.ebCh, "api-events", "catalog.basket.productAddedToBasket", msg)
|
||||||
rabbitmq.Publish(s.ebCh, "api-events", "catalog.warehouse.productAddedToBasket", msg)
|
rabbitmq.Publish(s.ebCh, "api-events", "catalog.warehouse.productAddedToBasket", msg)
|
||||||
|
|
||||||
@ -65,7 +70,12 @@ func (s *CatalogService) AddProductToBasket(reqID, basketID string, productID, q
|
|||||||
func (s *CatalogService) RemoveProductFromBasket(reqID, basketID string, productID, qty int) error {
|
func (s *CatalogService) RemoveProductFromBasket(reqID, basketID string, productID, qty int) error {
|
||||||
s.log.Log("Removed product#%s from basket#%s", productID, basketID)
|
s.log.Log("Removed product#%s from basket#%s", productID, basketID)
|
||||||
|
|
||||||
msg := &event.ProductRemovedFromBasketEvent{Event: event.NewEvent(reqID), BasketID: basketID, ProductID: productID, Quantity: qty}
|
msg := &event.ProductRemovedFromBasket{
|
||||||
|
Event: event.NewEvent("RemoveProductFromBasket", reqID),
|
||||||
|
BasketID: basketID,
|
||||||
|
ProductID: productID,
|
||||||
|
Quantity: qty,
|
||||||
|
}
|
||||||
rabbitmq.Publish(s.ebCh, "api-events", "catalog.basket.productRemovedFromBasket", msg)
|
rabbitmq.Publish(s.ebCh, "api-events", "catalog.basket.productRemovedFromBasket", msg)
|
||||||
rabbitmq.Publish(s.ebCh, "api-events", "catalog.warehouse.productRemovedFromBasket", msg)
|
rabbitmq.Publish(s.ebCh, "api-events", "catalog.warehouse.productRemovedFromBasket", msg)
|
||||||
|
|
@ -5,7 +5,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
def "git.pbiernat.dev/egommerce/api-entities/http"
|
def "git.pbiernat.dev/egommerce/api-entities/http"
|
||||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/service"
|
"git.pbiernat.dev/egommerce/catalog-service/internal/service"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetProductList(srv *service.CatalogService, categoryID int, reqID string) ([]def.GetProductResponse, error) {
|
func GetProductList(srv *service.CatalogService, categoryID int, reqID string) ([]def.GetProductResponse, error) {
|
35
src/internal/worker/command.go
Normal file
35
src/internal/worker/command.go
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.pbiernat.dev/egommerce/catalog-service/internal/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
StockUpdated = "event.WarehouseStockUpdatedEvent"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Command interface {
|
||||||
|
run(CommandData) (bool, any)
|
||||||
|
}
|
||||||
|
|
||||||
|
type CommandData map[string]interface{}
|
||||||
|
|
||||||
|
type CommandRunner struct {
|
||||||
|
cmd Command
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *CommandRunner) run(data CommandData) (bool, any) {
|
||||||
|
return r.cmd.run(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
type StockUpdatedCommand struct {
|
||||||
|
srvc *service.CatalogService
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *StockUpdatedCommand) run(data CommandData) (bool, any) {
|
||||||
|
// reqID := data["request_id"].(string) // FIXME Check input params!
|
||||||
|
// productID := int(data["product_id"].(float64)) // FIXME Check input params!
|
||||||
|
|
||||||
|
// stock, err := ui.StockUpdated(c.srvc, productID, reqID)
|
||||||
|
return true, nil //err == nil, basket
|
||||||
|
}
|
272
src/internal/worker/worker.go
Normal file
272
src/internal/worker/worker.go
Normal file
@ -0,0 +1,272 @@
|
|||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/catalog-service/pkg/database"
|
||||||
|
|
||||||
|
cnf "git.pbiernat.dev/egommerce/catalog-service/internal/config"
|
||||||
|
"git.pbiernat.dev/egommerce/catalog-service/internal/event"
|
||||||
|
"git.pbiernat.dev/egommerce/catalog-service/internal/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
Worker struct {
|
||||||
|
Config *cnf.Config
|
||||||
|
Cache *redis.Client
|
||||||
|
Database *pgxpool.Pool
|
||||||
|
Eventbus *amqp.Channel
|
||||||
|
Logger *fluentd.Logger
|
||||||
|
Registry *consul.Service
|
||||||
|
}
|
||||||
|
OptionFn func(*Worker) error // FIXME: similar in server/server.go
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(c *cnf.Config, opts ...OptionFn) *Worker {
|
||||||
|
wrk := &Worker{
|
||||||
|
Config: c,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
if err := opt(wrk); err != nil {
|
||||||
|
log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return wrk
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithCache(c *cnf.Config) OptionFn {
|
||||||
|
return func(w *Worker) error {
|
||||||
|
conn := redis.NewClient(&redis.Options{
|
||||||
|
Addr: c.CacheAddr,
|
||||||
|
Password: c.CachePassword,
|
||||||
|
DB: 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
w.Cache = conn
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDatabase(c *cnf.Config) OptionFn {
|
||||||
|
return func(w *Worker) error {
|
||||||
|
conn, err := database.Connect(c.DbURL)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Failed to connect to Database server: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Database = conn
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithEventbus(c *cnf.Config) OptionFn {
|
||||||
|
return func(w *Worker) error {
|
||||||
|
_, chn, err := rabbitmq.Open(c.EventBusURL)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Failed to connect to EventBus server: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = rabbitmq.NewExchange(chn, c.EventBusExchange)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = chn.QueueDeclare(
|
||||||
|
c.EventBusQueue, // name
|
||||||
|
false, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Failed to declare EventBus queue: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// w.bindQueues()
|
||||||
|
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "warehouse.catalog.stockUpdated")
|
||||||
|
|
||||||
|
w.Eventbus = chn
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithLogger(c *cnf.Config) OptionFn {
|
||||||
|
return func(w *Worker) error {
|
||||||
|
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Logger = logger
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithRegistry(c *cnf.Config) OptionFn {
|
||||||
|
return func(w *Worker) error {
|
||||||
|
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Registry = registry
|
||||||
|
|
||||||
|
go func(w *Worker) { // Fetch Consul KV config and store it in app config
|
||||||
|
ticker := time.NewTicker(time.Second * 15)
|
||||||
|
for range ticker.C {
|
||||||
|
fetchKVConfig(w)
|
||||||
|
}
|
||||||
|
}(w)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
||||||
|
func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go
|
||||||
|
config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil)
|
||||||
|
if err != nil || config == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
kvCnf := bytes.NewBuffer(config.Value)
|
||||||
|
decoder := json.NewDecoder(kvCnf)
|
||||||
|
if err := decoder.Decode(&w.Config); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Start(while chan struct{}) error {
|
||||||
|
go func() {
|
||||||
|
sigint := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
<-sigint
|
||||||
|
|
||||||
|
w.Shutdown()
|
||||||
|
|
||||||
|
close(while)
|
||||||
|
}()
|
||||||
|
|
||||||
|
err := w.doWork()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err)
|
||||||
|
close(while)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Logger.Log("Waiting for messages...")
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) Shutdown() error {
|
||||||
|
w.Logger.Log("Worker %s is going down...", w.Config.Base.AppID)
|
||||||
|
|
||||||
|
w.Registry.Unregister()
|
||||||
|
w.Eventbus.Close()
|
||||||
|
w.Database.Close()
|
||||||
|
w.Logger.Log("Gone.")
|
||||||
|
w.Logger.Close()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) doWork() error {
|
||||||
|
msgs, err := w.Eventbus.Consume(
|
||||||
|
w.Config.EventBusQueue, // queue
|
||||||
|
"", // consumer
|
||||||
|
false, // auto-ack
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // args
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Failed to register a consumer: %s", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
catalogSrvc := service.NewCatalogService(w.Database, w.Eventbus, w.Logger)
|
||||||
|
|
||||||
|
for d := range msgs {
|
||||||
|
go func(d amqp.Delivery) {
|
||||||
|
w.processMsg(catalogSrvc, d)
|
||||||
|
}(d)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Worker) processMsg(srvc *service.CatalogService, d amqp.Delivery) {
|
||||||
|
msg, err := rabbitmq.Deserialize(d.Body)
|
||||||
|
if err != nil {
|
||||||
|
w.Logger.Log("Deserialization error: %v\n", err)
|
||||||
|
d.Reject(false)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
rnr := &CommandRunner{}
|
||||||
|
name := fmt.Sprintf("%s", msg["event"])
|
||||||
|
data := (msg["data"]).(map[string]interface{})
|
||||||
|
// reqID := (data["request_id"]).(string) // FIXME Check input params!
|
||||||
|
|
||||||
|
w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data)
|
||||||
|
|
||||||
|
var ok = false
|
||||||
|
switch true { // Refactor -> use case for polymorphism
|
||||||
|
case strings.Contains(name, event.EVENT_WAREHOUSE_STOCK_UPDATED):
|
||||||
|
w.Logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED)
|
||||||
|
}
|
||||||
|
|
||||||
|
ok, _ = rnr.run(data)
|
||||||
|
if ok {
|
||||||
|
w.Logger.Log("Successful executed message \"%s\"\n", name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
w.Logger.Log("Error processing \"%s\": %s (%v)", name, err.Error(), err)
|
||||||
|
d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...?
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Logger.Log("Finalized processing: %s", name)
|
||||||
|
d.Ack(false)
|
||||||
|
}
|
16
src/pkg/database/connect.go
Normal file
16
src/pkg/database/connect.go
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Connect(connStr string) (*pgxpool.Pool, error) {
|
||||||
|
pool, err := pgxpool.New(context.Background(), connStr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return pool, nil
|
||||||
|
}
|
21
src/pkg/server/config.go
Normal file
21
src/pkg/server/config.go
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
AppID string
|
||||||
|
AppName string
|
||||||
|
NetAddr string
|
||||||
|
PathPrefix string
|
||||||
|
|
||||||
|
IdleTimeout time.Duration // miliseconds
|
||||||
|
ReadTimeout time.Duration // miliseconds
|
||||||
|
WriteTimeout time.Duration // miliseconds
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) GetAppFullName() string {
|
||||||
|
return fmt.Sprintf("%s_%s", c.AppName, c.AppID)
|
||||||
|
}
|
79
src/pkg/server/server.go
Normal file
79
src/pkg/server/server.go
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gofiber/fiber/v2"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/api-entities/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
Server struct {
|
||||||
|
*fiber.App
|
||||||
|
*Config
|
||||||
|
|
||||||
|
addr string // e.g. "127.0.0.1:8080"
|
||||||
|
// name string // e.g. "awesome-rest-api"
|
||||||
|
// kvNmspc string
|
||||||
|
}
|
||||||
|
HeaderRequestID struct {
|
||||||
|
RequestID string `reqHeader:"x-request-id"`
|
||||||
|
}
|
||||||
|
OptionFn func(*Server) error
|
||||||
|
PurgeFn func(*Server) error
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(conf *Config) *Server {
|
||||||
|
return &Server{
|
||||||
|
App: fiber.New(fiber.Config{
|
||||||
|
AppName: conf.AppID,
|
||||||
|
ServerHeader: conf.AppName,
|
||||||
|
ReadTimeout: conf.ReadTimeout * time.Millisecond,
|
||||||
|
WriteTimeout: conf.WriteTimeout * time.Millisecond,
|
||||||
|
IdleTimeout: conf.IdleTimeout * time.Millisecond,
|
||||||
|
}),
|
||||||
|
Config: conf,
|
||||||
|
addr: conf.NetAddr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error {
|
||||||
|
go func() {
|
||||||
|
sigint := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
<-sigint
|
||||||
|
|
||||||
|
if err := prgFn(s); err != nil {
|
||||||
|
log.Fatalf("Failed to shutdown server. Reason: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(while)
|
||||||
|
}()
|
||||||
|
|
||||||
|
err := s.Listen(s.addr)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to start server: %s. Reason: %v\n", s.Config.AppID, err)
|
||||||
|
close(while)
|
||||||
|
}
|
||||||
|
<-while
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||||
|
var hdr = new(HeaderRequestID)
|
||||||
|
if err := c.ReqHeaderParser(hdr); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return hdr.RequestID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
|
||||||
|
return c.Status(code).JSON(http.ErrorResponse{Error: msg})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user