Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
d3c0a66bd4 | |||
5eef989e40 | |||
056421b5c0 | |||
1eed3f506b |
.app.config.env.dist.gitignoreDockerfile.builderDockerfile.targetMakefile
bin
deploy
src
.gitignoreapp.run
cmd
go.modgo.suminternal
pkg
24
.app.config
24
.app.config
@ -1,24 +0,0 @@
|
|||||||
{
|
|
||||||
"ID": "basket",
|
|
||||||
"Name": "basket",
|
|
||||||
"Address": "__IP__",
|
|
||||||
"Tags": ["basket-svc", "basket", "https", "service"],
|
|
||||||
"Port": 443,
|
|
||||||
"Connect": {
|
|
||||||
"Native": true,
|
|
||||||
"SidecarService": {
|
|
||||||
"Port": 20001,
|
|
||||||
"Check": {
|
|
||||||
"Name": "Connect Envoy Sidecar",
|
|
||||||
"TCP": "127.0.0.1:20001",
|
|
||||||
"Interval": "5s"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"Check": {
|
|
||||||
"TCP": "__IP__:443",
|
|
||||||
"Interval": "5s",
|
|
||||||
"Timeout": "1s",
|
|
||||||
"DeregisterCriticalServiceAfter": "10s"
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,15 +1,16 @@
|
|||||||
SERVER_ADDR=:443
|
SERVER_ADDR=:80
|
||||||
|
|
||||||
APP_NAME=basket-svc
|
APP_NAME=basket-svc
|
||||||
APP_DOMAIN=basket.service.ego.io
|
APP_DOMAIN=basket-svc
|
||||||
REGISTRY_USE_DOMAIN_OVER_IP=false
|
|
||||||
APP_PATH_PREFIX=/basket
|
APP_PATH_PREFIX=/basket
|
||||||
APP_KV_NAMESPACE=dev.egommerce/service/basket-svc
|
APP_KV_NAMESPACE=dev.egommerce/service/basket-svc
|
||||||
|
|
||||||
LOGGER_ADDR=api-logger:24224
|
LOGGER_ADDR=api-logger:24224
|
||||||
REGISTRY_ADDR=api-registry:8501
|
REGISTRY_ADDR=api-registry:8500
|
||||||
DATABASE_URL=postgres://postgres:12345678@postgres-db:5432/egommerce
|
DATABASE_URL=postgres://postgres:12345678@postgres-db:5432/egommerce
|
||||||
CACHE_ADDR=api-cache:6379
|
CACHE_ADDR=api-cache:6379
|
||||||
CACHE_PASSWORD=12345678
|
CACHE_PASSWORD=12345678
|
||||||
MONGODB_URL=mongodb://mongodb:12345678@mongo-db:27017
|
MONGODB_URL=mongodb://mongodb:12345678@mongo-db:27017
|
||||||
EVENTBUS_URL=amqp://guest:guest@api-eventbus:5672
|
EVENTBUS_URL=amqp://guest:guest@api-eventbus:5672
|
||||||
|
|
||||||
|
|
||||||
|
6
.gitignore
vendored
6
.gitignore
vendored
@ -1,6 +0,0 @@
|
|||||||
.env
|
|
||||||
.env.*
|
|
||||||
!.env.dist
|
|
||||||
|
|
||||||
.vscode/
|
|
||||||
__debug_bin
|
|
@ -5,7 +5,6 @@ ARG BIN_OUTPUT=/go/bin
|
|||||||
ARG GO_SERVER=cmd/server/main.go
|
ARG GO_SERVER=cmd/server/main.go
|
||||||
ARG GO_MIGRATE=cmd/migrate/main.go
|
ARG GO_MIGRATE=cmd/migrate/main.go
|
||||||
ARG GO_WORKER=cmd/worker/main.go
|
ARG GO_WORKER=cmd/worker/main.go
|
||||||
ARG GO_HEALTH=cmd/health/main.go
|
|
||||||
|
|
||||||
WORKDIR /go/src/app
|
WORKDIR /go/src/app
|
||||||
COPY src ./
|
COPY src ./
|
||||||
@ -13,5 +12,4 @@ COPY src ./
|
|||||||
RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \
|
RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \
|
||||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \
|
go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \
|
||||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/migrate" $GO_MIGRATE && \
|
go build -ldflags="-w -s" -o "$BIN_OUTPUT/migrate" $GO_MIGRATE && \
|
||||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER && \
|
go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER
|
||||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/health" $GO_HEALTH
|
|
||||||
|
@ -19,18 +19,12 @@ LABEL dev.egommerce.image.build_time=${BUILD_TIME}
|
|||||||
|
|
||||||
WORKDIR /
|
WORKDIR /
|
||||||
COPY --from=builder $BIN_OUTPUT /app
|
COPY --from=builder $BIN_OUTPUT /app
|
||||||
COPY --from=builder /go/bin/migrate /bin/migrate
|
COPY --from=builder /go/bin/migrate /bin/go_migrate
|
||||||
COPY --from=builder /go/bin/health /bin/health
|
COPY .env.dist /.env
|
||||||
COPY .env.docker /.env
|
|
||||||
COPY ./.app.config /
|
|
||||||
COPY ./bin /bin
|
COPY ./bin /bin
|
||||||
RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh
|
RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh
|
||||||
|
|
||||||
RUN apk add curl
|
EXPOSE 80
|
||||||
|
|
||||||
EXPOSE 443
|
|
||||||
|
|
||||||
ENTRYPOINT ["entrypoint.sh"]
|
|
||||||
CMD ["sh", "-c", "/app"]
|
CMD ["sh", "-c", "/app"]
|
||||||
|
ENTRYPOINT ["entrypoint.sh"]
|
||||||
HEALTHCHECK --interval=5s --timeout=1s --retries=20 CMD health >/dev/null || exit 1
|
|
||||||
|
3
Makefile
3
Makefile
@ -8,9 +8,6 @@ build-image-dev:
|
|||||||
build-image-prod:
|
build-image-prod:
|
||||||
- sh ${DEPLOY_DIR}/image-build.sh
|
- sh ${DEPLOY_DIR}/image-build.sh
|
||||||
|
|
||||||
push-image-dev:
|
|
||||||
- sh ${DEPLOY_DIR}/image-push.sh dev
|
|
||||||
|
|
||||||
push-image-prod:
|
push-image-prod:
|
||||||
- sh ${DEPLOY_DIR}/image-push.sh
|
- sh ${DEPLOY_DIR}/image-push.sh
|
||||||
|
|
||||||
|
@ -14,23 +14,15 @@ waitForService()
|
|||||||
done
|
done
|
||||||
}
|
}
|
||||||
|
|
||||||
update-resolv
|
waitForService "postgres-db:5432"
|
||||||
update-ca-certificates
|
waitForService "api-eventbus:5672"
|
||||||
|
waitForService "api-logger:24224"
|
||||||
waitForService "api-registry:8501"
|
waitForService "api-registry:8500"
|
||||||
waitForService "esb.service.ego.io:5672"
|
waitForService "pricing-svc:80"
|
||||||
waitForService "logger.service.ego.io:24224"
|
|
||||||
waitForService "postgresdb.service.ego.io:5432"
|
|
||||||
# waitForService "pricing.service.ego.io:443"
|
|
||||||
# waitForService "api-eventbus:5672"
|
|
||||||
# waitForService "api-logger:24224"
|
|
||||||
# waitForService "db-postgres:5432"
|
|
||||||
# waitForService "pricing-svc:443" #dev-disabled
|
|
||||||
|
|
||||||
register-service
|
|
||||||
|
|
||||||
# run migrations
|
# run migrations
|
||||||
migrate.sh
|
migrate.sh
|
||||||
|
|
||||||
# set -euo pipefail
|
# set -euo pipefail
|
||||||
|
|
||||||
exec "$@"
|
exec "$@"
|
||||||
|
@ -1,17 +1,17 @@
|
|||||||
#!/usr/bin/env sh
|
#!/usr/bin/env sh
|
||||||
|
|
||||||
# ensure migrate env is initialized
|
# ensure migrate env is initialized
|
||||||
$(migrate version >/dev/null 2>&1)
|
$(go_migrate version >/dev/null 2>&1)
|
||||||
version=$?
|
version=$?
|
||||||
if [ $version != "0" ]
|
if [ $version != "0" ]
|
||||||
then
|
then
|
||||||
echo "Creating base table..."
|
echo "Creating base table..."
|
||||||
$(migrate init >/dev/null 2>&1)
|
$(go_migrate init >/dev/null 2>&1)
|
||||||
init=$?
|
init=$?
|
||||||
fi
|
fi
|
||||||
|
|
||||||
# check again
|
# check again
|
||||||
$(migrate version >/dev/null 2>&1)
|
$(go_migrate version >/dev/null 2>&1)
|
||||||
version=$?
|
version=$?
|
||||||
if [ $version != "0" ]
|
if [ $version != "0" ]
|
||||||
then
|
then
|
||||||
@ -20,7 +20,7 @@ then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
# run migrations
|
# run migrations
|
||||||
migrate up
|
go_migrate up
|
||||||
echo "Done."
|
echo "Done."
|
||||||
|
|
||||||
exit $version
|
exit $version
|
||||||
|
@ -1,7 +1,5 @@
|
|||||||
#!/usr/bin/env sh
|
#!/usr/bin/env sh
|
||||||
|
# Use this script to test if a given TCP host/port are available
|
||||||
# https://github.com/vishnubob/wait-for-it/blob/master/wait-for-it.sh
|
|
||||||
# Use this script to test if a given TCP host/port are available
|
|
||||||
|
|
||||||
set -e
|
set -e
|
||||||
|
|
||||||
|
@ -1 +0,0 @@
|
|||||||
../../stack/deploy/certs/basket-svc/
|
|
@ -1,7 +1,7 @@
|
|||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
# RUN IN REPO ROOT DIR !!
|
# RUN IN REPO ROOT DIR !!
|
||||||
|
|
||||||
export IMAGE_PREFIX="git.pbiernat.io/egommerce/basket"
|
export IMAGE_PREFIX="git.pbiernat.dev/egommerce/basket"
|
||||||
export BUILDER_IMAGE="egommerce-builder:basket"
|
export BUILDER_IMAGE="egommerce-builder:basket"
|
||||||
export BUILD_TIME=$(date +"%Y%m%d%H%M%S")
|
export BUILD_TIME=$(date +"%Y%m%d%H%M%S")
|
||||||
export SERVER_IMAGE="$IMAGE_PREFIX-svc"
|
export SERVER_IMAGE="$IMAGE_PREFIX-svc"
|
||||||
|
@ -1,17 +1,13 @@
|
|||||||
#!/bin/sh
|
#!/bin/sh
|
||||||
# RUN IN REPO ROOT DIR !!
|
# RUN IN REPO ROOT DIR !!
|
||||||
|
|
||||||
export IMAGE_BASE="git.pbiernat.io/egommerce/basket"
|
export IMAGE_BASE="git.pbiernat.dev/egommerce/basket"
|
||||||
export SERVER_IMAGE="$IMAGE_BASE-svc"
|
export SERVER_IMAGE="$IMAGE_BASE-svc"
|
||||||
export WORKER_IMAGE="$IMAGE_BASE-worker"
|
export WORKER_IMAGE="$IMAGE_BASE-worker"
|
||||||
|
|
||||||
TARGET=${1:-latest}
|
TARGET=${1:-latest}
|
||||||
|
|
||||||
echo $DOCKER_PASSWORD | docker login git.pbiernat.io -u $DOCKER_USERNAME --password-stdin
|
echo $DOCKER_PASSWORD | docker login git.pbiernat.dev -u $DOCKER_USERNAME --password-stdin
|
||||||
|
|
||||||
docker push "$SERVER_IMAGE:$TARGET"
|
docker push "$SERVER_IMAGE:$TARGET"
|
||||||
docker push "$WORKER_IMAGE:$TARGET"
|
docker push "$WORKER_IMAGE:$TARGET"
|
||||||
|
|
||||||
# Restart container
|
|
||||||
curl -X POST http://127.0.0.1:9001/api/webhooks/70892c11-b14e-4fb5-b813-ed0daade00d8
|
|
||||||
curl -X POST http://127.0.0.1:9001/api/webhooks/309e753d-641d-48ea-9256-a267f4f8b5b4
|
|
||||||
|
1
src/.gitignore
vendored
1
src/.gitignore
vendored
@ -13,6 +13,5 @@
|
|||||||
*.out
|
*.out
|
||||||
|
|
||||||
.env
|
.env
|
||||||
|
|
||||||
# Dependency directories (remove the comment below to include it)
|
# Dependency directories (remove the comment below to include it)
|
||||||
vendor/
|
vendor/
|
||||||
|
@ -1 +0,0 @@
|
|||||||
1697814
|
|
@ -1,39 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"flag"
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
)
|
|
||||||
|
|
||||||
const usageText = `This program runs healthcheck on the app.
|
|
||||||
Usage:
|
|
||||||
go run cmd/health/main.go
|
|
||||||
`
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
flag.Usage = func() {
|
|
||||||
fmt.Print(usageText)
|
|
||||||
flag.PrintDefaults()
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
flag.Parse()
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
var exitCode = 1
|
|
||||||
if isOk := healthCheck(); isOk {
|
|
||||||
exitCode = 0
|
|
||||||
}
|
|
||||||
os.Exit(exitCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
func healthCheck() bool {
|
|
||||||
run, err := os.Open("./app.run")
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
defer run.Close()
|
|
||||||
|
|
||||||
return true
|
|
||||||
}
|
|
@ -9,10 +9,10 @@ import (
|
|||||||
"github.com/go-pg/migrations/v8"
|
"github.com/go-pg/migrations/v8"
|
||||||
"github.com/go-pg/pg/v10"
|
"github.com/go-pg/pg/v10"
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
|
||||||
cnf "git.pbiernat.io/egommerce/basket-service/internal/server"
|
"git.pbiernat.dev/egommerce/basket-service/internal/common"
|
||||||
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -35,20 +35,14 @@ Usage:
|
|||||||
`
|
`
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Usage = func() {
|
if cnf.ErrLoadingEnvs != nil {
|
||||||
fmt.Print(usageText)
|
log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs)
|
||||||
flag.PrintDefaults()
|
|
||||||
os.Exit(2)
|
|
||||||
}
|
|
||||||
flag.Parse()
|
|
||||||
|
|
||||||
if baseCnf.ErrLoadingEnvs != nil {
|
|
||||||
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c := cnf.NewConfig("basket-migrator")
|
c := common.NewConfig()
|
||||||
|
|
||||||
// dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL)
|
// dbURL := cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||||
|
mTblName := cnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
|
||||||
|
|
||||||
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -61,17 +55,19 @@ func main() {
|
|||||||
}
|
}
|
||||||
defer logger.Close()
|
defer logger.Close()
|
||||||
|
|
||||||
|
flag.Usage = usage
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
db := pg.Connect(&pg.Options{ // FIXME
|
db := pg.Connect(&pg.Options{ // FIXME
|
||||||
Addr: "postgres-db:5432",
|
Addr: "postgres-db:5432",
|
||||||
User: "postgres",
|
User: "postgres",
|
||||||
Password: "12345678",
|
Password: "12345678",
|
||||||
Database: "egommerce",
|
Database: "egommerce",
|
||||||
})
|
})
|
||||||
defer db.Close()
|
|
||||||
|
|
||||||
mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
|
|
||||||
mig := migrations.NewCollection()
|
mig := migrations.NewCollection()
|
||||||
mig.SetTableName(mTbl)
|
mig.SetTableName(mTblName)
|
||||||
|
|
||||||
if err := mig.DiscoverSQLMigrations("./migrations"); err != nil {
|
if err := mig.DiscoverSQLMigrations("./migrations"); err != nil {
|
||||||
logger.Log("migration dicovery error: %#v", err)
|
logger.Log("migration dicovery error: %#v", err)
|
||||||
}
|
}
|
||||||
@ -86,5 +82,10 @@ func main() {
|
|||||||
} else {
|
} else {
|
||||||
logger.Log("version is %d\n", oldVersion)
|
logger.Log("version is %d\n", oldVersion)
|
||||||
}
|
}
|
||||||
// os.Exit(0)
|
}
|
||||||
|
|
||||||
|
func usage() {
|
||||||
|
fmt.Print(usageText)
|
||||||
|
flag.PrintDefaults()
|
||||||
|
os.Exit(2)
|
||||||
}
|
}
|
||||||
|
@ -5,10 +5,10 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config"
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/app"
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/server"
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/server"
|
"git.pbiernat.dev/egommerce/basket-service/internal/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -16,26 +16,68 @@ func main() {
|
|||||||
log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs)
|
log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs)
|
||||||
}
|
}
|
||||||
|
|
||||||
c := server.NewConfig("basket")
|
c := common.NewConfig()
|
||||||
cArr := c.GetArray()
|
srv := server.New(
|
||||||
|
c,
|
||||||
|
server.WithCache(c),
|
||||||
|
server.WithDatabase(c),
|
||||||
|
server.WithEventbus(c),
|
||||||
|
server.WithLogger(c),
|
||||||
|
server.WithRegistry(c),
|
||||||
|
)
|
||||||
|
|
||||||
doer := server.New(c)
|
forever := make(chan struct{})
|
||||||
a := app.NewApp(doer)
|
|
||||||
a.RegisterPlugin(app.LoggerPlugin(cArr))
|
|
||||||
a.RegisterPlugin(app.CachePlugin(cArr))
|
|
||||||
a.RegisterPlugin(app.DatabasePlugin(cArr))
|
|
||||||
a.RegisterPlugin(app.EventbusPlugin(cArr))
|
|
||||||
// a.RegisterPlugin(app.RegistryPlugin(cArr))
|
|
||||||
|
|
||||||
while := make(chan struct{})
|
err := srv.Base.Start(forever, srv.Shutdown())
|
||||||
err := a.Start(while)
|
// server.SetupMiddleware(srv)
|
||||||
<-while
|
// server.SetupRouter(srv)
|
||||||
|
|
||||||
|
<-forever
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to start server. Reason: %v\n", err)
|
log.Fatalf("Failed to start server. Reason: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Gone")
|
fmt.Println("Done.")
|
||||||
os.Exit(0)
|
os.Exit(0)
|
||||||
|
|
||||||
|
// c.AppDomain = cnf.GetEnv("APP_DOMAIN", defAppDomain)
|
||||||
|
// c.Port, _ = strconv.Atoi(c.NetAddr[1:])
|
||||||
|
// c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||||
|
|
||||||
|
// logHost, logPort := fluentd.ParseAddr(c.LoggerAddr)
|
||||||
|
// logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
||||||
|
// defer logger.Close()
|
||||||
|
|
||||||
|
// db conn
|
||||||
|
// dbConn, err := database.Connect(c.DbURL)
|
||||||
|
// if err != nil { // fixme: add wait-for-db...
|
||||||
|
// logger.Log("Failed to connect to Database server: %v\n", err)
|
||||||
|
// os.Exit(1)
|
||||||
|
// }
|
||||||
|
// defer dbConn.Close()
|
||||||
|
|
||||||
|
// redis conn
|
||||||
|
// redis := redis.NewClient(&redis.Options{
|
||||||
|
// Addr: c.CacheAddr,
|
||||||
|
// Password: c.CachePassword,
|
||||||
|
// DB: 0,
|
||||||
|
// })
|
||||||
|
// defer redis.Close()
|
||||||
|
|
||||||
|
// eventbus conn
|
||||||
|
// ebConn, ebCh, err := amqp.Open(c.EventBusURL)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Log("Failed to connect to EventBus server: %v\n", err)
|
||||||
|
// os.Exit(1)
|
||||||
|
// }
|
||||||
|
// defer ebCh.Close()
|
||||||
|
// defer amqp.Close(ebConn)
|
||||||
|
|
||||||
|
// err = amqp.NewExchange(ebCh, c.EventBusExchange)
|
||||||
|
// if err != nil {
|
||||||
|
// logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
||||||
|
// os.Exit(1)
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
@ -4,37 +4,214 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/app"
|
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/worker"
|
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/event"
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/service"
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/ui"
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/internal/common"
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/pkg/config"
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/pkg/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defAppName = "basket-worker"
|
||||||
|
// defLoggerAddr = "api-logger:24224"
|
||||||
|
// defRegistryAddr = "api-registry:8500"
|
||||||
|
// defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
||||||
|
// defCacheAddr = "api-cache:6379"
|
||||||
|
// defCachePassword = "12345678"
|
||||||
|
// defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||||
|
// defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
||||||
|
// ebEventsExchange = "api-events"
|
||||||
|
// ebEventsQueue = "basket-worker"
|
||||||
|
// defKVNmspc = "dev.egommerce/service/basket-worker"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
if cnf.ErrLoadingEnvs != nil {
|
if config.ErrLoadingEnvs != nil {
|
||||||
log.Fatalln("Error loading .env file.")
|
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
||||||
}
|
}
|
||||||
|
|
||||||
c := worker.NewConfig("catalog-worker")
|
c := common.NewConfig()
|
||||||
cArr := c.GetArray()
|
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||||
|
|
||||||
doer := worker.New(c)
|
|
||||||
a := app.NewApp(doer)
|
|
||||||
a.RegisterPlugin(app.LoggerPlugin(cArr))
|
|
||||||
a.RegisterPlugin(app.CachePlugin(cArr))
|
|
||||||
a.RegisterPlugin(app.DatabasePlugin(cArr))
|
|
||||||
a.RegisterPlugin(app.EventbusPlugin(cArr))
|
|
||||||
|
|
||||||
while := make(chan struct{})
|
|
||||||
err := a.Start(while)
|
|
||||||
<-while
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to start worker. Reason: %v\n", err)
|
log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err)
|
||||||
|
}
|
||||||
|
defer logger.Close()
|
||||||
|
|
||||||
|
// consul, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
|
||||||
|
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(consul *consul.Service) {
|
||||||
|
ticker := time.NewTicker(time.Second * 15)
|
||||||
|
for range ticker.C {
|
||||||
|
updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go
|
||||||
|
}
|
||||||
|
}(registry)
|
||||||
|
|
||||||
|
// db conn
|
||||||
|
dbConn, err := database.Connect(c.DbURL)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("Failed to connect to Database server: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer dbConn.Close()
|
||||||
|
|
||||||
|
// redis conn
|
||||||
|
redis := redis.NewClient(&redis.Options{
|
||||||
|
Addr: c.CacheAddr,
|
||||||
|
Password: c.CachePassword,
|
||||||
|
DB: 0,
|
||||||
|
})
|
||||||
|
defer redis.Close()
|
||||||
|
|
||||||
|
// eventbus conn
|
||||||
|
ebConn, ebCh, err := rabbitmq.Open(c.EventBusURL)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("Failed to connect to EventBus server: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
defer ebCh.Close()
|
||||||
|
defer rabbitmq.Close(ebConn)
|
||||||
|
|
||||||
|
err = rabbitmq.NewExchange(ebCh, c.EventBusExchange)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println("Gone")
|
// create and bind queues
|
||||||
os.Exit(0)
|
_, err = ebCh.QueueDeclare(
|
||||||
|
c.EventBusQueue, // name
|
||||||
|
false, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("Failed to declare EventBus queue: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket")
|
||||||
|
rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket")
|
||||||
|
rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity")
|
||||||
|
|
||||||
|
// event consume
|
||||||
|
msgs, err := ebCh.Consume(
|
||||||
|
c.EventBusQueue, // queue
|
||||||
|
"", // consumer
|
||||||
|
false, // auto-ack
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // args
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("Failed to register a consumer: %s", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
forever := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
sigint := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
<-sigint
|
||||||
|
|
||||||
|
logger.Log("Worker %s stopped working...\n", c.GetAppFullName())
|
||||||
|
|
||||||
|
close(forever)
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
basketSrv := service.NewBasketService(dbConn, redis, ebCh, logger)
|
||||||
|
|
||||||
|
for d := range msgs {
|
||||||
|
go func(d amqp.Delivery) {
|
||||||
|
msg, err := rabbitmq.Deserialize(d.Body)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("deserialize error: %v\n", err)
|
||||||
|
d.Reject(false) // FIXME: or Nack? how to handle erros in queue...
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
eName := fmt.Sprintf("%s", msg["event"])
|
||||||
|
data := (msg["data"]).(map[string]interface{})
|
||||||
|
logger.Log("Message<%s>: %v\n", eName, data)
|
||||||
|
|
||||||
|
reqID := data["request_id"].(string) // FIXME Check input params!
|
||||||
|
basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||||
|
|
||||||
|
switch true {
|
||||||
|
case strings.Contains(eName, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
||||||
|
productID := int(data["product_id"].(float64))
|
||||||
|
qty := int(data["quantity"].(float64))
|
||||||
|
|
||||||
|
basket, err := ui.AddProductToBasket(basketSrv, productID, qty, basketID, reqID)
|
||||||
|
if err == nil {
|
||||||
|
logger.Log("Product #%d added to basket #%s. ReqID: #%s", productID, basket.ID, reqID)
|
||||||
|
}
|
||||||
|
case strings.Contains(eName, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
||||||
|
productID := int(data["product_id"].(float64))
|
||||||
|
qty := int(data["quantity"].(float64))
|
||||||
|
|
||||||
|
basket, err := ui.RemoveProductFromBasket(basketSrv, productID, qty, basketID, reqID)
|
||||||
|
if err == nil {
|
||||||
|
logger.Log("Product #%d removed from basket #%s. ReqID: #%s", productID, basket.ID, reqID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
logger.Log("%s error: %s", eName, err.Error())
|
||||||
|
d.Reject(false) // FIXME: or Nack? how to handle erros in queue...
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Log("ACK: %s", eName)
|
||||||
|
d.Ack(false)
|
||||||
|
}(d)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
logger.Log("Waiting for messages...")
|
||||||
|
<-forever
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateKVConfig(s *consul.Service, oldCnf *common.Config) error { // FIXME: duplicated in internal/app/server/server.go
|
||||||
|
// data, _, err := s.KV().Get(oldCnf.KVNamespace, nil)
|
||||||
|
// if err != nil {
|
||||||
|
// return err
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if data == nil {
|
||||||
|
// return errors.New("empty KV config data")
|
||||||
|
// }
|
||||||
|
|
||||||
|
// buf := bytes.NewBuffer(data.Value)
|
||||||
|
// decoder := json.NewDecoder(buf)
|
||||||
|
// if err := decoder.Decode(oldCnf); err != nil {
|
||||||
|
// return err
|
||||||
|
// }
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
100
src/go.mod
100
src/go.mod
@ -1,116 +1,62 @@
|
|||||||
module git.pbiernat.io/egommerce/basket-service
|
module git.pbiernat.dev/egommerce/basket-service
|
||||||
|
|
||||||
go 1.18
|
go 1.18
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.pbiernat.io/egommerce/api-entities v0.2.3
|
git.pbiernat.dev/egommerce/api-entities v0.0.26
|
||||||
git.pbiernat.io/egommerce/go-api-pkg v0.3.24
|
git.pbiernat.dev/egommerce/go-api-pkg v0.0.150
|
||||||
github.com/georgysavva/scany/v2 v2.0.0
|
github.com/georgysavva/scany/v2 v2.0.0
|
||||||
github.com/go-pg/migrations/v8 v8.1.0
|
github.com/go-pg/migrations/v8 v8.1.0
|
||||||
github.com/go-pg/pg/v10 v10.10.7
|
github.com/go-pg/pg/v10 v10.10.7
|
||||||
github.com/go-redis/redis/v8 v8.11.5
|
github.com/go-redis/redis/v8 v8.11.5
|
||||||
github.com/gofiber/fiber/v2 v2.52.5
|
github.com/gofiber/fiber/v2 v2.40.1
|
||||||
github.com/jackc/pgx/v5 v5.1.1
|
github.com/jackc/pgx/v5 v5.1.1
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0
|
github.com/joho/godotenv v1.4.0
|
||||||
|
github.com/streadway/amqp v1.0.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/DataDog/datadog-go v3.2.0+incompatible // indirect
|
github.com/andybalholm/brotli v1.0.4 // indirect
|
||||||
github.com/andybalholm/brotli v1.0.5 // indirect
|
|
||||||
github.com/armon/go-metrics v0.4.1 // indirect
|
github.com/armon/go-metrics v0.4.1 // indirect
|
||||||
github.com/armon/go-radix v1.0.0 // indirect
|
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||||
github.com/aws/aws-sdk-go v1.42.34 // indirect
|
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
|
||||||
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
|
|
||||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
|
||||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible // indirect
|
|
||||||
github.com/circonus-labs/circonusllhist v0.1.3 // indirect
|
|
||||||
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 // indirect
|
|
||||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
|
||||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||||
github.com/envoyproxy/go-control-plane v0.11.0 // indirect
|
github.com/fatih/color v1.13.0 // indirect
|
||||||
github.com/envoyproxy/protoc-gen-validate v0.10.0 // indirect
|
|
||||||
github.com/fatih/color v1.14.1 // indirect
|
|
||||||
github.com/fluent/fluent-logger-golang v1.9.0 // indirect
|
github.com/fluent/fluent-logger-golang v1.9.0 // indirect
|
||||||
github.com/go-pg/zerochecker v0.2.0 // indirect
|
github.com/go-pg/zerochecker v0.2.0 // indirect
|
||||||
github.com/golang/protobuf v1.5.3 // indirect
|
github.com/hashicorp/consul/api v1.18.0 // indirect
|
||||||
github.com/google/btree v1.0.1 // indirect
|
|
||||||
github.com/google/uuid v1.5.0 // indirect
|
|
||||||
github.com/hashicorp/consul v1.16.0 // indirect
|
|
||||||
github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69 // indirect
|
|
||||||
github.com/hashicorp/consul/api v1.22.0 // indirect
|
|
||||||
github.com/hashicorp/consul/envoyextensions v0.3.0 // indirect
|
|
||||||
github.com/hashicorp/consul/sdk v0.14.0 // indirect
|
|
||||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
|
||||||
github.com/hashicorp/go-bexpr v0.1.2 // indirect
|
|
||||||
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
||||||
github.com/hashicorp/go-hclog v1.5.0 // indirect
|
github.com/hashicorp/go-hclog v1.3.1 // indirect
|
||||||
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
||||||
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
|
||||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
|
||||||
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
|
|
||||||
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
|
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
|
||||||
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
|
|
||||||
github.com/hashicorp/go-syslog v1.0.0 // indirect
|
|
||||||
github.com/hashicorp/go-uuid v1.0.3 // indirect
|
|
||||||
github.com/hashicorp/go-version v1.2.1 // indirect
|
|
||||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
|
||||||
github.com/hashicorp/memberlist v0.5.0 // indirect
|
|
||||||
github.com/hashicorp/raft v1.5.0 // indirect
|
|
||||||
github.com/hashicorp/raft-autopilot v0.1.6 // indirect
|
|
||||||
github.com/hashicorp/serf v0.10.1 // indirect
|
github.com/hashicorp/serf v0.10.1 // indirect
|
||||||
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
|
|
||||||
github.com/jackc/pgio v1.0.0 // indirect
|
github.com/jackc/pgio v1.0.0 // indirect
|
||||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||||
github.com/jackc/pgtype v1.14.3 // indirect
|
github.com/jackc/pgtype v1.13.0 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.1.2 // indirect
|
github.com/jackc/puddle/v2 v2.1.2 // indirect
|
||||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||||
github.com/joho/godotenv v1.5.1 // indirect
|
github.com/klauspost/compress v1.15.9 // indirect
|
||||||
github.com/klauspost/compress v1.17.0 // indirect
|
|
||||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.16 // indirect
|
||||||
github.com/mattn/go-runewidth v0.0.15 // indirect
|
github.com/mattn/go-runewidth v0.0.14 // indirect
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
|
||||||
github.com/miekg/dns v1.1.41 // indirect
|
|
||||||
github.com/mitchellh/copystructure v1.2.0 // indirect
|
|
||||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||||
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
|
|
||||||
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 // indirect
|
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||||
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
github.com/philhofer/fwd v1.1.1 // indirect
|
||||||
github.com/philhofer/fwd v1.1.2 // indirect
|
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
|
||||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
|
||||||
github.com/prometheus/client_golang v1.14.0 // indirect
|
|
||||||
github.com/prometheus/client_model v0.3.0 // indirect
|
|
||||||
github.com/prometheus/common v0.37.0 // indirect
|
|
||||||
github.com/prometheus/procfs v0.8.0 // indirect
|
|
||||||
github.com/rivo/uniseg v0.2.0 // indirect
|
github.com/rivo/uniseg v0.2.0 // indirect
|
||||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
|
github.com/tinylib/msgp v1.1.6 // indirect
|
||||||
github.com/stretchr/objx v0.5.0 // indirect
|
|
||||||
github.com/stretchr/testify v1.8.3 // indirect
|
|
||||||
github.com/tinylib/msgp v1.1.8 // indirect
|
|
||||||
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
|
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
|
||||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 // indirect
|
|
||||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||||
github.com/valyala/fasthttp v1.51.0 // indirect
|
github.com/valyala/fasthttp v1.41.0 // indirect
|
||||||
github.com/valyala/tcplisten v1.0.0 // indirect
|
github.com/valyala/tcplisten v1.0.0 // indirect
|
||||||
github.com/vmihailenco/bufpool v0.1.11 // indirect
|
github.com/vmihailenco/bufpool v0.1.11 // indirect
|
||||||
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
|
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
|
||||||
github.com/vmihailenco/tagparser v0.1.2 // indirect
|
github.com/vmihailenco/tagparser v0.1.2 // indirect
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||||
go.uber.org/atomic v1.10.0 // indirect
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
golang.org/x/crypto v0.20.0 // indirect
|
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect
|
||||||
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
|
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect
|
||||||
golang.org/x/net v0.21.0 // indirect
|
golang.org/x/sys v0.2.0 // indirect
|
||||||
golang.org/x/sync v0.2.0 // indirect
|
golang.org/x/text v0.3.8 // indirect
|
||||||
golang.org/x/sys v0.17.0 // indirect
|
|
||||||
golang.org/x/text v0.14.0 // indirect
|
|
||||||
golang.org/x/time v0.3.0 // indirect
|
|
||||||
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
|
||||||
google.golang.org/protobuf v1.30.0 // indirect
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
|
||||||
mellium.im/sasl v0.2.1 // indirect
|
mellium.im/sasl v0.2.1 // indirect
|
||||||
)
|
)
|
||||||
|
708
src/go.sum
708
src/go.sum
File diff suppressed because it is too large
Load Diff
@ -1,81 +0,0 @@
|
|||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"strconv"
|
|
||||||
"syscall"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
Doer interface {
|
|
||||||
Start() error
|
|
||||||
RegisterHandler(string, func() any)
|
|
||||||
OnShutdown()
|
|
||||||
}
|
|
||||||
Application interface {
|
|
||||||
Start(while chan struct{})
|
|
||||||
RegisterPlugin(PluginFn) error
|
|
||||||
Shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
App struct {
|
|
||||||
doer Doer
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewApp(d Doer) *App {
|
|
||||||
return &App{
|
|
||||||
doer: d,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *App) Start(while chan struct{}) error {
|
|
||||||
go func() {
|
|
||||||
sigint := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
<-sigint
|
|
||||||
|
|
||||||
a.Shutdown()
|
|
||||||
|
|
||||||
close(while)
|
|
||||||
}()
|
|
||||||
|
|
||||||
run := a.createRunFile("./app.run") // FIXME path...
|
|
||||||
defer a.removeRunFile(run)
|
|
||||||
|
|
||||||
err := a.doer.Start()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to start app. Reason: %v\n", err)
|
|
||||||
close(while)
|
|
||||||
}
|
|
||||||
<-while
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *App) RegisterPlugin(p Plugin) error {
|
|
||||||
a.doer.RegisterHandler(p.name, p.fn)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *App) Shutdown() {
|
|
||||||
a.doer.OnShutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *App) createRunFile(path string) *os.File {
|
|
||||||
run, err := os.Create(path)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to create run file. Reason: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
run.WriteString(strconv.Itoa(os.Getpid()))
|
|
||||||
|
|
||||||
return run
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *App) removeRunFile(f *os.File) error {
|
|
||||||
return f.Close()
|
|
||||||
}
|
|
@ -1,17 +1,17 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasket"
|
EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasketEvent"
|
||||||
EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasket"
|
EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasketEvent"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ProductAddedToBasket struct {
|
type ProductAddedToBasketEvent struct {
|
||||||
*Event
|
*Event
|
||||||
ProductID string `json:"product_id"`
|
ProductID string `json:"product_id"`
|
||||||
BasketID string `json:"basket_id"`
|
BasketID string `json:"basket_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ProductRemovedFromBasket struct {
|
type ProductRemovedFromBasketEvent struct {
|
||||||
*Event
|
*Event
|
||||||
ProductID string `json:"product_id"`
|
ProductID string `json:"product_id"`
|
||||||
BasketID string `json:"basket_id"`
|
BasketID string `json:"basket_id"`
|
@ -1,13 +1,11 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
Command string `json:"command"`
|
|
||||||
RequestID string `json:"request_id"`
|
RequestID string `json:"request_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEvent(command, reqID string) *Event {
|
func NewEvent(reqID string) *Event {
|
||||||
em := new(Event)
|
em := new(Event)
|
||||||
em.Command = command
|
|
||||||
em.RequestID = reqID
|
em.RequestID = reqID
|
||||||
|
|
||||||
return em
|
return em
|
@ -1,139 +0,0 @@
|
|||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"strconv"
|
|
||||||
|
|
||||||
redis "github.com/go-redis/redis/v8"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/consul"
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
|
||||||
|
|
||||||
db "git.pbiernat.io/egommerce/basket-service/pkg/database"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
Plugin struct {
|
|
||||||
name string
|
|
||||||
fn PluginFn
|
|
||||||
}
|
|
||||||
PluginFn func() any
|
|
||||||
)
|
|
||||||
|
|
||||||
func CachePlugin(cArr map[string]string) Plugin {
|
|
||||||
return Plugin{
|
|
||||||
name: "cache",
|
|
||||||
fn: func() any {
|
|
||||||
return redis.NewClient(&redis.Options{
|
|
||||||
Addr: cArr["cacheAddr"],
|
|
||||||
Password: cArr["cachePassword"],
|
|
||||||
DB: 0,
|
|
||||||
})
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func DatabasePlugin(cArr map[string]string) Plugin {
|
|
||||||
return Plugin{
|
|
||||||
name: "database",
|
|
||||||
fn: func() any {
|
|
||||||
dbConn, err := db.Connect(cArr["dbURL"])
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err)
|
|
||||||
os.Exit(1) // TODO: retry in background...
|
|
||||||
}
|
|
||||||
|
|
||||||
return dbConn
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func EventbusPlugin(cArr map[string]string) Plugin {
|
|
||||||
return Plugin{
|
|
||||||
name: "eventbus",
|
|
||||||
fn: func() any {
|
|
||||||
conn, err := amqp.Dial(cArr["eventBusURL"])
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err)
|
|
||||||
os.Exit(1) // TODO: retry in background...
|
|
||||||
}
|
|
||||||
|
|
||||||
chn, err := conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to open new EventBus channel. Err: %v\n", err)
|
|
||||||
os.Exit(1) // TODO: retry in background...
|
|
||||||
}
|
|
||||||
|
|
||||||
return chn
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoggerPlugin(cArr map[string]string) Plugin {
|
|
||||||
return Plugin{
|
|
||||||
name: "logger",
|
|
||||||
fn: func() any {
|
|
||||||
logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"])
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err)
|
|
||||||
os.Exit(1) // TODO: retry in background...
|
|
||||||
}
|
|
||||||
|
|
||||||
logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err)
|
|
||||||
os.Exit(1) // TODO: retry in background...
|
|
||||||
}
|
|
||||||
|
|
||||||
return logger
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func RegistryPlugin(cArr map[string]string) Plugin {
|
|
||||||
return Plugin{
|
|
||||||
name: "registry",
|
|
||||||
fn: func() any {
|
|
||||||
port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT
|
|
||||||
// log.Printf("Consul retrieved port: %v", port)
|
|
||||||
registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err)
|
|
||||||
os.Exit(1) // TODO: retry in background...
|
|
||||||
}
|
|
||||||
|
|
||||||
err = registry.Register()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
|
||||||
os.Exit(1) // TODO: retry in background...
|
|
||||||
}
|
|
||||||
|
|
||||||
registry.RegisterHealthChecks()
|
|
||||||
// a.registerKVUpdater() // FIXME run as goroutine
|
|
||||||
|
|
||||||
return registry
|
|
||||||
|
|
||||||
// svc, _ := registry.Connect()
|
|
||||||
// tlsCnf := svc.ServerTLSConfig()
|
|
||||||
// s.Base.App.Server().TLSConfig = tlsCnf
|
|
||||||
// fmt.Println("Podmiana configa TLS")
|
|
||||||
// defer svc.Close()
|
|
||||||
|
|
||||||
// go func() { // Consul KV updater
|
|
||||||
// ticker := time.NewTicker(time.Second * 15)
|
|
||||||
// for range ticker.C {
|
|
||||||
// fetchKVConfig(s) // FIXME: duplicated in worker
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
|
|
||||||
// go func() { // Server metadata cache updater
|
|
||||||
// ticker := time.NewTicker(time.Second * 5)
|
|
||||||
// for range ticker.C {
|
|
||||||
// s.cacheMetadata()
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
@ -2,25 +2,27 @@ package server
|
|||||||
|
|
||||||
// REFACTOR: APP DEDICATED CODE
|
// REFACTOR: APP DEDICATED CODE
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/api-entities/http"
|
"git.pbiernat.dev/egommerce/api-entities/http"
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/service"
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/service"
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/ui"
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/ui"
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s *Server) GetBasketHandler(c *fiber.Ctx) error {
|
func (s *Server) GetBasketHandler(c *fiber.Ctx) error {
|
||||||
req := new(http.GetBasketRequest)
|
req := new(http.GetBasketRequest)
|
||||||
if err := c.BodyParser(req); err != nil {
|
if err := c.BodyParser(req); err != nil {
|
||||||
return s.Error(c, 400, err.Error())
|
return s.Base.Error400(c, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
basketID := req.BasketID
|
basketID := req.BasketID
|
||||||
basketSrv := service.NewBasketService(s.GetDatabase(), s.GetCache(), s.GetEventBus(), s.GetLogger())
|
basketSrv := service.NewBasketService(s.Database, s.Cache, s.Eventbus, s.Logger)
|
||||||
basket, err := basketSrv.FetchFromDB(c.Context(), basketID)
|
ctx := context.Background()
|
||||||
|
basket, err := basketSrv.FetchFromDB(ctx, basketID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return s.Error(c, 400, "Failed to retrieve basket")
|
return s.Base.Error400(c, "Failed to retrieve basket")
|
||||||
}
|
}
|
||||||
|
|
||||||
res := &http.GetBasketResponse{
|
res := &http.GetBasketResponse{
|
||||||
@ -37,10 +39,11 @@ func (s *Server) GetBasketHandler(c *fiber.Ctx) error {
|
|||||||
|
|
||||||
func (s *Server) GetBasketItemsHandler(c *fiber.Ctx) error {
|
func (s *Server) GetBasketItemsHandler(c *fiber.Ctx) error {
|
||||||
basketID := c.Params("basketId", "")
|
basketID := c.Params("basketId", "")
|
||||||
basketSrv := service.NewBasketService(s.GetDatabase(), s.GetCache(), s.GetEventBus(), s.GetLogger())
|
basketSrv := service.NewBasketService(s.Database, s.Cache, s.Eventbus, s.Logger)
|
||||||
items, err := basketSrv.FetchItems(c.Context(), basketID)
|
ctx := context.Background()
|
||||||
|
items, err := basketSrv.FetchItems(ctx, basketID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return s.Error(c, 400, "Failed to retrieve basket")
|
return s.Base.Error400(c, "Failed to retrieve basket items")
|
||||||
}
|
}
|
||||||
|
|
||||||
var res []*http.GetBasketItemsResponse
|
var res []*http.GetBasketItemsResponse
|
||||||
@ -63,17 +66,17 @@ func (s *Server) GetBasketItemsHandler(c *fiber.Ctx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) CheckoutHandler(c *fiber.Ctx) error {
|
func (s *Server) CheckoutHandler(c *fiber.Ctx) error {
|
||||||
reqID, _ := s.GetRequestID(c)
|
reqID, _ := s.Base.GetRequestID(c)
|
||||||
req := new(http.BasketCheckoutRequest)
|
req := new(http.BasketCheckoutRequest)
|
||||||
if err := c.BodyParser(req); err != nil {
|
if err := c.BodyParser(req); err != nil {
|
||||||
return s.Error(c, 400, err.Error())
|
return s.Base.Error400(c, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
basketID := req.BasketID
|
basketID := req.BasketID
|
||||||
basketSrv := service.NewBasketService(s.GetDatabase(), s.GetCache(), s.GetEventBus(), s.GetLogger())
|
basketSrv := service.NewBasketService(s.Database, s.Cache, s.Eventbus, s.Logger)
|
||||||
res, err := ui.CheckoutBasket(basketSrv, basketID, reqID)
|
res, err := ui.CheckoutBasket(basketSrv, basketID, reqID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return s.Error(c, 400, "Failed to create order")
|
return s.Base.Error400(c, "Failed to create order")
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.JSON(res)
|
return c.JSON(res)
|
18
src/internal/app/server/health_handler.go
Normal file
18
src/internal/app/server/health_handler.go
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
// REFACTOR: UNIVERSAL SERVER CODE
|
||||||
|
import (
|
||||||
|
"github.com/gofiber/fiber/v2"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/api-entities/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *Server) HealthHandler(c *fiber.Ctx) error { // TODO add necessary logic
|
||||||
|
return c.JSON(&http.HealthResponse{
|
||||||
|
Status: "OKa",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
|
||||||
|
return c.JSON(s.Config)
|
||||||
|
}
|
@ -1,24 +1,27 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
)
|
)
|
||||||
|
|
||||||
// "github.com/gofiber/fiber/v2"
|
// "github.com/gofiber/fiber/v2"
|
||||||
// "github.com/gofiber/fiber/v2/middleware/cors"
|
// "github.com/gofiber/fiber/v2/middleware/cors"
|
||||||
|
|
||||||
func SetupMiddleware(s *Server) {
|
func SetupMiddleware(s *Server) {
|
||||||
s.Use(LoggingMiddleware(s.GetLogger()))
|
s.Base.Use(defaultCORS)
|
||||||
|
s.Base.Use(LoggingMiddleware(s.Logger))
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
|
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
|
||||||
return func(c *fiber.Ctx) error {
|
return func(c *fiber.Ctx) error {
|
||||||
// path := string(c.Request().URI().Path())
|
path := string(c.Request().URI().Path())
|
||||||
// if strings.Contains(path, "/health") {
|
if strings.Contains(path, "/health") {
|
||||||
// return c.Next()
|
return c.Next()
|
||||||
// }
|
}
|
||||||
|
|
||||||
log.Log("Request: %s, remote: %s, via: %s",
|
log.Log("Request: %s, remote: %s, via: %s",
|
||||||
c.Request().URI().String(),
|
c.Request().URI().String(),
|
40
src/internal/app/server/router.go
Normal file
40
src/internal/app/server/router.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
// REFACTOR: APP DEDICATED CODE
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/gofiber/fiber/v2"
|
||||||
|
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
defaultCORS = cors.New(cors.Config{
|
||||||
|
AllowOrigins: "*",
|
||||||
|
AllowCredentials: true,
|
||||||
|
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
||||||
|
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
func SetupRouter(s *Server) {
|
||||||
|
s.Base.Options("*", defaultCORS)
|
||||||
|
|
||||||
|
s.Base.Get("/health", s.HealthHandler)
|
||||||
|
s.Base.Get("/config", s.ConfigHandler)
|
||||||
|
|
||||||
|
api := s.Base.Group("/api")
|
||||||
|
v1 := api.Group("/v1")
|
||||||
|
v1.Get("/basket", s.GetBasketHandler)
|
||||||
|
v1.Get("/basket/:basketId/items", s.GetBasketItemsHandler)
|
||||||
|
v1.Post("/checkout", s.CheckoutHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func CORSPreflightMiddleware(c *fiber.Ctx) error {
|
||||||
|
if string(c.Request().Header.Method()) == http.MethodOptions {
|
||||||
|
c.Response().SetStatusCode(http.StatusOK)
|
||||||
|
c.Next()
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Next()
|
||||||
|
}
|
261
src/internal/app/server/server.go
Normal file
261
src/internal/app/server/server.go
Normal file
@ -0,0 +1,261 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v8"
|
||||||
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
|
||||||
|
db "git.pbiernat.dev/egommerce/basket-service/pkg/database"
|
||||||
|
srv "git.pbiernat.dev/egommerce/basket-service/pkg/server"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/internal/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
Server struct {
|
||||||
|
Base *srv.Server
|
||||||
|
Config *common.Config
|
||||||
|
|
||||||
|
Cache *redis.Client
|
||||||
|
Database *pgxpool.Pool
|
||||||
|
Eventbus *amqp.Channel
|
||||||
|
Logger *fluentd.Logger
|
||||||
|
Registry *consul.Service
|
||||||
|
|
||||||
|
kvNmspc string
|
||||||
|
}
|
||||||
|
|
||||||
|
OptionFn func(*Server) error
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(c *common.Config, opts ...OptionFn) *Server {
|
||||||
|
svr := &Server{
|
||||||
|
Base: srv.New(c.Base),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, opt := range opts {
|
||||||
|
if err := opt(svr); err != nil {
|
||||||
|
log.Fatalf("Failed to start HTTP Server. Err: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SetupMiddleware(svr)
|
||||||
|
SetupRouter(svr)
|
||||||
|
|
||||||
|
return svr
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithCache(c *common.Config) OptionFn {
|
||||||
|
redis := redis.NewClient(&redis.Options{
|
||||||
|
Addr: c.CacheAddr,
|
||||||
|
Password: c.CachePassword,
|
||||||
|
DB: 0,
|
||||||
|
})
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Cache = redis
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// defer redis.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithDatabase(c *common.Config) OptionFn {
|
||||||
|
dbConn, err := db.Connect(c.DbURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", c.DbURL, err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Database = dbConn
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// defer dbConn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithEventbus(c *common.Config) OptionFn {
|
||||||
|
conn, err := amqp.Dial(c.EventBusURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", c.EventBusURL, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
chn, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Eventbus = chn
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// defer ebCh.Close()
|
||||||
|
// defer amqp.Close(ebConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithLogger(c *common.Config) OptionFn {
|
||||||
|
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Logger = logger
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// defer logger.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithRegistry(c *common.Config) OptionFn {
|
||||||
|
// fmt.Printf("WithRegistry constructor: config: %v", c.Base)
|
||||||
|
port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which now will cause error
|
||||||
|
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = registry.Register()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(s *Server) error {
|
||||||
|
s.Registry = registry
|
||||||
|
|
||||||
|
go func(s *Server) { // Consul KV updater
|
||||||
|
ticker := time.NewTicker(time.Second * 15)
|
||||||
|
for range ticker.C {
|
||||||
|
s.updateKVConfig()
|
||||||
|
}
|
||||||
|
}(s)
|
||||||
|
|
||||||
|
go func(s *Server) { // Server metadata cache updater
|
||||||
|
ticker := time.NewTicker(time.Second * 5)
|
||||||
|
for range ticker.C {
|
||||||
|
s.cacheMetadata()
|
||||||
|
}
|
||||||
|
}(s)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// REFACTOR IN PROGRESS
|
||||||
|
// func (s *Server) Shutdown() error {
|
||||||
|
// s.Logger.Log("Server is going down... Unregistering service: %s", s.Discovery.GetID())
|
||||||
|
// s.Discovery.Unregister()
|
||||||
|
// s.clearMetadataCache()
|
||||||
|
|
||||||
|
// s.Cache.Close()
|
||||||
|
// s.Database.Close()
|
||||||
|
// s.Eventbus.Close()
|
||||||
|
// s.Logger.Close()
|
||||||
|
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
|
||||||
|
func (s *Server) Shutdown() srv.PurgeFn {
|
||||||
|
return func(srv *srv.Server) error {
|
||||||
|
fmt.Printf("%v", s.Base)
|
||||||
|
// s.Logger.Log("Server is going down... Unregistering service: %s", s.Base.AppID)
|
||||||
|
s.Logger.Log("Server is going down... Unregistering service...")
|
||||||
|
|
||||||
|
s.Registry.Unregister()
|
||||||
|
s.clearMetadataCache()
|
||||||
|
s.Eventbus.Close()
|
||||||
|
s.Database.Close()
|
||||||
|
s.Logger.Close()
|
||||||
|
|
||||||
|
return s.Base.Shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// END: REFACTOR IN PROGRESS
|
||||||
|
|
||||||
|
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
||||||
|
func (s *Server) updateKVConfig() {
|
||||||
|
config, _, err := s.Registry.KV().Get(s.kvNmspc, nil)
|
||||||
|
if err != nil || config == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
kvCnf := bytes.NewBuffer(config.Value)
|
||||||
|
decoder := json.NewDecoder(kvCnf)
|
||||||
|
if err := decoder.Decode(&s.Config); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) cacheMetadata() {
|
||||||
|
ctx := context.TODO()
|
||||||
|
key, address := s.getMetadataIPsKey(), s.Base.Config.AppID
|
||||||
|
|
||||||
|
pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
||||||
|
if pos >= 0 {
|
||||||
|
s.Cache.LRem(ctx, key, 0, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.Cache.LPush(ctx, key, address).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) clearMetadataCache() {
|
||||||
|
ctx := context.TODO()
|
||||||
|
fmt.Printf("metadata: %v", s.Config.Base)
|
||||||
|
key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
|
||||||
|
|
||||||
|
s.Cache.LRem(ctx, key, 0, address)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) getMetadataIPsKey() string {
|
||||||
|
return "internal__" + s.Base.Config.AppName + "__ips"
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
//
|
||||||
|
//
|
||||||
|
//// OLD CODE TO BE REMOVED
|
||||||
|
// func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client, ebCh *amqp.Channel) *Server {
|
||||||
|
|
||||||
|
// cnf := fiber.Config{
|
||||||
|
// AppName: conf.AppName,
|
||||||
|
// ServerHeader: conf.AppName,
|
||||||
|
// ReadTimeout: conf.ReadTimeout * time.Millisecond,
|
||||||
|
// WriteTimeout: conf.WriteTimeout * time.Millisecond,
|
||||||
|
// IdleTimeout: conf.IdleTimeout * time.Millisecond,
|
||||||
|
// }
|
||||||
|
// s := &Server{
|
||||||
|
// fiber.New(cnf),
|
||||||
|
// conf,
|
||||||
|
// logger,
|
||||||
|
// db,
|
||||||
|
// cache,
|
||||||
|
// ebCh,
|
||||||
|
// consul,
|
||||||
|
// conf.AppName,
|
||||||
|
// conf.NetAddr,
|
||||||
|
// conf.KVNamespace,
|
||||||
|
// }
|
||||||
|
|
||||||
|
// return s
|
||||||
|
// }
|
@ -2,23 +2,20 @@ package service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/api-entities/model"
|
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/event"
|
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/api"
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
|
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/api-entities/model"
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/event"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/api"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||||
|
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||||
"github.com/georgysavva/scany/v2/pgxscan"
|
"github.com/georgysavva/scany/v2/pgxscan"
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
"github.com/streadway/amqp"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ServiceUserAgent = "basket-api/internal"
|
ServiceUserAgent = "basket-httpclient"
|
||||||
)
|
)
|
||||||
|
|
||||||
type BasketService struct {
|
type BasketService struct {
|
||||||
@ -84,7 +81,7 @@ func (s *BasketService) FetchItem(ctx context.Context, basketID string, productI
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *BasketService) AddItem(ctx context.Context, itemID int, basketID string, qty int) error {
|
func (s *BasketService) AddItem(ctx context.Context, itemID int, basketID string, qty int) error {
|
||||||
var price int = 0
|
var price float64 = 0
|
||||||
pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis)
|
pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis)
|
||||||
|
|
||||||
productPrice, err := pricingAPI.GetProductPrice(itemID)
|
productPrice, err := pricingAPI.GetProductPrice(itemID)
|
||||||
@ -107,35 +104,35 @@ func (s *BasketService) RemoveItem(ctx context.Context, itemID int, basketID str
|
|||||||
if _, err := s.dbConn.Exec(ctx, sql, basketID, itemID); err != nil {
|
if _, err := s.dbConn.Exec(ctx, sql, basketID, itemID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// update basket updated_at field...
|
// update basket updated_at field...
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BasketService) UpdateItem(ctx context.Context, item *model.BasketItemModel, qty int) error {
|
func (s *BasketService) UpdateItem(ctx context.Context, item *model.BasketItemModel, qty int) error {
|
||||||
var price int = 0
|
var price float64 = 0
|
||||||
pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis) // FIXME
|
pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis)
|
||||||
|
|
||||||
productPrice, err := pricingAPI.GetProductPrice(item.ProductID)
|
productPrice, err := pricingAPI.GetProductPrice(item.ProductID)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
price = productPrice.Price
|
price = productPrice.Price
|
||||||
}
|
}
|
||||||
|
|
||||||
sql := `UPDATE basket.basket_item SET quantity=$1, price=$2, updated_at=$3 WHERE basket_id=$4 AND product_id=$5`
|
sql := `UPDATE basket.basket_item SET quantity=$1, price=$2 WHERE basket_id=$3 AND product_id=$4`
|
||||||
if _, err := s.dbConn.Exec(ctx, sql, qty, price, time.Now(), item.BasketID, item.ProductID); err != nil {
|
if _, err := s.dbConn.Exec(ctx, sql, qty, price, item.BasketID, item.ProductID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update basket updated_at field...
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BasketService) CheckoutBasket(reqID, basketID string) (string, error) {
|
func (s *BasketService) Checkout(reqID, basketID string) (string, error) {
|
||||||
s.log.Log("Checkout basket#:%s", basketID)
|
s.log.Log("Creating initial order from basket#:%s", basketID)
|
||||||
|
|
||||||
msg := &event.BasketCheckoutEvent{
|
msg := &event.BasketCheckoutEvent{Event: event.NewEvent(reqID), BasketID: basketID} // FIXME: send more info...
|
||||||
Event: event.NewEvent("CheckoutBasket", reqID),
|
|
||||||
BasketID: basketID,
|
|
||||||
} // FIXME: send more info...
|
|
||||||
rabbitmq.Publish(s.ebCh, "api-events", "basket.order.basketCheckout", msg)
|
rabbitmq.Publish(s.ebCh, "api-events", "basket.order.basketCheckout", msg)
|
||||||
|
|
||||||
return basketID, nil
|
return basketID, nil
|
@ -2,16 +2,13 @@ package ui
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
entity "git.pbiernat.io/egommerce/api-entities/http"
|
def "git.pbiernat.dev/egommerce/api-entities/http"
|
||||||
"git.pbiernat.io/egommerce/api-entities/model"
|
"git.pbiernat.dev/egommerce/api-entities/model"
|
||||||
|
"git.pbiernat.dev/egommerce/basket-service/internal/app/service"
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/service"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func AddProductToBasket(srv *service.BasketService, productID, qty int, basketID, reqID string) (*model.BasketModel, error) {
|
func AddProductToBasket(srv *service.BasketService, productID, qty int, basketID, reqID string) (*model.BasketModel, error) {
|
||||||
// FIXME: error occurs when 0 is passed
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
basket, err := srv.FetchFromDB(ctx, basketID)
|
basket, err := srv.FetchFromDB(ctx, basketID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -35,7 +32,8 @@ func AddProductToBasket(srv *service.BasketService, productID, qty int, basketID
|
|||||||
return basket, nil
|
return basket, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
err = srv.UpdateItem(ctx, item, item.Quantity+qty)
|
qty = item.Quantity + qty
|
||||||
|
err = srv.UpdateItem(ctx, item, qty)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
srv.Log("UpdateItem error: %v", err)
|
srv.Log("UpdateItem error: %v", err)
|
||||||
}
|
}
|
||||||
@ -54,16 +52,15 @@ func RemoveProductFromBasket(srv *service.BasketService, productID, qty int, bas
|
|||||||
|
|
||||||
item, err := srv.FetchItem(ctx, basket.ID, productID)
|
item, err := srv.FetchItem(ctx, basket.ID, productID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("FetchItem not found item in db: %v\n", err)
|
|
||||||
ctx.Done() // FIXME
|
ctx.Done() // FIXME
|
||||||
return basket, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if item.Quantity <= qty {
|
if item.Quantity <= qty {
|
||||||
err = srv.RemoveItem(ctx, item.ProductID, item.BasketID)
|
err = srv.RemoveItem(ctx, item.ProductID, item.BasketID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ctx.Done() // FIXME
|
ctx.Done() // FIXME
|
||||||
return basket, err
|
return nil, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qty = item.Quantity - qty
|
qty = item.Quantity - qty
|
||||||
@ -77,14 +74,15 @@ func RemoveProductFromBasket(srv *service.BasketService, productID, qty int, bas
|
|||||||
return basket, nil
|
return basket, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CheckoutBasket(srv *service.BasketService, basketID, reqID string) (*entity.BasketCheckoutResponse, error) {
|
func CheckoutBasket(srv *service.BasketService, basketID, reqID string) (*def.BasketCheckoutResponse, error) {
|
||||||
res := &entity.BasketCheckoutResponse{}
|
// ctx := context.Background()
|
||||||
basketID, err := srv.CheckoutBasket(reqID, basketID)
|
res := &def.BasketCheckoutResponse{}
|
||||||
|
basketID, err := srv.Checkout(reqID, basketID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return res, err
|
return res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
res.ID = basketID
|
res.ID = basketID
|
||||||
|
// ctx.Done() // FIXME
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
84
src/internal/common/config.go
Normal file
84
src/internal/common/config.go
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config"
|
||||||
|
srv "git.pbiernat.dev/egommerce/basket-service/pkg/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// defAppDomain = "basket-svc"
|
||||||
|
defAppName = "basket-svc"
|
||||||
|
defCacheAddr = "api-cache:6379"
|
||||||
|
defCachePassword = "12345678"
|
||||||
|
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
||||||
|
defEventBusURL = "amqp://guest:guest@api-eventbus:56721"
|
||||||
|
defKVNmspc = "dev.egommerce/service/basket-svc"
|
||||||
|
defLoggerAddr = "api-logger:24224"
|
||||||
|
defNetAddr = ":80"
|
||||||
|
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||||
|
defPathPrefix = "/basket"
|
||||||
|
defRegistryAddr = "api-registry:8500"
|
||||||
|
defEbEventsExchange = "api-events"
|
||||||
|
defEbEventsQueue = "basket-svc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
Base *srv.Config
|
||||||
|
// AppID string
|
||||||
|
// AppName string
|
||||||
|
// NetAddr string
|
||||||
|
// PathPrefix string
|
||||||
|
|
||||||
|
// IdleTimeout time.Duration `json:"idle_timeout"` // miliseconds
|
||||||
|
// ReadTimeout time.Duration `json:"read_timeout"` // miliseconds
|
||||||
|
// WriteTimeout time.Duration `json:"write_timeout"` // miliseconds
|
||||||
|
|
||||||
|
DbURL string `json:"db_url"`
|
||||||
|
CacheAddr string `json:"cache_addr"`
|
||||||
|
CachePassword string `json:"cache_password"`
|
||||||
|
EventBusExchange string `json:"eventbus_exchange"`
|
||||||
|
EventBusQueue string `json:"eventbus_queue"`
|
||||||
|
EventBusURL string `json:"eventbus_url"`
|
||||||
|
LoggerAddr string `json:"logger_addr"`
|
||||||
|
RegistryAddr string
|
||||||
|
|
||||||
|
// Fields with JSON mappings are available through Consul KV storage
|
||||||
|
|
||||||
|
// Port int
|
||||||
|
// KVNamespace string
|
||||||
|
// MongoDbUrl string `json:"mongodb_url"`
|
||||||
|
// HttpReadTimeout int `json:"http_read_timeout"`
|
||||||
|
// HttpWriteTimeout int `json:"http_write_timeout"`
|
||||||
|
// HttpIdleTimeout int `json:"http_idle_timeout"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConfig() *Config {
|
||||||
|
c := new(Config)
|
||||||
|
c.Base = new(srv.Config)
|
||||||
|
|
||||||
|
c.Base.AppID, _ = os.Hostname()
|
||||||
|
c.Base.AppName = cnf.GetEnv("APP_NAME", defAppName)
|
||||||
|
c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
|
||||||
|
c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
||||||
|
|
||||||
|
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||||
|
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||||
|
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||||
|
c.EventBusExchange = defEbEventsExchange
|
||||||
|
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||||
|
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||||
|
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) GetAppFullName() string {
|
||||||
|
return fmt.Sprintf("%s_%s", c.Base.AppName, c.Base.AppID) // @TODO check if Base prop can be private
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) GetListenAddr() string {
|
||||||
|
return "" // @TODO: Implement me!
|
||||||
|
}
|
@ -1,111 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defName = "basket-svc"
|
|
||||||
defDomain = "basket-svc"
|
|
||||||
defCacheAddr = "egommerce.local:6379"
|
|
||||||
defCachePassword = "12345678"
|
|
||||||
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
|
|
||||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
|
||||||
defKVNmspc = "dev.egommerce/service/basket"
|
|
||||||
defLoggerAddr = "api-logger:24224"
|
|
||||||
defNetAddr = ":443"
|
|
||||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
|
||||||
defPathPrefix = "/basket"
|
|
||||||
defRegistryAddr = "api-registry:8501"
|
|
||||||
defEbEventsExchange = "api-events"
|
|
||||||
defEbEventsQueue = "basket-svc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
ID string
|
|
||||||
Name string
|
|
||||||
Domain string
|
|
||||||
NetAddr string
|
|
||||||
RegistryDomainOverIP string
|
|
||||||
PathPrefix string
|
|
||||||
|
|
||||||
IdleTimeout time.Duration // miliseconds
|
|
||||||
ReadTimeout time.Duration // miliseconds
|
|
||||||
WriteTimeout time.Duration // miliseconds
|
|
||||||
|
|
||||||
LoggerAddr string `json:"logger_addr"`
|
|
||||||
DbURL string `json:"db_url"`
|
|
||||||
CacheAddr string `json:"cache_addr"`
|
|
||||||
CachePassword string `json:"cache_password"`
|
|
||||||
MongoDbUrl string `json:"mongodb_url"`
|
|
||||||
EventBusURL string `json:"eventbus_url"`
|
|
||||||
EventBusExchange string `json:"eventbus_exchange"`
|
|
||||||
EventBusQueue string `json:"eventbus_queue"`
|
|
||||||
KVNamespace string
|
|
||||||
RegistryAddr string
|
|
||||||
|
|
||||||
// Fields with JSON mappings are available through Consul KV storage
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewConfig(name string) *Config {
|
|
||||||
c := new(Config)
|
|
||||||
|
|
||||||
c.ID, _ = os.Hostname()
|
|
||||||
c.Name = name
|
|
||||||
c.Domain = cnf.GetEnv("APP_DOMAIN", defDomain)
|
|
||||||
c.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
|
|
||||||
c.RegistryDomainOverIP = cnf.GetEnv("REGISTRY_USE_DOMAIN_OVER_IP", "false")
|
|
||||||
c.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
|
||||||
|
|
||||||
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
|
||||||
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
|
||||||
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
|
||||||
c.EventBusExchange = defEbEventsExchange
|
|
||||||
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
|
||||||
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
|
||||||
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
|
||||||
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Config) GetAppFullName() string {
|
|
||||||
return fmt.Sprintf("%s_%s", c.Name, c.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Config) GetIP() string {
|
|
||||||
host, _ := os.Hostname()
|
|
||||||
ips, _ := net.LookupIP(host)
|
|
||||||
// for _, ip := range ips {
|
|
||||||
// return ip.String()
|
|
||||||
// }
|
|
||||||
|
|
||||||
return ips[0].String()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
|
|
||||||
arr := make(map[string]string)
|
|
||||||
arr["id"] = c.ID
|
|
||||||
arr["name"] = c.Name
|
|
||||||
arr["appFullname"] = c.GetAppFullName()
|
|
||||||
arr["domain"] = c.Domain
|
|
||||||
arr["ip"] = c.GetIP()
|
|
||||||
arr["netAddr"] = c.NetAddr
|
|
||||||
arr["registryDomainOverIP"] = c.RegistryDomainOverIP
|
|
||||||
arr["pathPrefix"] = c.PathPrefix
|
|
||||||
arr["cacheAddr"] = c.CacheAddr
|
|
||||||
arr["cachePassword"] = c.CachePassword
|
|
||||||
arr["dbURL"] = c.DbURL
|
|
||||||
arr["eventBusExchange"] = c.EventBusExchange
|
|
||||||
arr["eventBusURL"] = c.EventBusURL
|
|
||||||
arr["kvNamespace"] = c.KVNamespace
|
|
||||||
arr["loggerAddr"] = c.LoggerAddr
|
|
||||||
arr["registryAddr"] = c.RegistryAddr
|
|
||||||
|
|
||||||
return arr
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
// REFACTOR: UNIVERSAL SERVER CODE
|
|
||||||
import (
|
|
||||||
"github.com/gofiber/fiber/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
|
|
||||||
return c.JSON(s.Config)
|
|
||||||
}
|
|
@ -1,13 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
// REFACTOR: UNIVERSAL SERVER CODE
|
|
||||||
import (
|
|
||||||
def "git.pbiernat.io/egommerce/api-entities/http"
|
|
||||||
"github.com/gofiber/fiber/v2"
|
|
||||||
)
|
|
||||||
|
|
||||||
func (s *Server) HealthHandler(c *fiber.Ctx) error {
|
|
||||||
return c.JSON(&def.HealthResponse{
|
|
||||||
Status: "OK",
|
|
||||||
})
|
|
||||||
}
|
|
@ -1,29 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
// REFACTOR: APP DEDICATED CODE
|
|
||||||
import (
|
|
||||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
defaultCORS = cors.New(cors.Config{
|
|
||||||
AllowOrigins: "*",
|
|
||||||
// AllowCredentials: true,
|
|
||||||
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
|
||||||
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
func SetupRouter(s *Server) {
|
|
||||||
s.Options("*", defaultCORS)
|
|
||||||
s.Use(defaultCORS)
|
|
||||||
|
|
||||||
s.Get("/health", s.HealthHandler)
|
|
||||||
s.Get("/config", s.ConfigHandler)
|
|
||||||
|
|
||||||
api := s.Group("/api")
|
|
||||||
v1 := api.Group("/v1")
|
|
||||||
v1.Get("/basket", s.GetBasketHandler)
|
|
||||||
v1.Get("/basket/:basketId/items", s.GetBasketItemsHandler)
|
|
||||||
v1.Post("/checkout", s.CheckoutHandler)
|
|
||||||
}
|
|
@ -1,144 +0,0 @@
|
|||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"crypto/tls"
|
|
||||||
"log"
|
|
||||||
"net"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"github.com/gofiber/fiber/v2"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/api-entities/http"
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/consul"
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
Server struct {
|
|
||||||
*fiber.App
|
|
||||||
|
|
||||||
ID string
|
|
||||||
addr string // e.g. "127.0.0.1:443"
|
|
||||||
handlers map[string]any
|
|
||||||
}
|
|
||||||
HeaderRequestID struct {
|
|
||||||
RequestID string `reqHeader:"x-request-id"`
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func New(c *Config) *Server {
|
|
||||||
return &Server{
|
|
||||||
ID: c.ID,
|
|
||||||
App: fiber.New(fiber.Config{
|
|
||||||
AppName: c.ID,
|
|
||||||
ServerHeader: c.Name + ":" + c.ID,
|
|
||||||
ReadTimeout: c.ReadTimeout * time.Millisecond,
|
|
||||||
WriteTimeout: c.WriteTimeout * time.Millisecond,
|
|
||||||
IdleTimeout: c.IdleTimeout * time.Millisecond,
|
|
||||||
}),
|
|
||||||
addr: c.NetAddr,
|
|
||||||
handlers: make(map[string]any),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) Start() error {
|
|
||||||
SetupMiddleware(s)
|
|
||||||
SetupRouter(s)
|
|
||||||
|
|
||||||
// fmt.Printf("Starting server at: %s...\n", s.addr)
|
|
||||||
cer, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key")
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
tlsCnf := &tls.Config{Certificates: []tls.Certificate{cer}}
|
|
||||||
|
|
||||||
ln, _ := net.Listen("tcp", s.addr)
|
|
||||||
ln = tls.NewListener(ln, tlsCnf)
|
|
||||||
|
|
||||||
return s.Listener(ln)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) RegisterHandler(name string, fn func() any) {
|
|
||||||
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
|
|
||||||
s.handlers[name] = fn()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) OnShutdown() {
|
|
||||||
// s.GetLogger().Log("Server %s is going down...", s.ID)
|
|
||||||
|
|
||||||
// s.GetRegistry().Unregister()
|
|
||||||
// a.clearMetadataCache()
|
|
||||||
s.GetEventBus().Close()
|
|
||||||
s.GetDatabase().Close()
|
|
||||||
s.GetLogger().Log("Gone.")
|
|
||||||
s.GetLogger().Close()
|
|
||||||
|
|
||||||
s.Shutdown()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
|
||||||
var hdr = new(HeaderRequestID)
|
|
||||||
if err := c.ReqHeaderParser(hdr); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return hdr.RequestID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
|
|
||||||
return c.Status(code).JSON(http.ErrorResponse{Error: msg})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Plugin helper funcitons
|
|
||||||
func (s *Server) GetCache() *redis.Client {
|
|
||||||
return (s.handlers["cache"]).(*redis.Client)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
|
|
||||||
return (s.handlers["database"]).(*pgxpool.Pool)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) GetEventBus() *amqp.Channel {
|
|
||||||
return (s.handlers["eventbus"]).(*amqp.Channel)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) GetLogger() *fluentd.Logger {
|
|
||||||
return (s.handlers["logger"]).(*fluentd.Logger)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) GetRegistry() *consul.Service {
|
|
||||||
return (s.handlers["registry"]).(*consul.Service)
|
|
||||||
}
|
|
||||||
|
|
||||||
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
|
||||||
// func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
|
|
||||||
// go func() {
|
|
||||||
// ticker := time.NewTicker(time.Second * 10)
|
|
||||||
// for range ticker.C {
|
|
||||||
// config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
|
|
||||||
// if err != nil || config == nil {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// kvCnf := bytes.NewBuffer(config.Value)
|
|
||||||
// decoder := json.NewDecoder(kvCnf)
|
|
||||||
// if err := decoder.Decode(&s.Config); err != nil {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func (s *Server) clearMetadataCache() {
|
|
||||||
// ctx := context.Background()
|
|
||||||
// key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
|
|
||||||
|
|
||||||
// s.Cache.LRem(ctx, key, 0, address)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func (s *Server) getMetadataIPsKey() string {
|
|
||||||
// return "internal__" + s.Base.Config.AppName + "__ips"
|
|
||||||
// }
|
|
@ -1,76 +0,0 @@
|
|||||||
package worker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/service"
|
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/ui"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
AddProductToBasket = "event.ProductAddedToBasket"
|
|
||||||
RemoveProductFromBasket = "event.ProductRemovedFromBasket"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Command interface {
|
|
||||||
run(CommandData) (bool, any)
|
|
||||||
}
|
|
||||||
|
|
||||||
type CommandData map[string]interface{}
|
|
||||||
|
|
||||||
type CommandRunner struct {
|
|
||||||
cmd Command
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewCommandRunner(cmd string, srvc *service.BasketService) *CommandRunner {
|
|
||||||
rnr := &CommandRunner{}
|
|
||||||
rnr.cmd = initCommand(cmd, srvc)
|
|
||||||
|
|
||||||
return rnr
|
|
||||||
}
|
|
||||||
|
|
||||||
func initCommand(cmd string, srvc *service.BasketService) Command {
|
|
||||||
// fmt.Printf("getCommand: %v\n", cmd)
|
|
||||||
var c Command
|
|
||||||
|
|
||||||
switch cmd { // FIXME
|
|
||||||
case "AddProductToBasket":
|
|
||||||
c = &AddProductToBasketCommand{srvc}
|
|
||||||
case "RemoveProductFromBasket":
|
|
||||||
c = &RemoveProductFromBasketCommand{srvc}
|
|
||||||
}
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *CommandRunner) run(data CommandData) (bool, any) {
|
|
||||||
return r.cmd.run(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
type AddProductToBasketCommand struct {
|
|
||||||
srvc *service.BasketService // FIXME: Remove service dep
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *AddProductToBasketCommand) run(data CommandData) (bool, any) { // FIXME: Move run func to WorkerPool
|
|
||||||
reqID := data["request_id"].(string) // FIXME Check input params!
|
|
||||||
productID := int(data["product_id"].(float64)) // FIXME Check input params!
|
|
||||||
basketID := data["basket_id"].(string) // FIXME Check input params!
|
|
||||||
qty := int(data["quantity"].(float64)) // FIXME Check input params!
|
|
||||||
|
|
||||||
basket, err := ui.AddProductToBasket(c.srvc, productID, qty, basketID, reqID)
|
|
||||||
|
|
||||||
return err == nil, basket
|
|
||||||
}
|
|
||||||
|
|
||||||
type RemoveProductFromBasketCommand struct {
|
|
||||||
srvc *service.BasketService // FIXME: Remove service dep
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *RemoveProductFromBasketCommand) run(data CommandData) (bool, any) { // FIXME: Move run func to WorkerPool
|
|
||||||
reqID := data["request_id"].(string) // FIXME Check input params!
|
|
||||||
productID := int(data["product_id"].(float64)) // FIXME Check input params!
|
|
||||||
basketID := data["basket_id"].(string) // FIXME Check input params!
|
|
||||||
qty := int(data["quantity"].(float64)) // FIXME Check input params!
|
|
||||||
|
|
||||||
basket, err := ui.RemoveProductFromBasket(c.srvc, productID, qty, basketID, reqID)
|
|
||||||
|
|
||||||
return err == nil, basket
|
|
||||||
}
|
|
@ -1,85 +0,0 @@
|
|||||||
package worker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"net"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defName = "catalog-worker"
|
|
||||||
defDomain = "catalog-worker"
|
|
||||||
defCacheAddr = "egommerce.local:6379"
|
|
||||||
defCachePassword = "12345678"
|
|
||||||
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
|
|
||||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
|
||||||
defKVNmspc = "dev.egommerce/service/catalog-worker"
|
|
||||||
defLoggerAddr = "api-logger:24224"
|
|
||||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
|
||||||
defEbEventsExchange = "api-events"
|
|
||||||
defEbEventsQueue = "catalog-svc"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Config struct {
|
|
||||||
ID string
|
|
||||||
Name string
|
|
||||||
|
|
||||||
LoggerAddr string `json:"logger_addr"`
|
|
||||||
DbURL string `json:"db_url"`
|
|
||||||
CacheAddr string `json:"cache_addr"`
|
|
||||||
CachePassword string `json:"cache_password"`
|
|
||||||
MongoDbUrl string `json:"mongodb_url"`
|
|
||||||
EventBusURL string `json:"eventbus_url"`
|
|
||||||
EventBusExchange string `json:"eventbus_exchange"`
|
|
||||||
EventBusQueue string `json:"eventbus_queue"`
|
|
||||||
KVNamespace string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewConfig(name string) *Config {
|
|
||||||
c := new(Config)
|
|
||||||
|
|
||||||
c.ID, _ = os.Hostname()
|
|
||||||
c.Name = name
|
|
||||||
|
|
||||||
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
|
||||||
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
|
||||||
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
|
||||||
c.EventBusExchange = defEbEventsExchange
|
|
||||||
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
|
||||||
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
|
||||||
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Config) GetAppFullName() string {
|
|
||||||
return fmt.Sprintf("%s_%s", c.Name, c.ID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Config) GetIP() string {
|
|
||||||
host, _ := os.Hostname()
|
|
||||||
ips, _ := net.LookupIP(host)
|
|
||||||
// for _, ip := range ips {
|
|
||||||
// return ip.String()
|
|
||||||
// }
|
|
||||||
|
|
||||||
return ips[0].String()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
|
|
||||||
arr := make(map[string]string)
|
|
||||||
arr["id"] = c.ID
|
|
||||||
arr["name"] = c.Name
|
|
||||||
arr["appFullname"] = c.GetAppFullName()
|
|
||||||
arr["cacheAddr"] = c.CacheAddr
|
|
||||||
arr["cachePassword"] = c.CachePassword
|
|
||||||
arr["dbURL"] = c.DbURL
|
|
||||||
arr["eventBusExchange"] = c.EventBusExchange
|
|
||||||
arr["eventBusURL"] = c.EventBusURL
|
|
||||||
arr["kvNamespace"] = c.KVNamespace
|
|
||||||
arr["loggerAddr"] = c.LoggerAddr
|
|
||||||
|
|
||||||
return arr
|
|
||||||
}
|
|
@ -1,226 +0,0 @@
|
|||||||
package worker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
|
||||||
"git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
|
|
||||||
|
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/event"
|
|
||||||
"git.pbiernat.io/egommerce/basket-service/internal/service"
|
|
||||||
)
|
|
||||||
|
|
||||||
type (
|
|
||||||
Worker struct {
|
|
||||||
ID string
|
|
||||||
cnf *Config
|
|
||||||
handlers map[string]any
|
|
||||||
services map[string]any
|
|
||||||
doWrkUntil chan struct{}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
func New(c *Config) *Worker {
|
|
||||||
return &Worker{
|
|
||||||
ID: c.ID,
|
|
||||||
cnf: c,
|
|
||||||
handlers: make(map[string]any),
|
|
||||||
services: make(map[string]any),
|
|
||||||
doWrkUntil: make(chan struct{}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) Start() error {
|
|
||||||
setupQueues(w)
|
|
||||||
|
|
||||||
err := w.doWork(w.doWrkUntil)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err)
|
|
||||||
close(w.doWrkUntil)
|
|
||||||
}
|
|
||||||
<-w.doWrkUntil
|
|
||||||
|
|
||||||
return err
|
|
||||||
|
|
||||||
// go func() {
|
|
||||||
// sigint := make(chan os.Signal, 1)
|
|
||||||
// signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
|
||||||
// <-sigint
|
|
||||||
|
|
||||||
// w.Shutdown()
|
|
||||||
// close(while)
|
|
||||||
// }()
|
|
||||||
|
|
||||||
// run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker)
|
|
||||||
// defer w.removeRunFile(run)
|
|
||||||
|
|
||||||
// w.Logger.Log("Waiting for messages...")
|
|
||||||
|
|
||||||
// return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) RegisterHandler(name string, fn func() any) {
|
|
||||||
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
|
|
||||||
w.handlers[name] = fn()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) OnShutdown() {
|
|
||||||
w.GetLogger().Log("Worker %s is going down...", w.ID)
|
|
||||||
// fmt.Printf("Worker %s is going down...\n", w.ID)
|
|
||||||
|
|
||||||
unbindQueues(w)
|
|
||||||
w.GetEventBus().Close()
|
|
||||||
w.GetDatabase().Close()
|
|
||||||
w.GetLogger().Log("Gone.")
|
|
||||||
w.GetLogger().Close()
|
|
||||||
|
|
||||||
close(w.doWrkUntil)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Plugin helper funcitons
|
|
||||||
func (w *Worker) GetCache() *redis.Client {
|
|
||||||
return (w.handlers["cache"]).(*redis.Client)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
|
|
||||||
return (w.handlers["database"]).(*pgxpool.Pool)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) GetEventBus() *amqp.Channel {
|
|
||||||
return (w.handlers["eventbus"]).(*amqp.Channel)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) GetLogger() *fluentd.Logger {
|
|
||||||
return (w.handlers["logger"]).(*fluentd.Logger)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) doWork(while chan struct{}) error {
|
|
||||||
w.services["basket"] =
|
|
||||||
service.NewBasketService(w.GetDatabase(), w.GetCache(), w.GetEventBus(), w.GetLogger())
|
|
||||||
|
|
||||||
bSrv := (w.services["basket"]).(*service.BasketService)
|
|
||||||
|
|
||||||
msgs, err := w.GetEventBus().Consume(
|
|
||||||
w.cnf.EventBusQueue, // queue
|
|
||||||
"", // consumer
|
|
||||||
false, // auto-ack
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // args
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
w.GetLogger().Log("Failed to register a consumer: %s", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for d := range msgs {
|
|
||||||
// go func(d amqp.Delivery) {
|
|
||||||
w.processMsg(bSrv, d)
|
|
||||||
// }(d)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
<-while
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
|
|
||||||
msg, err := rabbitmq.Deserialize(m.Body)
|
|
||||||
if err != nil {
|
|
||||||
w.GetLogger().Log("Deserialization error: %v\n", err)
|
|
||||||
fmt.Printf("Deserialization error: %v\n", err)
|
|
||||||
m.Reject(false)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
name := fmt.Sprintf("%s", msg["event"])
|
|
||||||
data := (msg["data"]).(map[string]interface{})
|
|
||||||
// reqID := (data["request_id"]).(string) // FIXME Check input params!
|
|
||||||
|
|
||||||
w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data)
|
|
||||||
fmt.Printf("Processing message \"%s\" with data: %v\n", name, data)
|
|
||||||
|
|
||||||
var ok = false
|
|
||||||
switch true { // Refactor -> use case for polymorphism
|
|
||||||
case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
|
||||||
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_ADDED_TO_BASKET)
|
|
||||||
case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
|
||||||
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_REMOVED_FROM_BASKET)
|
|
||||||
case strings.Contains(name, event.EVENT_BASKET_CHECKOUT):
|
|
||||||
w.GetLogger().Log("Event: %s", event.EVENT_BASKET_CHECKOUT)
|
|
||||||
}
|
|
||||||
|
|
||||||
// case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
|
||||||
// basketID := data["basket_id"].(string) // FIXME Check input params!
|
|
||||||
// productID := data["product_id"] // FIXME Check input params!
|
|
||||||
|
|
||||||
// rnr.cmd = &AddProductToBasketCommand{srvc}
|
|
||||||
// w.Logger.Log("Adding product #%d to basket #%s. ReqID: #%s", productID, basketID, reqID)
|
|
||||||
// case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
|
||||||
// basketID := data["basket_id"].(string)
|
|
||||||
// productID := data["product_id"].(float64)
|
|
||||||
|
|
||||||
// rnr.cmd = &RemoveProductFromBasketCommand{srvc}
|
|
||||||
// w.Logger.Log("Removing product #%d from basket #%s. ReqID: #%s", productID, basketID, reqID)
|
|
||||||
// }
|
|
||||||
|
|
||||||
r := NewCommandRunner((data["command"]).(string), srvc)
|
|
||||||
ok, _ = r.run(data)
|
|
||||||
if ok {
|
|
||||||
w.GetLogger().Log("Successful executed message \"%s\"\n", name)
|
|
||||||
fmt.Printf("Successful executed message \"%s\"\n", name)
|
|
||||||
m.Ack(false)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.GetLogger().Log("Error processing \"%s\": %v", name, err)
|
|
||||||
fmt.Printf("Error processing \"%s\": %v\n", name, err)
|
|
||||||
m.Reject(true) // FIXME: or Nack(repeat until success - maybe message shout know...?
|
|
||||||
}
|
|
||||||
|
|
||||||
func setupQueues(w *Worker) {
|
|
||||||
err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange)
|
|
||||||
if err != nil {
|
|
||||||
w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err)
|
|
||||||
fmt.Printf("Failed to declare EventBus exchange: %v\n", err)
|
|
||||||
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
args := amqp.Table{}
|
|
||||||
args["x-message-ttl"] = 5
|
|
||||||
_, err = w.GetEventBus().QueueDeclare(
|
|
||||||
w.cnf.EventBusQueue, // name
|
|
||||||
true, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
args, // arguments
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err)
|
|
||||||
fmt.Printf("Failed to declare EventBus queue: %v\n", err)
|
|
||||||
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productAddedToBasket")
|
|
||||||
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productRemovedFromBasket")
|
|
||||||
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.updateQuantity")
|
|
||||||
}
|
|
||||||
|
|
||||||
func unbindQueues(w *Worker) {
|
|
||||||
w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.productAddedToBasket", w.cnf.EventBusExchange, nil)
|
|
||||||
w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.productRemovedFromBasket", w.cnf.EventBusExchange, nil)
|
|
||||||
w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.updateQuantity", w.cnf.EventBusExchange, nil)
|
|
||||||
}
|
|
22
src/pkg/config/config.go
Normal file
22
src/pkg/config/config.go
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/joho/godotenv"
|
||||||
|
)
|
||||||
|
|
||||||
|
var ErrLoadingEnvs error
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
ErrLoadingEnvs = godotenv.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetEnv(name string, defVal string) string {
|
||||||
|
env := os.Getenv(name)
|
||||||
|
if env == "" {
|
||||||
|
return defVal
|
||||||
|
}
|
||||||
|
|
||||||
|
return env
|
||||||
|
}
|
21
src/pkg/server/config.go
Normal file
21
src/pkg/server/config.go
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
AppID string
|
||||||
|
AppName string
|
||||||
|
NetAddr string
|
||||||
|
PathPrefix string
|
||||||
|
|
||||||
|
IdleTimeout time.Duration // miliseconds
|
||||||
|
ReadTimeout time.Duration // miliseconds
|
||||||
|
WriteTimeout time.Duration // miliseconds
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Config) GetAppFullName() string {
|
||||||
|
return fmt.Sprintf("%s_%s", c.AppName, c.AppID)
|
||||||
|
}
|
101
src/pkg/server/server.go
Normal file
101
src/pkg/server/server.go
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gofiber/fiber/v2"
|
||||||
|
|
||||||
|
"git.pbiernat.dev/egommerce/api-entities/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
HeaderRequestID struct {
|
||||||
|
RequestID string `reqHeader:"x-request-id"`
|
||||||
|
}
|
||||||
|
Server struct {
|
||||||
|
*fiber.App
|
||||||
|
*Config
|
||||||
|
|
||||||
|
addr string // e.g. "127.0.0.1:8080"
|
||||||
|
|
||||||
|
// name string // e.g. "awesome-rest-api"
|
||||||
|
// kvNmspc string
|
||||||
|
// cache *redis.Client
|
||||||
|
// db *pgxpool.Pool
|
||||||
|
// discovery *discovery.Service
|
||||||
|
// ebCh *amqp.Channel
|
||||||
|
// log *fluentd.Logger
|
||||||
|
}
|
||||||
|
PurgeFn func(*Server) error
|
||||||
|
)
|
||||||
|
|
||||||
|
func New(conf *Config) *Server {
|
||||||
|
return &Server{
|
||||||
|
App: fiber.New(fiber.Config{
|
||||||
|
AppName: conf.AppID,
|
||||||
|
ServerHeader: conf.AppName,
|
||||||
|
ReadTimeout: conf.ReadTimeout * time.Millisecond,
|
||||||
|
WriteTimeout: conf.WriteTimeout * time.Millisecond,
|
||||||
|
IdleTimeout: conf.IdleTimeout * time.Millisecond,
|
||||||
|
}),
|
||||||
|
addr: conf.NetAddr,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||||
|
var hdr = new(HeaderRequestID)
|
||||||
|
if err := c.ReqHeaderParser(hdr); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return hdr.RequestID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// @Refactor make single func with error message and optional http status code...
|
||||||
|
func (s *Server) Error400(c *fiber.Ctx, msg string) error {
|
||||||
|
return c.Status(fiber.StatusBadRequest).JSON(http.ErrorResponse{Error: msg})
|
||||||
|
// test with &(reference) before http.ErrorMessage, but probably it's gonna be erroneous
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Error401(c *fiber.Ctx, msg string) error {
|
||||||
|
return c.Status(fiber.StatusUnauthorized).JSON(http.ErrorResponse{Error: msg})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Error403(c *fiber.Ctx, msg string) error {
|
||||||
|
return c.Status(fiber.StatusForbidden).JSON(http.ErrorResponse{Error: msg})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Error404(c *fiber.Ctx, msg string) error {
|
||||||
|
return c.Status(fiber.StatusNotFound).JSON(http.ErrorResponse{Error: msg})
|
||||||
|
}
|
||||||
|
|
||||||
|
// @EndRefactor
|
||||||
|
|
||||||
|
func (s *Server) Start(forever chan struct{}, prgFn PurgeFn) error {
|
||||||
|
go func() {
|
||||||
|
fmt.Println("Starting...")
|
||||||
|
sigint := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
<-sigint
|
||||||
|
|
||||||
|
fmt.Println("shutting down: after term signal.")
|
||||||
|
if err := prgFn(s); err != nil {
|
||||||
|
log.Fatalf("Failed to shutdown server. Reason: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(forever)
|
||||||
|
}()
|
||||||
|
|
||||||
|
err := s.Listen(s.addr)
|
||||||
|
<-forever
|
||||||
|
// if err := s.Listen(s.addr); err != nil {
|
||||||
|
// s.logger.Log("Failed to start server! Reason: %v\n", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user