Huge refactoring, resolved tigh coupling issue

This commit is contained in:
Piotr Biernat 2024-07-19 22:48:53 +02:00
parent 60e1fb5114
commit 63570fd3e6
41 changed files with 1777 additions and 701 deletions

View File

@ -1,4 +1,15 @@
SERVER_ADDR=:80 SERVER_ADDR=:80
APP_NAME=pricing-svc
APP_DOMAIN=pricing.service.ego.io
REGISTRY_USE_DOMAIN_OVER_IP=false
APP_PATH_PREFIX=/pricing
APP_KV_NAMESPACE=dev.egommerce/service/pricing-svc
LOGGER_ADDR=api-logger:24224
REGISTRY_ADDR=api-registry:8500
DATABASE_URL=postgres://postgres:12345678@postgres-db:5432/egommerce DATABASE_URL=postgres://postgres:12345678@postgres-db:5432/egommerce
CACHE_ADDR=api-cache:6379
CACHE_PASSWORD=12345678
MONGODB_URL=mongodb://mongodb:12345678@mongo-db:27017 MONGODB_URL=mongodb://mongodb:12345678@mongo-db:27017
EVENTBUS_URL=amqp://guest:guest@api-eventbus:5672 EVENTBUS_URL=amqp://guest:guest@api-eventbus:5672

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
.env
.env.*
!.env.dist
.vscode/
__debug_bin

View File

@ -5,6 +5,7 @@ ARG BIN_OUTPUT=/go/bin
ARG GO_SERVER=cmd/server/main.go ARG GO_SERVER=cmd/server/main.go
ARG GO_MIGRATE=cmd/migrate/main.go ARG GO_MIGRATE=cmd/migrate/main.go
ARG GO_WORKER=cmd/worker/main.go ARG GO_WORKER=cmd/worker/main.go
ARG GO_HEALTH=cmd/health/main.go
WORKDIR /go/src/app WORKDIR /go/src/app
COPY src ./ COPY src ./
@ -12,4 +13,5 @@ COPY src ./
RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \ RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \ go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/migrate" $GO_MIGRATE && \ go build -ldflags="-w -s" -o "$BIN_OUTPUT/migrate" $GO_MIGRATE && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/health" $GO_HEALTH

View File

@ -19,12 +19,17 @@ LABEL dev.egommerce.image.build_time=${BUILD_TIME}
WORKDIR / WORKDIR /
COPY --from=builder $BIN_OUTPUT /app COPY --from=builder $BIN_OUTPUT /app
COPY --from=builder /go/bin/migrate /bin/go_migrate COPY --from=builder /go/bin/migrate /bin/migrate
COPY --from=builder /go/bin/health /bin/health
COPY .env.dist /.env COPY .env.dist /.env
COPY ./bin /bin COPY ./bin /bin
RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh
RUN apk add curl
EXPOSE 80 EXPOSE 80
CMD ["/app"]
ENTRYPOINT ["entrypoint.sh"] ENTRYPOINT ["entrypoint.sh"]
CMD ["sh", "-c", "/app"]
HEALTHCHECK --interval=5s --timeout=1s --retries=20 CMD health >/dev/null || exit 1

View File

@ -8,6 +8,9 @@ build-image-dev:
build-image-prod: build-image-prod:
- sh ${DEPLOY_DIR}/image-build.sh - sh ${DEPLOY_DIR}/image-build.sh
push-image-dev:
- sh ${DEPLOY_DIR}/image-push.sh dev
push-image-prod: push-image-prod:
- sh ${DEPLOY_DIR}/image-push.sh - sh ${DEPLOY_DIR}/image-push.sh

View File

@ -8,12 +8,15 @@ waitForService()
while [ $status != 0 ] while [ $status != 0 ]
do do
echo "[x] wating for $1..." echo "[x] wating for $1..."
sleep 1 sleep 3
wait-for-it.sh $1 -t 2 1>/dev/null 2>&1 wait-for-it.sh $1 -t 2 1>/dev/null 2>&1
status=$? status=$?
done done
} }
update-resolv # provided by stack - better approach - single copy
update-ca-certificates
waitForService "postgres-db:5432" waitForService "postgres-db:5432"
waitForService "api-eventbus:5672" waitForService "api-eventbus:5672"
waitForService "api-logger:24224" waitForService "api-logger:24224"

View File

@ -1,17 +1,17 @@
#!/usr/bin/env sh #!/usr/bin/env sh
# ensure migrate env is initialized # ensure migrate env is initialized
$(go_migrate version >/dev/null 2>&1) $(migrate version >/dev/null 2>&1)
version=$? version=$?
if [ $version != "0" ] if [ $version != "0" ]
then then
echo "Creating base table..." echo "Creating base table..."
$(go_migrate init >/dev/null 2>&1) $(migrate init >/dev/null 2>&1)
init=$? init=$?
fi fi
# check again # check again
$(go_migrate version >/dev/null 2>&1) $(migrate version >/dev/null 2>&1)
version=$? version=$?
if [ $version != "0" ] if [ $version != "0" ]
then then
@ -20,7 +20,6 @@ then
fi fi
# run migrations # run migrations
go_migrate up migrate up
echo "Done."
exit $version exit $version

View File

@ -1,5 +1,7 @@
#!/usr/bin/env sh #!/usr/bin/env sh
# Use this script to test if a given TCP host/port are available
# https://github.com/vishnubob/wait-for-it/blob/master/wait-for-it.sh
# Use this script to test if a given TCP host/port are available
set -e set -e

View File

@ -1,7 +1,7 @@
#!/bin/sh #!/bin/sh
# RUN IN REPO ROOT DIR !! # RUN IN REPO ROOT DIR !!
export IMAGE_PREFIX="git.pbiernat.dev/egommerce/pricing" export IMAGE_PREFIX="git.pbiernat.io/egommerce/pricing"
export BUILDER_IMAGE="egommerce-builder:pricing" export BUILDER_IMAGE="egommerce-builder:pricing"
export BUILD_TIME=$(date +"%Y%m%d%H%M%S") export BUILD_TIME=$(date +"%Y%m%d%H%M%S")
export SERVER_IMAGE="$IMAGE_PREFIX-svc" export SERVER_IMAGE="$IMAGE_PREFIX-svc"

View File

@ -1,13 +1,17 @@
#!/bin/sh #!/bin/sh
# RUN IN REPO ROOT DIR !! # RUN IN REPO ROOT DIR !!
export IMAGE_BASE="git.pbiernat.dev/egommerce/pricing" export IMAGE_BASE="git.pbiernat.io/egommerce/pricing"
export SERVER_IMAGE="$IMAGE_BASE-svc" export SERVER_IMAGE="$IMAGE_BASE-svc"
export WORKER_IMAGE="$IMAGE_BASE-worker" export WORKER_IMAGE="$IMAGE_BASE-worker"
TARGET=${1:-latest} TARGET=${1:-latest}
echo $DOCKER_PASSWORD | docker login git.pbiernat.dev -u $DOCKER_USERNAME --password-stdin echo $DOCKER_PASSWORD | docker login git.pbiernat.io -u $DOCKER_USERNAME --password-stdin
docker push "$SERVER_IMAGE:$TARGET" docker push "$SERVER_IMAGE:$TARGET"
docker push "$WORKER_IMAGE:$TARGET" docker push "$WORKER_IMAGE:$TARGET"
# Restart container
curl -X POST http://127.0.0.1:9001/api/webhooks/1975af39-6f45-4da7-9fe0-f0783a6c42a8
curl -X POST http://127.0.0.1:9001/api/webhooks/d473968f-e692-4ed7-b69b-008e35362007

1
src/app.run Normal file
View File

@ -0,0 +1 @@
1750554

39
src/cmd/health/main.go Normal file
View File

@ -0,0 +1,39 @@
package main
import (
"flag"
"fmt"
"os"
)
const usageText = `This program runs healthcheck on the app.
Usage:
go run cmd/health/main.go
`
func init() {
flag.Usage = func() {
fmt.Print(usageText)
flag.PrintDefaults()
os.Exit(2)
}
flag.Parse()
}
func main() {
var exitCode = 1
if isOk := healthCheck(); isOk {
exitCode = 0
}
os.Exit(exitCode)
}
func healthCheck() bool {
run, err := os.Open("/app.run")
if err != nil {
return false
}
defer run.Close()
return true
}

View File

@ -6,10 +6,13 @@ import (
"log" "log"
"os" "os"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/config"
"github.com/go-pg/migrations/v8" "github.com/go-pg/migrations/v8"
"github.com/go-pg/pg/v10" "github.com/go-pg/pg/v10"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
cnf "git.pbiernat.io/egommerce/pricing-service/internal/server"
) )
const ( const (
@ -32,32 +35,44 @@ Usage:
` `
func main() { func main() {
if config.ErrLoadingEnvs != nil { flag.Usage = func() {
log.Panicln("Error loading .env file", config.ErrLoadingEnvs) fmt.Print(usageText)
flag.PrintDefaults()
os.Exit(2)
}
flag.Parse()
if baseCnf.ErrLoadingEnvs != nil {
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
} }
// dbURL := config.GetEnv("DATABASE_URL", defDbURL) c := cnf.NewConfig("pricing-migrator")
loggerAddr := config.GetEnv("LOGGER_ADDR", defLoggerAddr)
mTblName := config.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
logHost, logPort := fluentd.ParseAddr(loggerAddr) // dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL)
logger := fluentd.NewLogger(defAppName, logHost, logPort)
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
if err != nil {
log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err)
}
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error)
if err != nil {
log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err)
}
defer logger.Close() defer logger.Close()
flag.Usage = usage
flag.Parse()
db := pg.Connect(&pg.Options{ // FIXME db := pg.Connect(&pg.Options{ // FIXME
Addr: "postgres-db:5432", Addr: "postgres-db:5432",
User: "postgres", User: "postgres",
Password: "12345678", Password: "12345678",
Database: "egommerce", Database: "egommerce",
}) })
defer db.Close()
mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
mig := migrations.NewCollection() mig := migrations.NewCollection()
mig.SetTableName(mTblName) mig.SetTableName(mTbl)
err := mig.DiscoverSQLMigrations("./migrations") if err := mig.DiscoverSQLMigrations("./migrations"); err != nil {
if err != nil {
logger.Log("migration dicovery error: %#v", err) logger.Log("migration dicovery error: %#v", err)
} }
@ -71,10 +86,5 @@ func main() {
} else { } else {
logger.Log("version is %d\n", oldVersion) logger.Log("version is %d\n", oldVersion)
} }
} // os.Exit(0)
func usage() {
fmt.Print(usageText)
flag.PrintDefaults()
os.Exit(2)
} }

View File

@ -1,97 +1,41 @@
package main package main
import ( import (
"fmt"
"log" "log"
"os" "os"
"strconv"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd" cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/config"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/database"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/server"
"github.com/go-redis/redis/v8"
)
const ( "git.pbiernat.io/egommerce/pricing-service/internal/app"
defAppName = "pricing-svc" "git.pbiernat.io/egommerce/pricing-service/internal/server"
defAppDomain = "pricing-svc"
defPathPrefix = "/pricing"
defNetAddr = ":80"
defLoggerAddr = "api-logger:24224"
defRegistryAddr = "api-registry:8500"
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
defCacheAddr = "api-cache:6379"
defCachePassword = "12345678"
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
defEventBusURL = "amqp://guest:guest@api-gateway:5672"
ebEventsExchange = "api-events"
ebEventsQueue = "pricing-svc"
defKVNmspc = "dev.egommerce/service/pricing-svc"
) )
func main() { func main() {
if config.ErrLoadingEnvs != nil { if cnf.ErrLoadingEnvs != nil {
log.Panicln("Error loading .env file", config.ErrLoadingEnvs) log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs)
} }
c := new(server.Config) c := server.NewConfig("pricing")
c.AppID, _ = os.Hostname() cArr := c.GetArray()
c.AppName = config.GetEnv("APP_NAME", defAppName)
c.AppDomain = config.GetEnv("APP_DOMAIN", defAppDomain)
c.PathPrefix = config.GetEnv("APP_PATH_PREFIX", defPathPrefix)
c.NetAddr = config.GetEnv("SERVER_ADDR", defNetAddr)
c.Port, _ = strconv.Atoi(c.NetAddr[1:])
c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr)
c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr)
c.DbURL = config.GetEnv("DATABASE_URL", defDbURL)
c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr)
c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword)
c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL)
c.EventBusExchange = ebEventsExchange
c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) doer := server.New(c)
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) a := app.NewApp(doer)
defer logger.Close() a.RegisterPlugin(app.LoggerPlugin(cArr))
a.RegisterPlugin(app.CachePlugin(cArr))
a.RegisterPlugin(app.DatabasePlugin(cArr))
a.RegisterPlugin(app.EventbusPlugin(cArr))
a.RegisterPlugin(app.RegistryPlugin(cArr))
// db conn while := make(chan struct{})
dbConn, err := database.Connect(c.DbURL) err := a.Start(while)
if err != nil { // fixme: add wait-for-db... <-while
logger.Log("Failed to connect to Database server: %v\n", err)
os.Exit(1)
}
defer dbConn.Close()
// redis conn
redis := redis.NewClient(&redis.Options{
Addr: c.CacheAddr,
Password: c.CachePassword,
DB: 0,
})
defer redis.Close()
// eventbus conn
ebConn, ebCh, err := amqp.Open(c.EventBusURL)
if err != nil { if err != nil {
logger.Log("Failed to connect to EventBus server: %v\n", err) log.Fatalf("Failed to start server. Reason: %v\n", err)
os.Exit(1)
}
defer ebCh.Close()
defer amqp.Close(ebConn)
err = amqp.NewExchange(ebCh, c.EventBusExchange)
if err != nil {
logger.Log("Failed to declare EventBus exchange: %v\n", err)
os.Exit(1) os.Exit(1)
} }
// start server fmt.Println("Gone")
srv := server.NewServer(c, logger, dbConn, redis, ebCh) os.Exit(0)
forever := make(chan struct{})
srv.StartWithGracefulShutdown(forever)
<-forever
// os.Exit(1)
} }

View File

@ -1,187 +1,40 @@
package main package main
import ( import (
"bytes"
"encoding/json"
"errors"
"fmt" "fmt"
"log" "log"
"os" "os"
"os/signal"
"syscall"
"time"
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul" cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/config"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/database"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/server"
"github.com/go-redis/redis/v8"
"github.com/streadway/amqp"
)
const ( "git.pbiernat.io/egommerce/pricing-service/internal/app"
defAppName = "pricing-worker" "git.pbiernat.io/egommerce/pricing-service/internal/worker"
defLoggerAddr = "api-logger:24224"
defRegistryAddr = "api-registry:8500"
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
defCacheAddr = "api-cache:6379"
defCachePassword = "12345678"
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
defEventBusURL = "amqp://guest:guest@api-gateway:5672"
ebEventsExchange = "api-events"
ebEventsQueue = "pricing-worker"
defKVNmspc = "dev.egommerce/service/pricing-worker"
) )
func main() { func main() {
if config.ErrLoadingEnvs != nil { if cnf.ErrLoadingEnvs != nil {
log.Panicln("Error loading .env file", config.ErrLoadingEnvs) log.Fatalln("Error loading .env file.")
} }
c := new(server.Config) c := worker.NewConfig("pricing-worker")
c.AppID, _ = os.Hostname() cArr := c.GetArray()
c.AppName = config.GetEnv("APP_NAME", defAppName)
c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr)
c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr)
c.DbURL = config.GetEnv("DATABASE_URL", defDbURL)
c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr)
c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword)
c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL)
c.EventBusExchange = ebEventsExchange
c.EventBusQueue = ebEventsQueue
c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) doer := worker.New(c)
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) a := app.NewApp(doer)
defer logger.Close() a.RegisterPlugin(app.LoggerPlugin(cArr))
a.RegisterPlugin(app.CachePlugin(cArr))
a.RegisterPlugin(app.DatabasePlugin(cArr))
a.RegisterPlugin(app.EventbusPlugin(cArr))
while := make(chan struct{})
err := a.Start(while)
<-while
consul, err := discovery.NewService(c.RegistryAddr, c.AppID, c.AppName, c.AppID, "", "", 0)
if err != nil { if err != nil {
logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) log.Fatalf("Failed to start worker. Reason: %v\n", err)
}
go func(consul *discovery.Service) {
interval := time.Second * 3
ticker := time.NewTicker(interval)
for range ticker.C {
updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go
}
}(consul)
// db conn
dbConn, err := database.Connect(c.DbURL)
if err != nil { // fixme: add wait-for-db...
logger.Log("Failed to connect to Database server: %v\n", err)
os.Exit(1)
}
defer dbConn.Close()
// redis conn
redis := redis.NewClient(&redis.Options{
Addr: c.CacheAddr,
Password: c.CachePassword,
DB: 0,
})
defer redis.Close()
// eventbus conn
ebConn, ebCh, err := rabbitmq.Open(c.EventBusURL)
if err != nil {
logger.Log("Failed to connect to EventBus server: %v\n", err)
os.Exit(1)
}
defer ebCh.Close()
defer rabbitmq.Close(ebConn)
err = rabbitmq.NewExchange(ebCh, c.EventBusExchange)
if err != nil {
logger.Log("Failed to declare EventBus exchange: %v\n", err)
os.Exit(1) os.Exit(1)
} }
// create and bind queues fmt.Println("Gone")
_, err = ebCh.QueueDeclare( os.Exit(0)
c.EventBusQueue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
logger.Log("Failed to declare EventBus queue: %v\n", err)
os.Exit(1)
}
rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "basket.pricing.*")
// event consume
msgs, err := ebCh.Consume(
c.EventBusQueue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
logger.Log("Failed to register a consumer: %s", err)
os.Exit(1)
}
forever := make(chan struct{})
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
logger.Log("Worker %s stopped working...\n", c.GetAppFullName())
close(forever)
}()
go func() {
for d := range msgs {
go func(d amqp.Delivery) {
msg, err := rabbitmq.Deserialize(d.Body)
if err != nil {
logger.Log("json error: %v\n", err)
d.Reject(false) // FIXME: how to handle erros in queue...????
return
}
eName := fmt.Sprintf("%s", msg["event"])
data := (msg["data"]).(map[string]interface{})
logger.Log("Message<%s>: %s\n", eName, data)
logger.Log("ACK: %s", eName)
d.Ack(false)
}(d)
}
}()
logger.Log("Waiting for messages...")
<-forever
}
func updateKVConfig(s *discovery.Service, oldCnf *server.Config) error { // FIXME: duplicated in internal/app/server/server.go
data, _, err := s.KV().Get(oldCnf.KVNamespace, nil)
if err != nil {
return err
}
if data == nil {
return errors.New("empty KV config data")
}
buf := bytes.NewBuffer(data.Value)
decoder := json.NewDecoder(buf)
if err := decoder.Decode(oldCnf); err != nil {
return err
}
return nil
} }

View File

@ -1,62 +1,116 @@
module git.pbiernat.dev/egommerce/pricing-service module git.pbiernat.io/egommerce/pricing-service
go 1.18 go 1.18
require ( require (
git.pbiernat.dev/egommerce/api-entities v0.0.26 git.pbiernat.io/egommerce/api-entities v0.2.3
git.pbiernat.dev/egommerce/go-api-pkg v0.0.136 git.pbiernat.io/egommerce/go-api-pkg v0.2.88
github.com/georgysavva/scany/v2 v2.0.0 github.com/georgysavva/scany/v2 v2.0.0
github.com/go-pg/migrations/v8 v8.1.0 github.com/go-pg/migrations/v8 v8.1.0
github.com/go-pg/pg/v10 v10.10.7 github.com/go-pg/pg/v10 v10.10.7
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
github.com/gofiber/fiber/v2 v2.40.1 github.com/gofiber/fiber/v2 v2.48.0
github.com/jackc/pgx/v5 v5.1.1 github.com/jackc/pgx/v5 v5.1.1
github.com/joho/godotenv v1.4.0 github.com/rabbitmq/amqp091-go v1.10.0
github.com/streadway/amqp v1.0.0
) )
require ( require (
github.com/andybalholm/brotli v1.0.4 // indirect github.com/DataDog/datadog-go v3.2.0+incompatible // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/armon/go-metrics v0.4.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.42.34 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible // indirect
github.com/circonus-labs/circonusllhist v0.1.3 // indirect
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.13.0 // indirect github.com/envoyproxy/go-control-plane v0.11.0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.10.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/fluent/fluent-logger-golang v1.9.0 // indirect github.com/fluent/fluent-logger-golang v1.9.0 // indirect
github.com/go-pg/zerochecker v0.2.0 // indirect github.com/go-pg/zerochecker v0.2.0 // indirect
github.com/hashicorp/consul/api v1.18.0 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/consul v1.16.0 // indirect
github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69 // indirect
github.com/hashicorp/consul/api v1.22.0 // indirect
github.com/hashicorp/consul/envoyextensions v0.3.0 // indirect
github.com/hashicorp/consul/sdk v0.14.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-bexpr v0.1.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.3.1 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-syslog v1.0.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/go-version v1.2.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/memberlist v0.5.0 // indirect
github.com/hashicorp/raft v1.5.0 // indirect
github.com/hashicorp/raft-autopilot v0.1.6 // indirect
github.com/hashicorp/serf v0.10.1 // indirect github.com/hashicorp/serf v0.10.1 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.13.0 // indirect github.com/jackc/pgtype v1.14.3 // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect github.com/jackc/puddle/v2 v2.1.2 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/klauspost/compress v1.15.9 // indirect github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.41 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/philhofer/fwd v1.1.1 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/philhofer/fwd v1.1.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect
github.com/tinylib/msgp v1.1.6 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.3 // indirect
github.com/tinylib/msgp v1.1.8 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.41.0 // indirect github.com/valyala/fasthttp v1.48.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect
github.com/vmihailenco/bufpool v0.1.11 // indirect github.com/vmihailenco/bufpool v0.1.11 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
github.com/vmihailenco/tagparser v0.1.2 // indirect github.com/vmihailenco/tagparser v0.1.2 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.uber.org/atomic v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect golang.org/x/crypto v0.20.0 // indirect
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/sys v0.2.0 // indirect golang.org/x/net v0.21.0 // indirect
golang.org/x/text v0.3.8 // indirect golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
mellium.im/sasl v0.2.1 // indirect mellium.im/sasl v0.2.1 // indirect
) )

File diff suppressed because it is too large Load Diff

81
src/internal/app/app.go Normal file
View File

@ -0,0 +1,81 @@
package app
import (
"log"
"os"
"os/signal"
"strconv"
"syscall"
)
type (
Doer interface {
Start() error
RegisterHandler(string, func() any)
OnShutdown()
}
Application interface {
Start(while chan struct{})
RegisterPlugin(PluginFn) error
Shutdown()
}
App struct {
doer Doer
}
)
func NewApp(d Doer) *App {
return &App{
doer: d,
}
}
func (a *App) Start(while chan struct{}) error {
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
a.Shutdown()
close(while)
}()
run := a.createRunFile("./app.run") // FIXME path...
defer a.removeRunFile(run)
err := a.doer.Start()
if err != nil {
log.Fatalf("Failed to start app. Reason: %v\n", err)
close(while)
}
<-while
return err
}
func (a *App) RegisterPlugin(p Plugin) error {
a.doer.RegisterHandler(p.name, p.fn)
return nil
}
func (a *App) Shutdown() {
a.doer.OnShutdown()
}
func (a *App) createRunFile(path string) *os.File {
run, err := os.Create(path)
if err != nil {
log.Fatalf("Failed to create run file. Reason: %v\n", err)
os.Exit(1)
}
run.WriteString(strconv.Itoa(os.Getpid()))
return run
}
func (a *App) removeRunFile(f *os.File) error {
return f.Close()
}

View File

@ -1,22 +0,0 @@
package config
import (
"os"
"github.com/joho/godotenv"
)
var ErrLoadingEnvs error
func init() {
ErrLoadingEnvs = godotenv.Load()
}
func GetEnv(name string, defVal string) string { // FIXME defVal and return types
env := os.Getenv(name)
if env == "" {
return defVal
}
return env
}

139
src/internal/app/plugins.go Normal file
View File

@ -0,0 +1,139 @@
package app
import (
"log"
"os"
"strconv"
redis "github.com/go-redis/redis/v8"
amqp "github.com/rabbitmq/amqp091-go"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
db "git.pbiernat.io/egommerce/pricing-service/pkg/database"
)
type (
Plugin struct {
name string
fn PluginFn
}
PluginFn func() any
)
func CachePlugin(cArr map[string]string) Plugin {
return Plugin{
name: "cache",
fn: func() any {
return redis.NewClient(&redis.Options{
Addr: cArr["cacheAddr"],
Password: cArr["cachePassword"],
DB: 0,
})
},
}
}
func DatabasePlugin(cArr map[string]string) Plugin {
return Plugin{
name: "database",
fn: func() any {
dbConn, err := db.Connect(cArr["dbURL"])
if err != nil {
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err)
os.Exit(1) // TODO: retry in background...
}
return dbConn
},
}
}
func EventbusPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "eventbus",
fn: func() any {
conn, err := amqp.Dial(cArr["eventBusURL"])
if err != nil {
log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err)
os.Exit(1) // TODO: retry in background...
}
chn, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open new EventBus channel. Err: %v\n", err)
os.Exit(1) // TODO: retry in background...
}
return chn
},
}
}
func LoggerPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "logger",
fn: func() any {
logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"])
if err != nil {
log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err)
os.Exit(1) // TODO: retry in background...
}
logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort)
if err != nil {
log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err)
os.Exit(1) // TODO: retry in background...
}
return logger
},
}
}
func RegistryPlugin(cArr map[string]string) Plugin {
return Plugin{
name: "registry",
fn: func() any {
port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT
// log.Printf("Consul retrieved port: %v", port)
registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port)
if err != nil {
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err)
os.Exit(1) // TODO: retry in background...
}
err = registry.Register()
if err != nil {
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
os.Exit(1) // TODO: retry in background...
}
registry.RegisterHealthChecks()
// a.registerKVUpdater() // FIXME run as goroutine
return registry
// svc, _ := registry.Connect()
// tlsCnf := svc.ServerTLSConfig()
// s.Base.App.Server().TLSConfig = tlsCnf
// fmt.Println("Podmiana configa TLS")
// defer svc.Close()
// go func() { // Consul KV updater
// ticker := time.NewTicker(time.Second * 15)
// for range ticker.C {
// fetchKVConfig(s) // FIXME: duplicated in worker
// }
// }()
// go func() { // Server metadata cache updater
// ticker := time.NewTicker(time.Second * 5)
// for range ticker.C {
// s.cacheMetadata()
// }
// }()
},
}
}

View File

@ -1,31 +0,0 @@
package server
import "fmt"
type Config struct {
AppID string
AppName string
AppDomain string
PathPrefix string
NetAddr string
Port int
RegistryAddr string
KVNamespace string
LoggerAddr string `json:"logger_addr"`
DbURL string `json:"db_url"`
CacheAddr string `json:"cache_addr"`
CachePassword string `json:"cache_password"`
MongoDbUrl string `json:"mongodb_url"`
EventBusURL string `json:"eventbus_url"`
EventBusExchange string `json:"eventbus_exchange"`
EventBusQueue string `json:"eventbus_queue"`
HttpReadTimeout int `json:"http_read_timeout"`
HttpWriteTimeout int `json:"http_write_timeout"`
HttpIdleTimeout int `json:"http_idle_timeout"`
// Fields with json mapping are available trough ConsulKV
}
func (c *Config) GetAppFullName() string {
return fmt.Sprintf("%s_%s", c.AppName, c.AppID)
}

View File

@ -1,25 +0,0 @@
package server
import (
"strconv"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/service"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/ui"
"github.com/gofiber/fiber/v2"
)
func (s *Server) GetProductPriceHandler(c *fiber.Ctx) error {
prodID, err := strconv.Atoi(c.Params("id"))
if err != nil {
return s.Error400(c, err.Error())
}
priceSrv := service.NewPriceService(s.db, s.cache, s.ebCh, s.log)
res, err := ui.GetProductPrice(priceSrv, prodID)
if err != nil {
s.log.Log("GetProductPriceHandler error: ", err)
return s.Error404(c, "Product not found")
}
return c.JSON(res)
}

View File

@ -1,51 +0,0 @@
package server
import (
"strings"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
)
var (
defaultCORS = cors.New(cors.Config{
AllowOrigins: "*",
AllowCredentials: true,
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
})
)
func SetupRoutes(s *Server) {
s.App.Options("*", defaultCORS)
s.App.Get("/health", s.HealthHandler)
s.App.Get("/config", s.ConfigHandler)
api := s.App.Group("/api")
v1 := api.Group("/v1")
v1.Get("/product/:id<int>", s.GetProductPriceHandler)
}
func SetupMiddlewares(s *Server) {
s.App.Use(defaultCORS)
s.App.Use(LoggingMiddleware(s.log))
}
// Middlewares
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
return func(c *fiber.Ctx) error {
path := string(c.Request().URI().Path())
if strings.Contains(path, "/health") {
return c.Next()
}
log.Log("Request: %s, remote: %s, via: %s",
c.Request().URI().String(),
c.Context().RemoteIP().String(),
string(c.Context().UserAgent()))
return c.Next()
}
}

View File

@ -1,178 +0,0 @@
package server
import (
"bytes"
"context"
"encoding/json"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-redis/redis/v8"
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/streadway/amqp"
def "git.pbiernat.dev/egommerce/api-entities/http"
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
)
type Server struct {
*fiber.App
conf *Config
log *fluentd.Logger
db *pgxpool.Pool
cache *redis.Client
ebCh *amqp.Channel
discovery *discovery.Service
name string
addr string
kvNmspc string
}
type Headers struct {
RequestID string `reqHeader:"x-request-id"`
}
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client, ebCh *amqp.Channel) *Server {
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.PathPrefix, conf.Port)
if err != nil {
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
}
logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address)
err = consul.Register()
if err != nil {
logger.Log("register error: %v", err)
}
cnf := fiber.Config{
AppName: conf.AppName,
ServerHeader: conf.AppName,
ReadTimeout: time.Millisecond * 50,
WriteTimeout: time.Millisecond * 50,
IdleTimeout: time.Millisecond * 50,
}
s := &Server{
fiber.New(cnf),
conf,
logger,
db,
cache,
ebCh,
consul,
conf.AppName,
conf.NetAddr,
conf.KVNamespace,
}
go func(s *Server) { // Consul KV config updater
interval := time.Second * 15
ticker := time.NewTicker(interval)
for range ticker.C {
s.updateKVConfig()
}
}(s)
go func(s *Server) { // Server metadata cache updater
interval := time.Second * 5
ticker := time.NewTicker(interval)
for range ticker.C {
s.cacheMetadata()
}
}(s)
SetupMiddlewares(s)
SetupRoutes(s)
return s
}
func (s *Server) Start() {
err := s.Listen(s.addr)
s.log.Log("Starting error: %v", err)
}
func (s *Server) StartWithGracefulShutdown(forever chan struct{}) {
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
if err := s.gracefulShutdown(); err != nil {
s.log.Log("Server is not shutting down! Reason: %v", err)
}
close(forever)
}()
if err := s.Listen(s.addr); err != nil {
s.log.Log("Server is not running! Reason: %v", err)
}
<-forever
}
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
var hdr = new(Headers)
if err := c.ReqHeaderParser(hdr); err != nil {
return "", err
}
return hdr.RequestID, nil
}
func (s *Server) Error400(c *fiber.Ctx, msg string) error {
return c.Status(fiber.StatusBadRequest).JSON(&def.ErrorResponse{Error: msg})
}
func (s *Server) Error404(c *fiber.Ctx, msg string) error {
return c.Status(fiber.StatusNotFound).JSON(&def.ErrorResponse{Error: "Product not found."})
}
func (s *Server) updateKVConfig() { // FIXME: duplicated in cmd/worker/main.go
config, _, err := s.discovery.KV().Get(s.kvNmspc, nil)
if err != nil || config == nil {
return
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&s.conf); err != nil {
return
}
}
func (s *Server) cacheMetadata() {
ctx := context.Background()
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
pos := s.cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
if pos >= 0 {
s.cache.LRem(ctx, key, 0, address)
}
s.cache.LPush(ctx, key, address).Err()
}
func (s *Server) clearMetadataCache() {
ctx := context.Background()
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
s.cache.LRem(ctx, key, 0, address)
}
func (s *Server) gracefulShutdown() error {
s.log.Log("Server is going down...")
s.log.Log("Unregistering service: %s", s.discovery.GetID())
s.discovery.Unregister()
s.clearMetadataCache()
s.ebCh.Close()
s.db.Close()
s.log.Close()
return s.Shutdown()
}

View File

@ -1,11 +1,13 @@
package event package event
type Event struct { type Event struct {
Command string `json:"command"`
RequestID string `json:"request_id"` RequestID string `json:"request_id"`
} }
func NewEvent(reqID string) *Event { func NewEvent(command, reqID string) *Event {
em := new(Event) em := new(Event)
em.Command = command
em.RequestID = reqID em.RequestID = reqID
return em return em

View File

@ -0,0 +1,111 @@
package server
import (
"fmt"
"net"
"os"
"time"
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
)
const (
defName = "pricing-svc"
defDomain = "pricing-svc"
defCacheAddr = "egommerce.local:6379"
defCachePassword = "12345678"
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
defKVNmspc = "dev.egommerce/service/pricing"
defLoggerAddr = "api-logger:24224"
defNetAddr = ":80"
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
defPathPrefix = "/pricing"
defRegistryAddr = "api-registry:8500"
defEbEventsExchange = "api-events"
defEbEventsQueue = "pricing-svc"
)
type Config struct {
ID string
Name string
Domain string
NetAddr string
RegistryDomainOverIP string
PathPrefix string
IdleTimeout time.Duration // miliseconds
ReadTimeout time.Duration // miliseconds
WriteTimeout time.Duration // miliseconds
LoggerAddr string `json:"logger_addr"`
DbURL string `json:"db_url"`
CacheAddr string `json:"cache_addr"`
CachePassword string `json:"cache_password"`
MongoDbUrl string `json:"mongodb_url"`
EventBusURL string `json:"eventbus_url"`
EventBusExchange string `json:"eventbus_exchange"`
EventBusQueue string `json:"eventbus_queue"`
KVNamespace string
RegistryAddr string
// Fields with JSON mappings are available through Consul KV storage
}
func NewConfig(name string) *Config {
c := new(Config)
c.ID, _ = os.Hostname()
c.Name = name
c.Domain = cnf.GetEnv("APP_DOMAIN", defDomain)
c.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
c.RegistryDomainOverIP = cnf.GetEnv("REGISTRY_USE_DOMAIN_OVER_IP", "false")
c.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
c.EventBusExchange = defEbEventsExchange
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
return c
}
func (c *Config) GetAppFullName() string {
return fmt.Sprintf("%s_%s", c.Name, c.ID)
}
func (c *Config) GetIP() string {
host, _ := os.Hostname()
ips, _ := net.LookupIP(host)
// for _, ip := range ips {
// return ip.String()
// }
return ips[0].String()
}
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
arr := make(map[string]string)
arr["id"] = c.ID
arr["name"] = c.Name
arr["appFullname"] = c.GetAppFullName()
arr["domain"] = c.Domain
arr["ip"] = c.GetIP()
arr["netAddr"] = c.NetAddr
arr["registryDomainOverIP"] = c.RegistryDomainOverIP
arr["pathPrefix"] = c.PathPrefix
arr["cacheAddr"] = c.CacheAddr
arr["cachePassword"] = c.CachePassword
arr["dbURL"] = c.DbURL
arr["eventBusExchange"] = c.EventBusExchange
arr["eventBusURL"] = c.EventBusURL
arr["kvNamespace"] = c.KVNamespace
arr["loggerAddr"] = c.LoggerAddr
arr["registryAddr"] = c.RegistryAddr
return arr
}

View File

@ -0,0 +1,10 @@
package server
// REFACTOR: UNIVERSAL SERVER CODE
import (
"github.com/gofiber/fiber/v2"
)
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
return c.JSON(s.Config)
}

View File

@ -1,9 +1,9 @@
package server package server
// REFACTOR: UNIVERSAL SERVER CODE
import ( import (
def "git.pbiernat.io/egommerce/api-entities/http"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
def "git.pbiernat.dev/egommerce/api-entities/http"
) )
func (s *Server) HealthHandler(c *fiber.Ctx) error { func (s *Server) HealthHandler(c *fiber.Ctx) error {
@ -11,7 +11,3 @@ func (s *Server) HealthHandler(c *fiber.Ctx) error {
Status: "OK", Status: "OK",
}) })
} }
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
return c.JSON(s.conf)
}

View File

@ -0,0 +1,32 @@
package server
import (
"strings"
"github.com/gofiber/fiber/v2"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
)
// "github.com/gofiber/fiber/v2"
// "github.com/gofiber/fiber/v2/middleware/cors"
func SetupMiddleware(s *Server) {
s.Use(LoggingMiddleware(s.GetLogger()))
}
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
return func(c *fiber.Ctx) error {
path := string(c.Request().URI().Path())
if strings.Contains(path, "/health") {
return c.Next()
}
log.Log("Request: %s, remote: %s, via: %s",
c.Request().URI().String(),
c.Context().RemoteIP().String(),
string(c.Context().UserAgent()))
return c.Next()
}
}

View File

@ -0,0 +1,29 @@
package server
import (
"strconv"
"github.com/gofiber/fiber/v2"
"git.pbiernat.io/egommerce/pricing-service/internal/service"
"git.pbiernat.io/egommerce/pricing-service/internal/ui"
)
func (s *Server) GetProductPriceHandler(c *fiber.Ctx) error {
prodID, err := strconv.Atoi(c.Params("id"))
if err != nil {
return s.Error(c, 404, err.Error())
}
priceSrv := service.NewPriceService(s.GetDatabase(), s.GetCache(), s.GetEventBus(), s.GetLogger())
res, err := ui.GetProductPrice(priceSrv, prodID)
if err != nil {
s.GetLogger().Log("GetProductPriceHandler error: %s", err)
return s.Error(c, 404, "Product not found")
}
s.GetLogger().Log("Price: %#v", res)
// s.Logger.Log("GetProductPriceHandler res: %s", c.JSON(res))
return c.JSON(res)
}

View File

@ -0,0 +1,29 @@
package server
import (
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/recover"
)
var (
defaultCORS = cors.New(cors.Config{
AllowOrigins: "*",
// AllowCredentials: true,
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
})
)
func SetupRouter(s *Server) {
s.Options("*", defaultCORS)
s.Use(defaultCORS)
s.Use(recover.New())
s.Get("/health", s.HealthHandler)
s.Get("/config", s.ConfigHandler)
api := s.Group("/api")
v1 := api.Group("/v1")
v1.Get("/product/:id<int>", s.GetProductPriceHandler)
}

View File

@ -0,0 +1,136 @@
package server
import (
"net"
"time"
"github.com/go-redis/redis/v8"
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"git.pbiernat.io/egommerce/api-entities/http"
"git.pbiernat.io/egommerce/go-api-pkg/consul"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
)
type (
Server struct {
*fiber.App
ID string
addr string // e.g. "127.0.0.1:80"
handlers map[string]any
}
HeaderRequestID struct {
RequestID string `reqHeader:"x-request-id"`
}
)
func New(c *Config) *Server {
return &Server{
ID: c.ID,
App: fiber.New(fiber.Config{
AppName: c.ID,
ServerHeader: c.Name + ":" + c.ID,
ReadTimeout: c.ReadTimeout * time.Millisecond,
WriteTimeout: c.WriteTimeout * time.Millisecond,
IdleTimeout: c.IdleTimeout * time.Millisecond,
}),
addr: c.NetAddr,
handlers: make(map[string]any),
}
}
func (s *Server) Start() error {
SetupMiddleware(s)
SetupRouter(s)
// fmt.Printf("Starting server at: %s...\n", s.addr)
ln, _ := net.Listen("tcp", s.addr)
// ln = tls.NewListener(ln, s.App.Server().TLSConfig)
return s.Listener(ln)
}
func (s *Server) RegisterHandler(name string, fn func() any) {
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
s.handlers[name] = fn()
}
func (s *Server) OnShutdown() {
// s.GetLogger().Log("Server %s is going down...", s.ID)
s.GetRegistry().Unregister()
// a.clearMetadataCache()
s.GetEventBus().Close()
s.GetDatabase().Close()
s.GetLogger().Log("Gone.")
s.GetLogger().Close()
s.Shutdown()
}
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
var hdr = new(HeaderRequestID)
if err := c.ReqHeaderParser(hdr); err != nil {
return "", err
}
return hdr.RequestID, nil
}
func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
return c.Status(code).JSON(http.ErrorResponse{Error: msg})
}
// Plugin helper funcitons
func (s *Server) GetCache() *redis.Client {
return (s.handlers["cache"]).(*redis.Client)
}
func (s *Server) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
return (s.handlers["database"]).(*pgxpool.Pool)
}
func (s *Server) GetEventBus() *amqp.Channel {
return (s.handlers["eventbus"]).(*amqp.Channel)
}
func (s *Server) GetLogger() *fluentd.Logger {
return (s.handlers["logger"]).(*fluentd.Logger)
}
func (s *Server) GetRegistry() *consul.Service {
return (s.handlers["registry"]).(*consul.Service)
}
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
// func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
// go func() {
// ticker := time.NewTicker(time.Second * 10)
// for range ticker.C {
// config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
// if err != nil || config == nil {
// return
// }
// kvCnf := bytes.NewBuffer(config.Value)
// decoder := json.NewDecoder(kvCnf)
// if err := decoder.Decode(&s.Config); err != nil {
// return
// }
// }
// }()
// }
// func (s *Server) clearMetadataCache() {
// ctx := context.Background()
// key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
// s.Cache.LRem(ctx, key, 0, address)
// }
// func (s *Server) getMetadataIPsKey() string {
// return "internal__" + s.Base.Config.AppName + "__ips"
// }

View File

@ -4,14 +4,17 @@ import (
"context" "context"
"time" "time"
"git.pbiernat.dev/egommerce/api-entities/model" "git.pbiernat.io/egommerce/api-entities/model"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
"github.com/georgysavva/scany/v2/pgxscan" "github.com/georgysavva/scany/v2/pgxscan"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/streadway/amqp" amqp "github.com/rabbitmq/amqp091-go"
) )
// REFACTOR -> to many things to do -> read(different sources) + save(redis-cache)
type PriceService struct { type PriceService struct {
dbConn *pgxpool.Pool dbConn *pgxpool.Pool
redis *redis.Client redis *redis.Client
@ -23,7 +26,7 @@ func NewPriceService(dbConn *pgxpool.Pool, redis *redis.Client, chn *amqp.Channe
return &PriceService{dbConn, redis, chn, log} return &PriceService{dbConn, redis, chn, log}
} }
func (s *PriceService) FetchFromDB(ctx context.Context, prodID int) (float64, error) { func (s *PriceService) FetchFromDB(ctx context.Context, prodID int) (int, error) {
product := new(model.ProductPriceModel) product := new(model.ProductPriceModel)
err := pgxscan.Get(ctx, s.dbConn, product, `SELECT id, pid, price FROM catalog.product WHERE id=$1`, prodID) err := pgxscan.Get(ctx, s.dbConn, product, `SELECT id, pid, price FROM catalog.product WHERE id=$1`, prodID)
if err != nil { if err != nil {

View File

@ -5,8 +5,8 @@ import (
"strconv" "strconv"
"time" "time"
def "git.pbiernat.dev/egommerce/api-entities/http" def "git.pbiernat.io/egommerce/api-entities/http"
"git.pbiernat.dev/egommerce/pricing-service/internal/app/service" "git.pbiernat.io/egommerce/pricing-service/internal/service"
) )
func GetProductPrice(srv *service.PriceService, prodID int) (*def.ProductPriceResponse, error) { func GetProductPrice(srv *service.PriceService, prodID int) (*def.ProductPriceResponse, error) {
@ -15,11 +15,11 @@ func GetProductPrice(srv *service.PriceService, prodID int) (*def.ProductPriceRe
res := &def.ProductPriceResponse{} res := &def.ProductPriceResponse{}
if price, _ := srv.FetchFromCache(ctx, key, "float"); price != 0 { if price, _ := srv.FetchFromCache(ctx, key, "float"); price != 0 {
res.Price = price.(float64) res.Price = price.(int)
} else { } else {
price, err := srv.FetchFromDB(ctx, prodID) price, err := srv.FetchFromDB(ctx, prodID)
if err != nil { if err != nil {
return res, err return nil, err
} }
res.Price = price res.Price = price

View File

@ -0,0 +1,54 @@
package worker
import (
"git.pbiernat.io/egommerce/pricing-service/internal/service"
)
var (
StockUpdated = "event.WarehouseStockUpdatedEvent"
)
type Command interface {
run(CommandData) (bool, any)
}
type CommandData map[string]interface{}
type CommandRunner struct {
cmd Command
}
func NewCommandRunner(data map[string]interface{}, srvc *service.PriceService) *CommandRunner {
rnr := &CommandRunner{}
rnr.cmd = getCommand((data["command"]).(string), srvc)
return rnr
}
func getCommand(cmd string, srvc *service.PriceService) Command {
// fmt.Printf("getCommand: %v\n", cmd)
var c Command
switch cmd { // FIXME
case "StockUpdated":
c = &StockUpdatedCommand{srvc}
}
return c
}
func (r *CommandRunner) run(data CommandData) (bool, any) {
return r.cmd.run(data)
}
type StockUpdatedCommand struct {
srvc *service.PriceService
}
func (c *StockUpdatedCommand) run(data CommandData) (bool, any) {
// reqID := data["request_id"].(string) // FIXME Check input params!
// productID := int(data["product_id"].(float64)) // FIXME Check input params!
// stock, err := ui.StockUpdated(c.srvc, productID, reqID)
return true, nil //err == nil, basket
}

View File

@ -0,0 +1,85 @@
package worker
import (
"fmt"
"net"
"os"
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
)
const (
defName = "pricing-worker"
defDomain = "pricing-worker"
defCacheAddr = "egommerce.local:6379"
defCachePassword = "12345678"
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
defKVNmspc = "dev.egommerce/service/pricing-worker"
defLoggerAddr = "api-logger:24224"
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
defEbEventsExchange = "api-events"
defEbEventsQueue = "pricing-svc"
)
type Config struct {
ID string
Name string
LoggerAddr string `json:"logger_addr"`
DbURL string `json:"db_url"`
CacheAddr string `json:"cache_addr"`
CachePassword string `json:"cache_password"`
MongoDbUrl string `json:"mongodb_url"`
EventBusURL string `json:"eventbus_url"`
EventBusExchange string `json:"eventbus_exchange"`
EventBusQueue string `json:"eventbus_queue"`
KVNamespace string
}
func NewConfig(name string) *Config {
c := new(Config)
c.ID, _ = os.Hostname()
c.Name = name
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
c.EventBusExchange = defEbEventsExchange
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
return c
}
func (c *Config) GetAppFullName() string {
return fmt.Sprintf("%s_%s", c.Name, c.ID)
}
func (c *Config) GetIP() string {
host, _ := os.Hostname()
ips, _ := net.LookupIP(host)
// for _, ip := range ips {
// return ip.String()
// }
return ips[0].String()
}
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
arr := make(map[string]string)
arr["id"] = c.ID
arr["name"] = c.Name
arr["appFullname"] = c.GetAppFullName()
arr["cacheAddr"] = c.CacheAddr
arr["cachePassword"] = c.CachePassword
arr["dbURL"] = c.DbURL
arr["eventBusExchange"] = c.EventBusExchange
arr["eventBusURL"] = c.EventBusURL
arr["kvNamespace"] = c.KVNamespace
arr["loggerAddr"] = c.LoggerAddr
return arr
}

View File

@ -0,0 +1,192 @@
package worker
import (
"fmt"
"log"
"os"
"strings"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
amqp "github.com/rabbitmq/amqp091-go"
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
"git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
"git.pbiernat.io/egommerce/pricing-service/internal/event"
"git.pbiernat.io/egommerce/pricing-service/internal/service"
)
type (
Worker struct {
ID string
cnf *Config
handlers map[string]any
services map[string]any
doWrkUntil chan struct{}
}
)
func New(c *Config) *Worker {
return &Worker{
ID: c.ID,
cnf: c,
handlers: make(map[string]any),
services: make(map[string]any),
doWrkUntil: make(chan struct{}),
}
}
func (w *Worker) Start() error {
// Init
err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange)
if err != nil {
w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err)
fmt.Printf("Failed to declare EventBus exchange: %v\n", err)
os.Exit(1)
}
_, err = w.GetEventBus().QueueDeclare(
w.cnf.EventBusQueue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err)
fmt.Printf("Failed to declare EventBus queue: %v\n", err)
os.Exit(1)
}
// w.bindQueues()
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "basket.pricing.*")
err = w.doWork(w.doWrkUntil)
if err != nil {
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err)
close(w.doWrkUntil)
}
<-w.doWrkUntil
return err
// go func() {
// sigint := make(chan os.Signal, 1)
// signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
// <-sigint
// w.Shutdown()
// close(while)
// }()
// run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker)
// defer w.removeRunFile(run)
// w.Logger.Log("Waiting for messages...")
// return nil
}
func (w *Worker) RegisterHandler(name string, fn func() any) {
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
w.handlers[name] = fn()
}
func (w *Worker) OnShutdown() {
w.GetLogger().Log("Worker %s is going down...", w.ID)
// fmt.Printf("Worker %s is going down...\n", w.ID)
w.GetEventBus().Close()
w.GetDatabase().Close()
w.GetLogger().Log("Gone.")
w.GetLogger().Close()
close(w.doWrkUntil)
}
// Plugin helper funcitons
func (w *Worker) GetCache() *redis.Client {
return (w.handlers["cache"]).(*redis.Client)
}
func (w *Worker) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
return (w.handlers["database"]).(*pgxpool.Pool)
}
func (w *Worker) GetEventBus() *amqp.Channel {
return (w.handlers["eventbus"]).(*amqp.Channel)
}
func (w *Worker) GetLogger() *fluentd.Logger {
return (w.handlers["logger"]).(*fluentd.Logger)
}
func (w *Worker) doWork(while chan struct{}) error {
w.services["price"] =
service.NewPriceService(w.GetDatabase(), w.GetCache(), w.GetEventBus(), w.GetLogger())
pSrv := (w.services["price"]).(*service.PriceService)
msgs, err := w.GetEventBus().Consume(
w.cnf.EventBusQueue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
w.GetLogger().Log("Failed to register a consumer: %s", err)
os.Exit(1)
}
go func() {
for d := range msgs {
// go func(d amqp.Delivery) {
w.processMsg(pSrv, d)
// }(d)
}
}()
<-while
return nil
}
func (w *Worker) processMsg(srvc *service.PriceService, d amqp.Delivery) {
msg, err := rabbitmq.Deserialize(d.Body)
if err != nil {
w.GetLogger().Log("Deserialization error: %v\n", err)
d.Reject(false)
return
}
name := fmt.Sprintf("%s", msg["event"])
data := (msg["data"]).(map[string]interface{})
// reqID := (data["request_id"]).(string) // FIXME Check input params!
w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data)
var ok = false
switch true { // Refactor -> use case for polymorphism
case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_ADDED_TO_BASKET)
case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_REMOVED_FROM_BASKET)
}
rnr := NewCommandRunner(data, srvc)
ok, _ = rnr.run(data)
if ok {
w.GetLogger().Log("Successful executed message \"%s\"\n", name)
d.Ack(false)
return
}
w.GetLogger().Log("Error processing \"%s\": %s (%v)", name, err.Error(), err)
d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...?
}