develop #1
19
.env.dist
19
.env.dist
@ -1,4 +1,15 @@
|
||||
SERVER_ADDR=:80
|
||||
DATABASE_URL=postgres://postgres:12345678@postgres-db:5432/egommerce
|
||||
MONGODB_URL=mongodb://mongodb:12345678@mongo-db:27017
|
||||
EVENTBUS_URL=amqp://guest:guest@api-eventbus:5672
|
||||
SERVER_ADDR=:1883
|
||||
|
||||
APP_NAME=order-svc
|
||||
APP_DOMAIN=host.docker.internal
|
||||
REGISTRY_USE_DOMAIN_OVER_IP=true
|
||||
APP_PATH_PREFIX=/order
|
||||
APP_KV_NAMESPACE=dev.egommerce/service/order-svc
|
||||
|
||||
LOGGER_ADDR=egommerce.local:48400
|
||||
REGISTRY_ADDR=egommerce.local:48100
|
||||
DATABASE_URL=postgres://postgres:12345678@egommerce.local:48500/egommerce
|
||||
CACHE_ADDR=egommerce.local:48300
|
||||
CACHE_PASSWORD=12345678
|
||||
MONGODB_URL=mongodb://mongodb:12345678@egommerce.local:48600
|
||||
EVENTBUS_URL=amqp://guest:guest@egommerce.local:48201
|
||||
|
@ -5,6 +5,7 @@ ARG BIN_OUTPUT=/go/bin
|
||||
ARG GO_SERVER=cmd/server/main.go
|
||||
ARG GO_MIGRATE=cmd/migrate/main.go
|
||||
ARG GO_WORKER=cmd/worker/main.go
|
||||
ARG GO_HEALTH=cmd/health/main.go
|
||||
|
||||
WORKDIR /go/src/app
|
||||
COPY src ./
|
||||
@ -12,4 +13,5 @@ COPY src ./
|
||||
RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \
|
||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \
|
||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/migrate" $GO_MIGRATE && \
|
||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER
|
||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER && \
|
||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/health" $GO_HEALTH
|
||||
|
@ -19,12 +19,17 @@ LABEL dev.egommerce.image.build_time=${BUILD_TIME}
|
||||
|
||||
WORKDIR /
|
||||
COPY --from=builder $BIN_OUTPUT /app
|
||||
COPY --from=builder /go/bin/migrate /bin/go_migrate
|
||||
COPY --from=builder /go/bin/migrate /bin/migrate
|
||||
COPY --from=builder /go/bin/health /bin/health
|
||||
COPY .env.dist /.env
|
||||
COPY ./bin /bin
|
||||
RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh
|
||||
|
||||
RUN apk add curl
|
||||
|
||||
EXPOSE 80
|
||||
|
||||
CMD ["/app"]
|
||||
ENTRYPOINT ["entrypoint.sh"]
|
||||
CMD ["sh", "-c", "/app"]
|
||||
|
||||
HEALTHCHECK --interval=5s --timeout=1s --retries=20 CMD health >/dev/null || exit 1
|
||||
|
3
Makefile
3
Makefile
@ -8,6 +8,9 @@ build-image-dev:
|
||||
build-image-prod:
|
||||
- sh ${DEPLOY_DIR}/image-build.sh
|
||||
|
||||
push-image-dev:
|
||||
- sh ${DEPLOY_DIR}/image-push.sh dev
|
||||
|
||||
push-image-prod:
|
||||
- sh ${DEPLOY_DIR}/image-push.sh
|
||||
|
||||
|
@ -14,10 +14,13 @@ waitForService()
|
||||
done
|
||||
}
|
||||
|
||||
update-resolv # provided by stack - better approach - single copy
|
||||
update-ca-certificates
|
||||
|
||||
waitForService "api-registry:8500"
|
||||
waitForService "postgres-db:5432"
|
||||
waitForService "api-eventbus:5672"
|
||||
waitForService "api-logger:24224"
|
||||
waitForService "api-registry:8500"
|
||||
waitForService "basket-svc:80"
|
||||
|
||||
# run migrations
|
||||
|
@ -1,17 +1,17 @@
|
||||
#!/usr/bin/env sh
|
||||
|
||||
# ensure migrate env is initialized
|
||||
$(go_migrate version >/dev/null 2>&1)
|
||||
$(migrate version >/dev/null 2>&1)
|
||||
version=$?
|
||||
if [ $version != "0" ]
|
||||
then
|
||||
echo "Creating base table..."
|
||||
$(go_migrate init >/dev/null 2>&1)
|
||||
$(migrate init >/dev/null 2>&1)
|
||||
init=$?
|
||||
fi
|
||||
|
||||
# check again
|
||||
$(go_migrate version >/dev/null 2>&1)
|
||||
$(migrate version >/dev/null 2>&1)
|
||||
version=$?
|
||||
if [ $version != "0" ]
|
||||
then
|
||||
@ -20,7 +20,6 @@ then
|
||||
fi
|
||||
|
||||
# run migrations
|
||||
go_migrate up
|
||||
echo "Done."
|
||||
migrate up
|
||||
|
||||
exit $version
|
||||
exit 0
|
||||
|
@ -1,7 +1,7 @@
|
||||
#!/bin/sh
|
||||
# RUN IN REPO ROOT DIR !!
|
||||
|
||||
export IMAGE_PREFIX="git.pbiernat.dev/egommerce/order"
|
||||
export IMAGE_PREFIX="git.pbiernat.io/egommerce/order"
|
||||
export BUILDER_IMAGE="egommerce-builder:order"
|
||||
export BUILD_TIME=$(date +"%Y%m%d%H%M%S")
|
||||
export SERVER_IMAGE="$IMAGE_PREFIX-svc"
|
||||
|
@ -1,13 +1,17 @@
|
||||
#!/bin/sh
|
||||
# RUN IN REPO ROOT DIR !!
|
||||
|
||||
export IMAGE_BASE="git.pbiernat.dev/egommerce/order"
|
||||
export IMAGE_BASE="git.pbiernat.io/egommerce/order"
|
||||
export SERVER_IMAGE="$IMAGE_BASE-svc"
|
||||
export WORKER_IMAGE="$IMAGE_BASE-worker"
|
||||
|
||||
TARGET=${1:-latest}
|
||||
|
||||
echo $DOCKER_PASSWORD | docker login git.pbiernat.dev -u $DOCKER_USERNAME --password-stdin
|
||||
echo $DOCKER_PASSWORD | docker login git.pbiernat.io -u $DOCKER_USERNAME --password-stdin
|
||||
|
||||
docker push "$SERVER_IMAGE:$TARGET"
|
||||
docker push "$WORKER_IMAGE:$TARGET"
|
||||
|
||||
# Restart container
|
||||
curl -X POST http://127.0.0.1:9001/api/webhooks/c9657d12-22fb-48c4-a7a9-add42f2f71cd
|
||||
curl -X POST http://127.0.0.1:9001/api/webhooks/9f979396-5cdf-46a2-bf7a-eae07760b04e
|
||||
|
1
src/app.run
Normal file
1
src/app.run
Normal file
@ -0,0 +1 @@
|
||||
1698893
|
39
src/cmd/health/main.go
Normal file
39
src/cmd/health/main.go
Normal file
@ -0,0 +1,39 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
const usageText = `This program runs healthcheck on the app.
|
||||
Usage:
|
||||
go run cmd/health/main.go
|
||||
`
|
||||
|
||||
func init() {
|
||||
flag.Usage = func() {
|
||||
fmt.Print(usageText)
|
||||
flag.PrintDefaults()
|
||||
os.Exit(2)
|
||||
}
|
||||
flag.Parse()
|
||||
}
|
||||
|
||||
func main() {
|
||||
var exitCode = 1
|
||||
if isOk := healthCheck(); isOk {
|
||||
exitCode = 0
|
||||
}
|
||||
os.Exit(exitCode)
|
||||
}
|
||||
|
||||
func healthCheck() bool {
|
||||
run, err := os.Open("/app.run")
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer run.Close()
|
||||
|
||||
return true
|
||||
}
|
@ -6,10 +6,13 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/config"
|
||||
"github.com/go-pg/migrations/v8"
|
||||
"github.com/go-pg/pg/v10"
|
||||
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
|
||||
baseCnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
cnf "git.pbiernat.io/egommerce/order-service/internal/server"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -32,57 +35,56 @@ Usage:
|
||||
`
|
||||
|
||||
func main() {
|
||||
if config.ErrLoadingEnvs != nil {
|
||||
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
||||
flag.Usage = func() {
|
||||
fmt.Print(usageText)
|
||||
flag.PrintDefaults()
|
||||
os.Exit(2)
|
||||
}
|
||||
flag.Parse()
|
||||
|
||||
if baseCnf.ErrLoadingEnvs != nil {
|
||||
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
|
||||
}
|
||||
|
||||
// dbURL := config.GetEnv("DATABASE_URL", defDbURL)
|
||||
loggerAddr := config.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
mTblName := config.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
|
||||
c := cnf.NewConfig("order-migrator")
|
||||
|
||||
logHost, logPort := fluentd.ParseAddr(loggerAddr)
|
||||
logger := fluentd.NewLogger(defAppName, logHost, logPort)
|
||||
// dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL)
|
||||
|
||||
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||
if err != nil {
|
||||
log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err)
|
||||
}
|
||||
|
||||
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error)
|
||||
if err != nil {
|
||||
log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err)
|
||||
}
|
||||
defer logger.Close()
|
||||
|
||||
flag.Usage = usage
|
||||
flag.Parse()
|
||||
|
||||
db := pg.Connect(&pg.Options{ // FIXME
|
||||
Addr: "postgres-db:5432",
|
||||
User: "postgres",
|
||||
Password: "12345678",
|
||||
Database: "egommerce",
|
||||
})
|
||||
defer db.Close()
|
||||
|
||||
mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
|
||||
mig := migrations.NewCollection()
|
||||
mig.SetTableName(mTblName)
|
||||
err := mig.DiscoverSQLMigrations("./migrations")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
mig.SetTableName(mTbl)
|
||||
if err := mig.DiscoverSQLMigrations("./migrations"); err != nil {
|
||||
logger.Log("migration dicovery error: %#v", err)
|
||||
}
|
||||
|
||||
oldVersion, newVersion, err := mig.Run(db, flag.Args()...)
|
||||
if err != nil {
|
||||
exitf(err.Error())
|
||||
logger.Log("migration runner error: %#v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if newVersion != oldVersion {
|
||||
fmt.Printf("migrated from version %d to %d\n", oldVersion, newVersion)
|
||||
logger.Log("migrated from version %d to %d\n", oldVersion, newVersion)
|
||||
} else {
|
||||
fmt.Printf("version is %d\n", oldVersion)
|
||||
logger.Log("version is %d\n", oldVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func usage() {
|
||||
fmt.Print(usageText)
|
||||
flag.PrintDefaults()
|
||||
os.Exit(2)
|
||||
}
|
||||
|
||||
func errorf(s string, args ...interface{}) {
|
||||
fmt.Fprintf(os.Stderr, s+"\n", args...)
|
||||
}
|
||||
|
||||
func exitf(s string, args ...interface{}) {
|
||||
errorf(s, args...)
|
||||
os.Exit(1)
|
||||
// os.Exit(0)
|
||||
}
|
||||
|
@ -1,97 +1,41 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/config"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/database"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/server"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
|
||||
const (
|
||||
defAppName = "order-svc"
|
||||
defAppDomain = "order-svc"
|
||||
defPathPrefix = "/order"
|
||||
defNetAddr = ":80"
|
||||
defLoggerAddr = "api-logger:24224"
|
||||
defRegistryAddr = "api-registry:8500"
|
||||
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
||||
defCacheAddr = "api-cache:6379"
|
||||
defCachePassword = "12345678"
|
||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
||||
ebEventsExchange = "api-events"
|
||||
ebEventsQueue = "order-svc"
|
||||
defKVNmspc = "dev.egommerce/service/order-svc"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/app"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/server"
|
||||
)
|
||||
|
||||
func main() {
|
||||
if config.ErrLoadingEnvs != nil {
|
||||
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
||||
if cnf.ErrLoadingEnvs != nil {
|
||||
log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs)
|
||||
}
|
||||
|
||||
c := new(server.Config)
|
||||
c.AppID, _ = os.Hostname()
|
||||
c.AppName = config.GetEnv("APP_NAME", defAppName)
|
||||
c.AppDomain = config.GetEnv("APP_DOMAIN", defAppDomain)
|
||||
c.PathPrefix = config.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
||||
c.NetAddr = config.GetEnv("SERVER_ADDR", defNetAddr)
|
||||
c.Port, _ = strconv.Atoi(c.NetAddr[1:])
|
||||
c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||
c.DbURL = config.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||
c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||
c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||
c.EventBusExchange = ebEventsExchange
|
||||
c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||
c := server.NewConfig("order")
|
||||
cArr := c.GetArray()
|
||||
|
||||
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr)
|
||||
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
||||
defer logger.Close()
|
||||
doer := server.New(c)
|
||||
a := app.NewApp(doer)
|
||||
a.RegisterPlugin(app.LoggerPlugin(cArr))
|
||||
a.RegisterPlugin(app.CachePlugin(cArr))
|
||||
a.RegisterPlugin(app.DatabasePlugin(cArr))
|
||||
a.RegisterPlugin(app.EventbusPlugin(cArr))
|
||||
a.RegisterPlugin(app.RegistryPlugin(cArr))
|
||||
|
||||
// db conn
|
||||
dbConn, err := database.Connect(c.DbURL)
|
||||
if err != nil { // fixme: add wait-for-db...
|
||||
logger.Log("Failed to connect to Database server: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer dbConn.Close()
|
||||
while := make(chan struct{})
|
||||
err := a.Start(while)
|
||||
<-while
|
||||
|
||||
// redis conn
|
||||
redis := redis.NewClient(&redis.Options{
|
||||
Addr: c.CacheAddr,
|
||||
Password: c.CachePassword,
|
||||
DB: 0,
|
||||
})
|
||||
defer redis.Close()
|
||||
|
||||
// eventbus conn
|
||||
ebConn, ebCh, err := amqp.Open(c.EventBusURL)
|
||||
if err != nil {
|
||||
logger.Log("Failed to connect to EventBus server: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer ebCh.Close()
|
||||
defer amqp.Close(ebConn)
|
||||
|
||||
err = amqp.NewExchange(ebCh, c.EventBusExchange)
|
||||
if err != nil {
|
||||
logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
||||
log.Fatalf("Failed to start server. Reason: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// start server
|
||||
srv := server.NewServer(c, logger, dbConn, redis, ebCh)
|
||||
|
||||
forever := make(chan struct{})
|
||||
srv.StartWithGracefulShutdown(forever)
|
||||
<-forever
|
||||
|
||||
// os.Exit(1)
|
||||
fmt.Println("Gone")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
@ -1,212 +1,40 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/config"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/database"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/event"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/server"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/service"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/ui"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
|
||||
const (
|
||||
defAppName = "order-worker"
|
||||
defLoggerAddr = "api-logger:24224"
|
||||
defRegistryAddr = "api-registry:8500"
|
||||
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
||||
defCacheAddr = "api-cache:6379"
|
||||
defCachePassword = "12345678"
|
||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
||||
ebEventsExchange = "api-events"
|
||||
ebEventsQueue = "order-worker"
|
||||
defKVNmspc = "dev.egommerce/service/order-worker"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/app"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/worker"
|
||||
)
|
||||
|
||||
func main() {
|
||||
if config.ErrLoadingEnvs != nil {
|
||||
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
||||
if cnf.ErrLoadingEnvs != nil {
|
||||
log.Fatalln("Error loading .env file.")
|
||||
}
|
||||
|
||||
c := new(server.Config)
|
||||
c.AppID, _ = os.Hostname()
|
||||
c.AppName = config.GetEnv("APP_NAME", defAppName)
|
||||
c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
c.RegistryAddr = config.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||
c.DbURL = config.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.CacheAddr = config.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||
c.CachePassword = config.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||
c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||
c.EventBusExchange = ebEventsExchange
|
||||
c.EventBusQueue = ebEventsQueue
|
||||
c.KVNamespace = config.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||
c := worker.NewConfig("order-worker")
|
||||
cArr := c.GetArray()
|
||||
|
||||
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr)
|
||||
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
||||
defer logger.Close()
|
||||
doer := worker.New(c)
|
||||
a := app.NewApp(doer)
|
||||
a.RegisterPlugin(app.LoggerPlugin(cArr))
|
||||
a.RegisterPlugin(app.CachePlugin(cArr))
|
||||
a.RegisterPlugin(app.DatabasePlugin(cArr))
|
||||
a.RegisterPlugin(app.EventbusPlugin(cArr))
|
||||
|
||||
while := make(chan struct{})
|
||||
err := a.Start(while)
|
||||
<-while
|
||||
|
||||
consul, err := discovery.NewService(c.RegistryAddr, c.AppID, c.AppName, c.AppID, "", "", 0)
|
||||
if err != nil {
|
||||
logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
|
||||
}
|
||||
|
||||
go func(consul *discovery.Service) {
|
||||
interval := time.Second * 3
|
||||
ticker := time.NewTicker(interval)
|
||||
for range ticker.C {
|
||||
updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go
|
||||
}
|
||||
}(consul)
|
||||
|
||||
// db conn
|
||||
dbConn, err := database.Connect(c.DbURL)
|
||||
if err != nil { // fixme: add wait-for-db...
|
||||
logger.Log("Failed to connect to Database server: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer dbConn.Close()
|
||||
|
||||
// redis conn
|
||||
redis := redis.NewClient(&redis.Options{
|
||||
Addr: c.CacheAddr,
|
||||
Password: c.CachePassword,
|
||||
DB: 0,
|
||||
})
|
||||
defer redis.Close()
|
||||
|
||||
// eventbus conn
|
||||
ebConn, ebCh, err := rabbitmq.Open(c.EventBusURL)
|
||||
if err != nil {
|
||||
logger.Log("Failed to connect to EventBus server: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer ebCh.Close()
|
||||
defer rabbitmq.Close(ebConn)
|
||||
|
||||
err = rabbitmq.NewExchange(ebCh, c.EventBusExchange)
|
||||
if err != nil {
|
||||
logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
||||
log.Fatalf("Failed to start worker. Reason: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// create and bind queues
|
||||
_, err = ebCh.QueueDeclare(
|
||||
c.EventBusQueue, // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
if err != nil {
|
||||
logger.Log("Failed to declare EventBus queue: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "basket.order.basketCheckout")
|
||||
// if err != nil {
|
||||
// logger.Log("Failed to prepare EventBus queue: %v\n", err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// event consume
|
||||
msgs, err := ebCh.Consume(
|
||||
c.EventBusQueue, // queue
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
logger.Log("Failed to register a consumer: %s", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
forever := make(chan struct{})
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
logger.Log("Worker %s stopped working...\n", c.GetAppFullName())
|
||||
|
||||
close(forever)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
orderSrv := service.NewOrderService(dbConn, redis, ebCh, logger)
|
||||
|
||||
for d := range msgs {
|
||||
go func(d amqp.Delivery) {
|
||||
msg, err := rabbitmq.Deserialize(d.Body)
|
||||
if err != nil {
|
||||
logger.Log("deserialize error: %v\n", err)
|
||||
d.Reject(false) // FIXME: how to handle erros in queue...????
|
||||
return
|
||||
}
|
||||
|
||||
eName := fmt.Sprintf("%s", msg["event"])
|
||||
data := (msg["data"]).(map[string]interface{})
|
||||
logger.Log("Message<%s>: %s\n", eName, data)
|
||||
|
||||
reqID := data["request_id"].(string) // FIXME Check input params!
|
||||
|
||||
switch true {
|
||||
case strings.Contains(eName, event.EVENT_BASKET_CHECKOUT):
|
||||
basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||
// create new order based on basket
|
||||
order, err := ui.CreateOrder(orderSrv, basketID, reqID)
|
||||
if err != nil {
|
||||
logger.Log("%s error: %v", event.EVENT_BASKET_CHECKOUT, err)
|
||||
d.Reject(false) // FIXME: or Nack? how to handle erros in queue...
|
||||
break
|
||||
}
|
||||
logger.Log("Event: %s. Created Order: %v", event.EVENT_BASKET_CHECKOUT, order)
|
||||
}
|
||||
|
||||
logger.Log("ACK: %s", eName)
|
||||
d.Ack(false)
|
||||
}(d)
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Log("Waiting for messages...")
|
||||
<-forever
|
||||
}
|
||||
|
||||
func updateKVConfig(s *discovery.Service, oldCnf *server.Config) error { // FIXME: duplicated in internal/app/server/server.go
|
||||
data, _, err := s.KV().Get(oldCnf.KVNamespace, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if data == nil {
|
||||
return errors.New("empty KV config data")
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(data.Value)
|
||||
decoder := json.NewDecoder(buf)
|
||||
if err := decoder.Decode(oldCnf); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
fmt.Println("Gone")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
92
src/go.mod
92
src/go.mod
@ -1,53 +1,98 @@
|
||||
module git.pbiernat.dev/egommerce/order-service
|
||||
module git.pbiernat.io/egommerce/order-service
|
||||
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
git.pbiernat.dev/egommerce/api-entities v0.0.26
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.136
|
||||
git.pbiernat.io/egommerce/api-entities v0.2.3
|
||||
git.pbiernat.io/egommerce/go-api-pkg v0.2.88
|
||||
github.com/georgysavva/scany/v2 v2.0.0
|
||||
github.com/go-pg/migrations/v8 v8.1.0
|
||||
github.com/go-pg/pg/v10 v10.10.7
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/gofiber/fiber/v2 v2.40.1
|
||||
github.com/jackc/pgx/v4 v4.17.2
|
||||
github.com/joho/godotenv v1.4.0
|
||||
github.com/streadway/amqp v1.0.0
|
||||
github.com/jackc/pgx/v5 v5.4.1
|
||||
github.com/rabbitmq/amqp091-go v1.10.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible // indirect
|
||||
github.com/andybalholm/brotli v1.0.4 // indirect
|
||||
github.com/armon/go-metrics v0.4.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/armon/go-radix v1.0.0 // indirect
|
||||
github.com/aws/aws-sdk-go v1.42.34 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible // indirect
|
||||
github.com/circonus-labs/circonusllhist v0.1.3 // indirect
|
||||
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/fatih/color v1.13.0 // indirect
|
||||
github.com/envoyproxy/go-control-plane v0.11.0 // indirect
|
||||
github.com/envoyproxy/protoc-gen-validate v0.10.0 // indirect
|
||||
github.com/fatih/color v1.14.1 // indirect
|
||||
github.com/fluent/fluent-logger-golang v1.9.0 // indirect
|
||||
github.com/go-pg/zerochecker v0.2.0 // indirect
|
||||
github.com/hashicorp/consul/api v1.18.0 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/btree v1.0.1 // indirect
|
||||
github.com/hashicorp/consul v1.16.0 // indirect
|
||||
github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69 // indirect
|
||||
github.com/hashicorp/consul/api v1.22.0 // indirect
|
||||
github.com/hashicorp/consul/envoyextensions v0.3.0 // indirect
|
||||
github.com/hashicorp/consul/sdk v0.14.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-bexpr v0.1.2 // indirect
|
||||
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
|
||||
github.com/hashicorp/go-hclog v1.3.1 // indirect
|
||||
github.com/hashicorp/go-hclog v1.5.0 // indirect
|
||||
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
|
||||
github.com/hashicorp/go-msgpack v0.5.5 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
|
||||
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
|
||||
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
|
||||
github.com/hashicorp/go-syslog v1.0.0 // indirect
|
||||
github.com/hashicorp/go-uuid v1.0.3 // indirect
|
||||
github.com/hashicorp/go-version v1.2.1 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/hashicorp/memberlist v0.5.0 // indirect
|
||||
github.com/hashicorp/raft v1.5.0 // indirect
|
||||
github.com/hashicorp/raft-autopilot v0.1.6 // indirect
|
||||
github.com/hashicorp/serf v0.10.1 // indirect
|
||||
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
|
||||
github.com/jackc/pgconn v1.13.0 // indirect
|
||||
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
|
||||
github.com/jackc/pgio v1.0.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
|
||||
github.com/jackc/pgtype v1.13.0 // indirect
|
||||
github.com/jackc/puddle v1.3.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
|
||||
github.com/jackc/pgtype v1.14.3 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.0 // indirect
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
github.com/joho/godotenv v1.5.1 // indirect
|
||||
github.com/klauspost/compress v1.15.9 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.16 // indirect
|
||||
github.com/mattn/go-isatty v0.0.17 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.14 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/miekg/dns v1.1.41 // indirect
|
||||
github.com/mitchellh/copystructure v1.2.0 // indirect
|
||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
|
||||
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
||||
github.com/philhofer/fwd v1.1.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/prometheus/client_golang v1.14.0 // indirect
|
||||
github.com/prometheus/client_model v0.3.0 // indirect
|
||||
github.com/prometheus/common v0.37.0 // indirect
|
||||
github.com/prometheus/procfs v0.8.0 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
|
||||
github.com/stretchr/objx v0.5.0 // indirect
|
||||
github.com/stretchr/testify v1.8.3 // indirect
|
||||
github.com/tinylib/msgp v1.1.6 // indirect
|
||||
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasthttp v1.41.0 // indirect
|
||||
github.com/valyala/tcplisten v1.0.0 // indirect
|
||||
@ -55,8 +100,15 @@ require (
|
||||
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
|
||||
github.com/vmihailenco/tagparser v0.1.2 // indirect
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
|
||||
golang.org/x/sys v0.2.0 // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
golang.org/x/crypto v0.20.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
|
||||
golang.org/x/net v0.21.0 // indirect
|
||||
golang.org/x/sync v0.2.0 // indirect
|
||||
golang.org/x/sys v0.17.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
mellium.im/sasl v0.2.1 // indirect
|
||||
)
|
||||
|
680
src/go.sum
680
src/go.sum
File diff suppressed because it is too large
Load Diff
81
src/internal/app/app.go
Normal file
81
src/internal/app/app.go
Normal file
@ -0,0 +1,81 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
type (
|
||||
Doer interface {
|
||||
Start() error
|
||||
RegisterHandler(string, func() any)
|
||||
OnShutdown()
|
||||
}
|
||||
Application interface {
|
||||
Start(while chan struct{})
|
||||
RegisterPlugin(PluginFn) error
|
||||
Shutdown()
|
||||
}
|
||||
|
||||
App struct {
|
||||
doer Doer
|
||||
}
|
||||
)
|
||||
|
||||
func NewApp(d Doer) *App {
|
||||
return &App{
|
||||
doer: d,
|
||||
}
|
||||
}
|
||||
|
||||
func (a *App) Start(while chan struct{}) error {
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
a.Shutdown()
|
||||
|
||||
close(while)
|
||||
}()
|
||||
|
||||
run := a.createRunFile("./app.run") // FIXME path...
|
||||
defer a.removeRunFile(run)
|
||||
|
||||
err := a.doer.Start()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start app. Reason: %v\n", err)
|
||||
close(while)
|
||||
}
|
||||
<-while
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *App) RegisterPlugin(p Plugin) error {
|
||||
a.doer.RegisterHandler(p.name, p.fn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *App) Shutdown() {
|
||||
a.doer.OnShutdown()
|
||||
}
|
||||
|
||||
func (a *App) createRunFile(path string) *os.File {
|
||||
run, err := os.Create(path)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create run file. Reason: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
run.WriteString(strconv.Itoa(os.Getpid()))
|
||||
|
||||
return run
|
||||
}
|
||||
|
||||
func (a *App) removeRunFile(f *os.File) error {
|
||||
return f.Close()
|
||||
}
|
@ -1,22 +0,0 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/joho/godotenv"
|
||||
)
|
||||
|
||||
var ErrLoadingEnvs error
|
||||
|
||||
func init() {
|
||||
ErrLoadingEnvs = godotenv.Load()
|
||||
}
|
||||
|
||||
func GetEnv(name string, defVal string) string { // FIXME defVal and return types
|
||||
env := os.Getenv(name)
|
||||
if env == "" {
|
||||
return defVal
|
||||
}
|
||||
|
||||
return env
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"log"
|
||||
)
|
||||
|
||||
const AppName = "order-svc"
|
||||
|
||||
func Panic(v ...any) {
|
||||
log.Panicln(AppName+":", v)
|
||||
}
|
||||
|
||||
func Panicf(format string, v ...any) {
|
||||
log.Panicf(AppName+": "+format, v...)
|
||||
}
|
||||
|
||||
func Panicln(v ...any) {
|
||||
v = append([]any{AppName + ":"}, v...)
|
||||
log.Panicln(v...)
|
||||
}
|
139
src/internal/app/plugins.go
Normal file
139
src/internal/app/plugins.go
Normal file
@ -0,0 +1,139 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
redis "github.com/go-redis/redis/v8"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/consul"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
|
||||
db "git.pbiernat.io/egommerce/order-service/pkg/database"
|
||||
)
|
||||
|
||||
type (
|
||||
Plugin struct {
|
||||
name string
|
||||
fn PluginFn
|
||||
}
|
||||
PluginFn func() any
|
||||
)
|
||||
|
||||
func CachePlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "cache",
|
||||
fn: func() any {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: cArr["cacheAddr"],
|
||||
Password: cArr["cachePassword"],
|
||||
DB: 0,
|
||||
})
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func DatabasePlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "database",
|
||||
fn: func() any {
|
||||
dbConn, err := db.Connect(cArr["dbURL"])
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
return dbConn
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func EventbusPlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "eventbus",
|
||||
fn: func() any {
|
||||
conn, err := amqp.Dial(cArr["eventBusURL"])
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
chn, err := conn.Channel()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open new EventBus channel. Err: %v\n", err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
return chn
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func LoggerPlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "logger",
|
||||
fn: func() any {
|
||||
logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"])
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
return logger
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func RegistryPlugin(cArr map[string]string) Plugin {
|
||||
return Plugin{
|
||||
name: "registry",
|
||||
fn: func() any {
|
||||
port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT
|
||||
// log.Printf("Consul retrieved port: %v", port)
|
||||
registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
err = registry.Register()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to register in the Consul service. Err: %v", err)
|
||||
os.Exit(1) // TODO: retry in background...
|
||||
}
|
||||
|
||||
registry.RegisterHealthChecks()
|
||||
// a.registerKVUpdater() // FIXME run as goroutine
|
||||
|
||||
return registry
|
||||
|
||||
// svc, _ := registry.Connect()
|
||||
// tlsCnf := svc.ServerTLSConfig()
|
||||
// s.Base.App.Server().TLSConfig = tlsCnf
|
||||
// fmt.Println("Podmiana configa TLS")
|
||||
// defer svc.Close()
|
||||
|
||||
// go func() { // Consul KV updater
|
||||
// ticker := time.NewTicker(time.Second * 15)
|
||||
// for range ticker.C {
|
||||
// fetchKVConfig(s) // FIXME: duplicated in worker
|
||||
// }
|
||||
// }()
|
||||
|
||||
// go func() { // Server metadata cache updater
|
||||
// ticker := time.NewTicker(time.Second * 5)
|
||||
// for range ticker.C {
|
||||
// s.cacheMetadata()
|
||||
// }
|
||||
// }()
|
||||
},
|
||||
}
|
||||
}
|
@ -1,31 +0,0 @@
|
||||
package server
|
||||
|
||||
import "fmt"
|
||||
|
||||
type Config struct {
|
||||
AppID string
|
||||
AppName string
|
||||
AppDomain string
|
||||
PathPrefix string
|
||||
NetAddr string
|
||||
Port int
|
||||
RegistryAddr string
|
||||
KVNamespace string
|
||||
|
||||
LoggerAddr string `json:"logger_addr"`
|
||||
DbURL string `json:"db_url"`
|
||||
CacheAddr string `json:"cache_addr"`
|
||||
CachePassword string `json:"cache_password"`
|
||||
MongoDbUrl string `json:"mongodb_url"`
|
||||
EventBusURL string `json:"eventbus_url"`
|
||||
EventBusExchange string `json:"eventbus_exchange"`
|
||||
EventBusQueue string `json:"eventbus_queue"`
|
||||
HttpReadTimeout int `json:"http_read_timeout"`
|
||||
HttpWriteTimeout int `json:"http_write_timeout"`
|
||||
HttpIdleTimeout int `json:"http_idle_timeout"`
|
||||
// Fields with json mapping are available trough ConsulKV
|
||||
}
|
||||
|
||||
func (c *Config) GetAppFullName() string {
|
||||
return fmt.Sprintf("%s_%s", c.AppName, c.AppID)
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultCORS = cors.New(cors.Config{
|
||||
AllowOrigins: "*",
|
||||
AllowCredentials: true,
|
||||
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
||||
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
||||
})
|
||||
)
|
||||
|
||||
func SetupRoutes(s *Server) {
|
||||
s.App.Options("*", defaultCORS)
|
||||
|
||||
s.App.Get("/health", s.HealthHandler)
|
||||
s.App.Get("/config", s.ConfigHandler)
|
||||
|
||||
api := s.App.Group("/api")
|
||||
v1 := api.Group("/v1")
|
||||
order := v1.Group("/order")
|
||||
order.Put("/:orderId/status", s.UpdateOrderStatusHandler)
|
||||
}
|
||||
|
||||
func SetupMiddlewares(s *Server) {
|
||||
s.App.Use(defaultCORS)
|
||||
s.App.Use(LoggingMiddleware(s.log))
|
||||
}
|
||||
|
||||
// Middlewares
|
||||
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
|
||||
return func(c *fiber.Ctx) error {
|
||||
path := string(c.Request().URI().Path())
|
||||
if strings.Contains(path, "/health") {
|
||||
return c.Next()
|
||||
}
|
||||
|
||||
log.Log("Request: %s, remote: %s, via: %s",
|
||||
c.Request().URI().String(),
|
||||
c.Context().RemoteIP().String(),
|
||||
string(c.Context().UserAgent()))
|
||||
|
||||
return c.Next()
|
||||
}
|
||||
}
|
@ -1,178 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/streadway/amqp"
|
||||
|
||||
def "git.pbiernat.dev/egommerce/api-entities/http"
|
||||
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
*fiber.App
|
||||
conf *Config
|
||||
log *fluentd.Logger
|
||||
db *pgxpool.Pool
|
||||
cache *redis.Client
|
||||
ebCh *amqp.Channel
|
||||
discovery *discovery.Service
|
||||
name string
|
||||
addr string
|
||||
kvNmspc string
|
||||
}
|
||||
|
||||
type Headers struct {
|
||||
RequestID string `reqHeader:"x-request-id"`
|
||||
}
|
||||
|
||||
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, cache *redis.Client, ebCh *amqp.Channel) *Server {
|
||||
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.PathPrefix, conf.Port)
|
||||
if err != nil {
|
||||
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
|
||||
}
|
||||
|
||||
logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address)
|
||||
err = consul.Register()
|
||||
if err != nil {
|
||||
logger.Log("register error: %v", err)
|
||||
}
|
||||
|
||||
cnf := fiber.Config{
|
||||
AppName: conf.AppName,
|
||||
ServerHeader: conf.AppName,
|
||||
ReadTimeout: time.Millisecond * 50,
|
||||
WriteTimeout: time.Millisecond * 50,
|
||||
IdleTimeout: time.Millisecond * 50,
|
||||
}
|
||||
s := &Server{
|
||||
fiber.New(cnf),
|
||||
conf,
|
||||
logger,
|
||||
db,
|
||||
cache,
|
||||
ebCh,
|
||||
consul,
|
||||
conf.AppName,
|
||||
conf.NetAddr,
|
||||
conf.KVNamespace,
|
||||
}
|
||||
|
||||
go func(s *Server) { // Consul KV config updater
|
||||
interval := time.Second * 15
|
||||
ticker := time.NewTicker(interval)
|
||||
for range ticker.C {
|
||||
s.updateKVConfig()
|
||||
}
|
||||
}(s)
|
||||
|
||||
go func(s *Server) { // Server metadata cache updater
|
||||
interval := time.Second * 5
|
||||
ticker := time.NewTicker(interval)
|
||||
for range ticker.C {
|
||||
s.cacheMetadata()
|
||||
}
|
||||
}(s)
|
||||
|
||||
SetupMiddlewares(s)
|
||||
SetupRoutes(s)
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) Start() {
|
||||
err := s.Listen(s.addr)
|
||||
s.log.Log("Starting error: %v", err)
|
||||
}
|
||||
|
||||
func (s *Server) StartWithGracefulShutdown(forever chan struct{}) {
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
if err := s.gracefulShutdown(); err != nil {
|
||||
s.log.Log("Server is not shutting down! Reason: %v", err)
|
||||
}
|
||||
|
||||
close(forever)
|
||||
}()
|
||||
|
||||
if err := s.Listen(s.addr); err != nil {
|
||||
s.log.Log("Server is not running! Reason: %v", err)
|
||||
}
|
||||
|
||||
<-forever
|
||||
}
|
||||
|
||||
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||
var hdr = new(Headers)
|
||||
if err := c.ReqHeaderParser(hdr); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return hdr.RequestID, nil
|
||||
}
|
||||
|
||||
func (s *Server) Error400(c *fiber.Ctx, msg string) error {
|
||||
return c.Status(fiber.StatusBadRequest).JSON(&def.ErrorResponse{Error: msg})
|
||||
}
|
||||
|
||||
func (s *Server) Error404(c *fiber.Ctx, msg string) error {
|
||||
return c.Status(fiber.StatusNotFound).JSON(&def.ErrorResponse{Error: msg})
|
||||
}
|
||||
|
||||
func (s *Server) updateKVConfig() { // FIXME: duplicated in cmd/worker/main.go
|
||||
config, _, err := s.discovery.KV().Get(s.kvNmspc, nil)
|
||||
if err != nil || config == nil {
|
||||
return
|
||||
}
|
||||
|
||||
kvCnf := bytes.NewBuffer(config.Value)
|
||||
decoder := json.NewDecoder(kvCnf)
|
||||
if err := decoder.Decode(&s.conf); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) cacheMetadata() {
|
||||
ctx := context.Background()
|
||||
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
|
||||
|
||||
pos := s.cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
|
||||
if pos >= 0 {
|
||||
s.cache.LRem(ctx, key, 0, address)
|
||||
}
|
||||
|
||||
s.cache.LPush(ctx, key, address).Err()
|
||||
}
|
||||
|
||||
func (s *Server) clearMetadataCache() {
|
||||
ctx := context.Background()
|
||||
key, address := "internal__"+s.conf.AppName+"__ips", s.conf.AppID // FIXME: key name
|
||||
|
||||
s.cache.LRem(ctx, key, 0, address)
|
||||
}
|
||||
|
||||
func (s *Server) gracefulShutdown() error {
|
||||
s.log.Log("Server is going down...")
|
||||
s.log.Log("Unregistering service: %s", s.discovery.GetID())
|
||||
s.discovery.Unregister()
|
||||
s.clearMetadataCache()
|
||||
|
||||
s.ebCh.Close()
|
||||
s.db.Close()
|
||||
s.log.Close()
|
||||
|
||||
return s.Shutdown()
|
||||
}
|
@ -1,72 +0,0 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.pbiernat.dev/egommerce/api-entities/model"
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/api"
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/event"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
base "github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
const (
|
||||
SERVICE_USER_AGENT = "order-httpclient"
|
||||
)
|
||||
|
||||
type OrderService struct {
|
||||
dbConn *pgxpool.Pool
|
||||
redis *redis.Client
|
||||
ebCh *base.Channel
|
||||
log *fluentd.Logger
|
||||
}
|
||||
|
||||
func NewOrderService(dbConn *pgxpool.Pool, redis *redis.Client, chn *base.Channel, log *fluentd.Logger) *OrderService {
|
||||
return &OrderService{dbConn, redis, chn, log}
|
||||
}
|
||||
|
||||
func (s *OrderService) Log(format string, val ...any) {
|
||||
s.log.Log(format, val...)
|
||||
}
|
||||
|
||||
// Refactor
|
||||
func (s *OrderService) CreateOrder(ctx context.Context, basketID string) (*model.OrderModel, error) {
|
||||
basketAPI := api.NewBasketAPI(SERVICE_USER_AGENT, s.redis)
|
||||
basket, err := basketAPI.GetBasket(basketID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
order := new(model.OrderModel)
|
||||
order.State = basket.State // FIXME: are the same status?
|
||||
|
||||
sql := `INSERT INTO ordering."order"(state) VALUES($1) RETURNING id`
|
||||
if err := s.dbConn.QueryRow(ctx, sql, order.State).Scan(&order.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
items, err := basketAPI.GetBasketItems(basket.ID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
sql := `INSERT INTO ordering.order_item(order_id,product_id,price,quantity) VALUES($1,$2,$3,$4)`
|
||||
if _, err := s.dbConn.Exec(ctx, sql, order.ID, item.ProductID, item.Price, item.Quantity); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return order, nil
|
||||
}
|
||||
func (s *OrderService) UpdateOrderStatus(reqID, orderID, status string) (string, error) {
|
||||
s.log.Log("Update order#%s status to %s", orderID, status)
|
||||
|
||||
msg := &event.StatusUpdateEvent{Event: event.NewEvent(reqID), OrderID: orderID, Status: status}
|
||||
amqp.Publish(s.ebCh, "api-events", "order.email.statusUpdate", msg)
|
||||
|
||||
return orderID, nil
|
||||
}
|
@ -1,31 +0,0 @@
|
||||
package ui
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
def "git.pbiernat.dev/egommerce/api-entities/http"
|
||||
"git.pbiernat.dev/egommerce/api-entities/model"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/service"
|
||||
)
|
||||
|
||||
// FIXME: REFACTOR !!
|
||||
func CreateOrder(srv *service.OrderService, basketID, reqID string) (*model.OrderModel, error) { // FIXME: model.Order
|
||||
ctx := context.Background()
|
||||
order, err := srv.CreateOrder(ctx, basketID)
|
||||
if err != nil {
|
||||
srv.Log("UI CreateOrder error: %v\n", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return order, nil
|
||||
}
|
||||
|
||||
func UpdateOrderStatus(srv *service.OrderService, orderID, status, reqID string) (*def.UpdateOrderStatusResponse, error) {
|
||||
res := &def.UpdateOrderStatusResponse{}
|
||||
_, err := srv.UpdateOrderStatus(reqID, orderID, status)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
@ -1,11 +1,13 @@
|
||||
package event
|
||||
|
||||
type Event struct {
|
||||
Command string `json:"command"`
|
||||
RequestID string `json:"request_id"`
|
||||
}
|
||||
|
||||
func NewEvent(reqID string) *Event {
|
||||
func NewEvent(command, reqID string) *Event {
|
||||
em := new(Event)
|
||||
em.Command = command
|
||||
em.RequestID = reqID
|
||||
|
||||
return em
|
111
src/internal/server/config.go
Normal file
111
src/internal/server/config.go
Normal file
@ -0,0 +1,111 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
)
|
||||
|
||||
const (
|
||||
defName = "order-svc"
|
||||
defDomain = "order-svc"
|
||||
defCacheAddr = "egommerce.local:6379"
|
||||
defCachePassword = "12345678"
|
||||
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
|
||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
||||
defKVNmspc = "dev.egommerce/service/order"
|
||||
defLoggerAddr = "api-logger:24224"
|
||||
defNetAddr = ":80"
|
||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||
defPathPrefix = "/order"
|
||||
defRegistryAddr = "api-registry:8500"
|
||||
defEbEventsExchange = "api-events"
|
||||
defEbEventsQueue = "order-svc"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
ID string
|
||||
Name string
|
||||
Domain string
|
||||
NetAddr string
|
||||
RegistryDomainOverIP string
|
||||
PathPrefix string
|
||||
|
||||
IdleTimeout time.Duration // miliseconds
|
||||
ReadTimeout time.Duration // miliseconds
|
||||
WriteTimeout time.Duration // miliseconds
|
||||
|
||||
LoggerAddr string `json:"logger_addr"`
|
||||
DbURL string `json:"db_url"`
|
||||
CacheAddr string `json:"cache_addr"`
|
||||
CachePassword string `json:"cache_password"`
|
||||
MongoDbUrl string `json:"mongodb_url"`
|
||||
EventBusURL string `json:"eventbus_url"`
|
||||
EventBusExchange string `json:"eventbus_exchange"`
|
||||
EventBusQueue string `json:"eventbus_queue"`
|
||||
KVNamespace string
|
||||
RegistryAddr string
|
||||
|
||||
// Fields with JSON mappings are available through Consul KV storage
|
||||
}
|
||||
|
||||
func NewConfig(name string) *Config {
|
||||
c := new(Config)
|
||||
|
||||
c.ID, _ = os.Hostname()
|
||||
c.Name = name
|
||||
c.Domain = cnf.GetEnv("APP_DOMAIN", defDomain)
|
||||
c.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
|
||||
c.RegistryDomainOverIP = cnf.GetEnv("REGISTRY_USE_DOMAIN_OVER_IP", "false")
|
||||
c.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
||||
|
||||
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.EventBusExchange = defEbEventsExchange
|
||||
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Config) GetAppFullName() string {
|
||||
return fmt.Sprintf("%s_%s", c.Name, c.ID)
|
||||
}
|
||||
|
||||
func (c *Config) GetIP() string {
|
||||
host, _ := os.Hostname()
|
||||
ips, _ := net.LookupIP(host)
|
||||
// for _, ip := range ips {
|
||||
// return ip.String()
|
||||
// }
|
||||
|
||||
return ips[0].String()
|
||||
}
|
||||
|
||||
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
|
||||
arr := make(map[string]string)
|
||||
arr["id"] = c.ID
|
||||
arr["name"] = c.Name
|
||||
arr["appFullname"] = c.GetAppFullName()
|
||||
arr["domain"] = c.Domain
|
||||
arr["ip"] = c.GetIP()
|
||||
arr["netAddr"] = c.NetAddr
|
||||
arr["registryDomainOverIP"] = c.RegistryDomainOverIP
|
||||
arr["pathPrefix"] = c.PathPrefix
|
||||
arr["cacheAddr"] = c.CacheAddr
|
||||
arr["cachePassword"] = c.CachePassword
|
||||
arr["dbURL"] = c.DbURL
|
||||
arr["eventBusExchange"] = c.EventBusExchange
|
||||
arr["eventBusURL"] = c.EventBusURL
|
||||
arr["kvNamespace"] = c.KVNamespace
|
||||
arr["loggerAddr"] = c.LoggerAddr
|
||||
arr["registryAddr"] = c.RegistryAddr
|
||||
|
||||
return arr
|
||||
}
|
@ -3,7 +3,7 @@ package server
|
||||
import (
|
||||
"github.com/gofiber/fiber/v2"
|
||||
|
||||
def "git.pbiernat.dev/egommerce/api-entities/http"
|
||||
def "git.pbiernat.io/egommerce/api-entities/http"
|
||||
)
|
||||
|
||||
func (s *Server) HealthHandler(c *fiber.Ctx) error {
|
||||
@ -13,5 +13,5 @@ func (s *Server) HealthHandler(c *fiber.Ctx) error {
|
||||
}
|
||||
|
||||
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
|
||||
return c.JSON(s.conf)
|
||||
return c.JSON(s.Config)
|
||||
}
|
32
src/internal/server/middleware.go
Normal file
32
src/internal/server/middleware.go
Normal file
@ -0,0 +1,32 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
)
|
||||
|
||||
// "github.com/gofiber/fiber/v2"
|
||||
// "github.com/gofiber/fiber/v2/middleware/cors"
|
||||
|
||||
func SetupMiddleware(s *Server) {
|
||||
s.Use(LoggingMiddleware(s.GetLogger()))
|
||||
}
|
||||
|
||||
func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error {
|
||||
return func(c *fiber.Ctx) error {
|
||||
path := string(c.Request().URI().Path())
|
||||
if strings.Contains(path, "/health") {
|
||||
return c.Next()
|
||||
}
|
||||
|
||||
log.Log("Request: %s, remote: %s, via: %s",
|
||||
c.Request().URI().String(),
|
||||
c.Context().RemoteIP().String(),
|
||||
string(c.Context().UserAgent()))
|
||||
|
||||
return c.Next()
|
||||
}
|
||||
}
|
@ -5,28 +5,28 @@ import (
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
|
||||
def "git.pbiernat.dev/egommerce/api-entities/http"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/service"
|
||||
"git.pbiernat.dev/egommerce/order-service/internal/app/ui"
|
||||
def "git.pbiernat.io/egommerce/api-entities/http"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/service"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/ui"
|
||||
)
|
||||
|
||||
func (s *Server) UpdateOrderStatusHandler(c *fiber.Ctx) error {
|
||||
reqID, _ := s.GetRequestID(c)
|
||||
req := new(def.UpdateOrderStatusRequest)
|
||||
if err := c.BodyParser(req); err != nil {
|
||||
return s.Error400(c, err.Error())
|
||||
return s.Error(c, 400, err.Error())
|
||||
}
|
||||
|
||||
// check if order exists in DB service...
|
||||
orderID := c.Params("orderId", "")
|
||||
if orderID == "" {
|
||||
return s.Error400(c, "")
|
||||
return s.Error(c, 400, "Empty orderId")
|
||||
}
|
||||
|
||||
orderSrv := service.NewOrderService(s.db, s.cache, s.ebCh, s.log)
|
||||
orderSrv := service.NewOrderService(s.GetDatabase(), s.GetCache(), s.GetEventBus(), s.GetLogger())
|
||||
_, err := ui.UpdateOrderStatus(orderSrv, req.Status, orderID, reqID)
|
||||
if err != nil {
|
||||
return s.Error400(c, "Failed to update order status")
|
||||
return s.Error(c, 400, "Failed to update order status")
|
||||
}
|
||||
|
||||
return c.SendStatus(http.StatusNoContent)
|
27
src/internal/server/router.go
Normal file
27
src/internal/server/router.go
Normal file
@ -0,0 +1,27 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultCORS = cors.New(cors.Config{
|
||||
AllowOrigins: "*",
|
||||
// AllowCredentials: true,
|
||||
AllowMethods: "GET, POST, PATCH, PUT, DELETE, OPTIONS",
|
||||
AllowHeaders: "Accept, Authorization, Content-Type, Vary, X-Request-Id",
|
||||
})
|
||||
)
|
||||
|
||||
func SetupRouter(s *Server) {
|
||||
s.Options("*", defaultCORS)
|
||||
s.Use(defaultCORS)
|
||||
|
||||
s.Get("/health", s.HealthHandler)
|
||||
s.Get("/config", s.ConfigHandler)
|
||||
|
||||
api := s.Group("/api")
|
||||
v1 := api.Group("/v1")
|
||||
order := v1.Group("/order")
|
||||
order.Put("/:orderId/status", s.UpdateOrderStatusHandler)
|
||||
}
|
136
src/internal/server/server.go
Normal file
136
src/internal/server/server.go
Normal file
@ -0,0 +1,136 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
"git.pbiernat.io/egommerce/api-entities/http"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/consul"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
)
|
||||
|
||||
type (
|
||||
Server struct {
|
||||
*fiber.App
|
||||
|
||||
ID string
|
||||
addr string // e.g. "127.0.0.1:80"
|
||||
handlers map[string]any
|
||||
}
|
||||
HeaderRequestID struct {
|
||||
RequestID string `reqHeader:"x-request-id"`
|
||||
}
|
||||
)
|
||||
|
||||
func New(c *Config) *Server {
|
||||
return &Server{
|
||||
ID: c.ID,
|
||||
App: fiber.New(fiber.Config{
|
||||
AppName: c.ID,
|
||||
ServerHeader: c.Name + ":" + c.ID,
|
||||
ReadTimeout: c.ReadTimeout * time.Millisecond,
|
||||
WriteTimeout: c.WriteTimeout * time.Millisecond,
|
||||
IdleTimeout: c.IdleTimeout * time.Millisecond,
|
||||
}),
|
||||
addr: c.NetAddr,
|
||||
handlers: make(map[string]any),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Start() error {
|
||||
SetupMiddleware(s)
|
||||
SetupRouter(s)
|
||||
|
||||
// fmt.Printf("Starting server at: %s...\n", s.addr)
|
||||
ln, _ := net.Listen("tcp", s.addr)
|
||||
// ln = tls.NewListener(ln, s.App.Server().TLSConfig)
|
||||
|
||||
return s.Listener(ln)
|
||||
}
|
||||
|
||||
func (s *Server) RegisterHandler(name string, fn func() any) {
|
||||
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
|
||||
s.handlers[name] = fn()
|
||||
}
|
||||
|
||||
func (s *Server) OnShutdown() {
|
||||
// s.GetLogger().Log("Server %s is going down...", s.ID)
|
||||
|
||||
s.GetRegistry().Unregister()
|
||||
// a.clearMetadataCache()
|
||||
s.GetEventBus().Close()
|
||||
s.GetDatabase().Close()
|
||||
s.GetLogger().Log("Gone.")
|
||||
s.GetLogger().Close()
|
||||
|
||||
s.Shutdown()
|
||||
}
|
||||
|
||||
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||
var hdr = new(HeaderRequestID)
|
||||
if err := c.ReqHeaderParser(hdr); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return hdr.RequestID, nil
|
||||
}
|
||||
|
||||
func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
|
||||
return c.Status(code).JSON(http.ErrorResponse{Error: msg})
|
||||
}
|
||||
|
||||
// Plugin helper funcitons
|
||||
func (s *Server) GetCache() *redis.Client {
|
||||
return (s.handlers["cache"]).(*redis.Client)
|
||||
}
|
||||
|
||||
func (s *Server) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
|
||||
return (s.handlers["database"]).(*pgxpool.Pool)
|
||||
}
|
||||
|
||||
func (s *Server) GetEventBus() *amqp.Channel {
|
||||
return (s.handlers["eventbus"]).(*amqp.Channel)
|
||||
}
|
||||
|
||||
func (s *Server) GetLogger() *fluentd.Logger {
|
||||
return (s.handlers["logger"]).(*fluentd.Logger)
|
||||
}
|
||||
|
||||
func (s *Server) GetRegistry() *consul.Service {
|
||||
return (s.handlers["registry"]).(*consul.Service)
|
||||
}
|
||||
|
||||
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
||||
// func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
|
||||
// go func() {
|
||||
// ticker := time.NewTicker(time.Second * 10)
|
||||
// for range ticker.C {
|
||||
// config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
|
||||
// if err != nil || config == nil {
|
||||
// return
|
||||
// }
|
||||
|
||||
// kvCnf := bytes.NewBuffer(config.Value)
|
||||
// decoder := json.NewDecoder(kvCnf)
|
||||
// if err := decoder.Decode(&s.Config); err != nil {
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
// }()
|
||||
// }
|
||||
|
||||
// func (s *Server) clearMetadataCache() {
|
||||
// ctx := context.Background()
|
||||
// key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
|
||||
|
||||
// s.Cache.LRem(ctx, key, 0, address)
|
||||
// }
|
||||
|
||||
// func (s *Server) getMetadataIPsKey() string {
|
||||
// return "internal__" + s.Base.Config.AppName + "__ips"
|
||||
// }
|
93
src/internal/service/order.go
Normal file
93
src/internal/service/order.go
Normal file
@ -0,0 +1,93 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.pbiernat.io/egommerce/api-entities/model"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/event"
|
||||
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/api"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
|
||||
|
||||
"github.com/georgysavva/scany/v2/pgxscan"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
const (
|
||||
SERVICE_USER_AGENT = "order-httpclient"
|
||||
)
|
||||
|
||||
type OrderService struct {
|
||||
dbConn *pgxpool.Pool
|
||||
redis *redis.Client
|
||||
ebCh *amqp.Channel
|
||||
log *fluentd.Logger
|
||||
}
|
||||
|
||||
func NewOrderService(dbConn *pgxpool.Pool, redis *redis.Client, chn *amqp.Channel, log *fluentd.Logger) *OrderService {
|
||||
return &OrderService{dbConn, redis, chn, log}
|
||||
}
|
||||
|
||||
func (s *OrderService) Log(format string, val ...any) {
|
||||
s.log.Log(format, val...)
|
||||
}
|
||||
|
||||
func (s *OrderService) GetOrder(ctx context.Context, id string) (*model.OrderModel, error) {
|
||||
order := new(model.OrderModel)
|
||||
err := pgxscan.Get(ctx, s.dbConn, order, `SELECT id, state, created_at, updated_at FROM ordering."order" WHERE id=$1`, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return order, nil
|
||||
}
|
||||
|
||||
func (s *OrderService) CreateOrder(ctx context.Context, basketID string) (*model.OrderModel, error) {
|
||||
basketAPI := api.NewBasketAPI(SERVICE_USER_AGENT, s.redis)
|
||||
basket, err := basketAPI.GetBasket(basketID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
order := new(model.OrderModel)
|
||||
order.ID = basketID
|
||||
order.State = basket.State // FIXME: are the same status?
|
||||
|
||||
sql := `INSERT INTO "ordering"."order"(id,state) VALUES($1,$2) RETURNING id`
|
||||
if err := s.dbConn.QueryRow(ctx, sql, order.ID, order.State).Scan(&order.ID); err != nil {
|
||||
return order, err
|
||||
}
|
||||
|
||||
items, err := basketAPI.GetBasketItems(basket.ID)
|
||||
if err != nil {
|
||||
return order, err
|
||||
}
|
||||
|
||||
for _, item := range items {
|
||||
sql := `INSERT INTO ordering.order_item(order_id,product_id,price,quantity) VALUES($1,$2,$3,$4)`
|
||||
if _, err := s.dbConn.Exec(ctx, sql, order.ID, item.ProductID, item.Price, item.Quantity); err != nil {
|
||||
return order, err
|
||||
}
|
||||
}
|
||||
|
||||
return order, nil
|
||||
}
|
||||
|
||||
// func (s *OrderService) UpdateOrder(ctx context.Context, orderID string, data any) (*model.OrderModel, error) {
|
||||
// }
|
||||
|
||||
func (s *OrderService) UpdateOrderStatus(reqID, orderID, status string) (string, error) {
|
||||
s.log.Log("Update order#%s status to %s", orderID, status)
|
||||
|
||||
msg := &event.StatusUpdateEvent{
|
||||
Event: event.NewEvent("UpdateOrderStatus", reqID),
|
||||
OrderID: orderID,
|
||||
Status: status,
|
||||
}
|
||||
rabbitmq.Publish(s.ebCh, "api-events", "order.email.statusUpdate", msg)
|
||||
|
||||
return orderID, nil
|
||||
}
|
37
src/internal/ui/order.go
Normal file
37
src/internal/ui/order.go
Normal file
@ -0,0 +1,37 @@
|
||||
package ui
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
def "git.pbiernat.io/egommerce/api-entities/http"
|
||||
"git.pbiernat.io/egommerce/api-entities/model"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/service"
|
||||
)
|
||||
|
||||
func CreateOrder(srv *service.OrderService, orderID, reqID string) (*model.OrderModel, error) { // FIXME: model.Order
|
||||
ctx := context.Background()
|
||||
order, _ := srv.GetOrder(ctx, orderID)
|
||||
if order != nil {
|
||||
srv.Log("order#%s already exists. %v", order.ID, order)
|
||||
return nil, fmt.Errorf("order#%s already exists", order.ID)
|
||||
}
|
||||
|
||||
order, err := srv.CreateOrder(ctx, orderID)
|
||||
if err != nil {
|
||||
srv.Log("Failed to create an Order: %s. :: %v\n", err, order)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return order, nil
|
||||
}
|
||||
|
||||
func UpdateOrderStatus(srv *service.OrderService, orderID, status, reqID string) (*def.UpdateOrderStatusResponse, error) {
|
||||
res := &def.UpdateOrderStatusResponse{}
|
||||
_, err := srv.UpdateOrderStatus(reqID, orderID, status)
|
||||
if err != nil {
|
||||
return res, err
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
54
src/internal/worker/command.go
Normal file
54
src/internal/worker/command.go
Normal file
@ -0,0 +1,54 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"git.pbiernat.io/egommerce/order-service/internal/service"
|
||||
)
|
||||
|
||||
var (
|
||||
CheckoutBasket = "event.CheckoutBasket"
|
||||
)
|
||||
|
||||
type Command interface {
|
||||
run(CommandData) (bool, any)
|
||||
}
|
||||
|
||||
type CommandData map[string]interface{}
|
||||
|
||||
type CommandRunner struct {
|
||||
cmd Command
|
||||
}
|
||||
|
||||
func NewCommandRunner(data map[string]interface{}, srvc *service.OrderService) *CommandRunner {
|
||||
rnr := &CommandRunner{}
|
||||
rnr.cmd = getCommand((data["command"]).(string), srvc)
|
||||
|
||||
return rnr
|
||||
}
|
||||
|
||||
func getCommand(cmd string, srvc *service.OrderService) Command {
|
||||
// fmt.Printf("getCommand: %v\n", cmd)
|
||||
var c Command
|
||||
|
||||
switch cmd { // FIXME
|
||||
case "CheckoutBasket":
|
||||
c = &CheckoutBasketCommand{srvc}
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (r *CommandRunner) run(data CommandData) (bool, any) {
|
||||
return r.cmd.run(data)
|
||||
}
|
||||
|
||||
type CheckoutBasketCommand struct {
|
||||
srvc *service.OrderService // FIXME: Remove service dep
|
||||
}
|
||||
|
||||
func (c *CheckoutBasketCommand) run(data CommandData) (bool, any) { // FIXME: Move run func to WorkerPool
|
||||
// reqID := data["request_id"].(string) // FIXME Check input params!
|
||||
// basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||
|
||||
// order, err := ui.CreateOrder(c.srvc, basketID, reqID)
|
||||
return true, nil //err == nil, basket
|
||||
}
|
85
src/internal/worker/config.go
Normal file
85
src/internal/worker/config.go
Normal file
@ -0,0 +1,85 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
|
||||
cnf "git.pbiernat.io/egommerce/go-api-pkg/config"
|
||||
)
|
||||
|
||||
const (
|
||||
defName = "order-worker"
|
||||
defDomain = "order-worker"
|
||||
defCacheAddr = "egommerce.local:6379"
|
||||
defCachePassword = "12345678"
|
||||
defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
|
||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
||||
defKVNmspc = "dev.egommerce/service/order-worker"
|
||||
defLoggerAddr = "api-logger:24224"
|
||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||
defEbEventsExchange = "api-events"
|
||||
defEbEventsQueue = "order-svc"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
ID string
|
||||
Name string
|
||||
|
||||
LoggerAddr string `json:"logger_addr"`
|
||||
DbURL string `json:"db_url"`
|
||||
CacheAddr string `json:"cache_addr"`
|
||||
CachePassword string `json:"cache_password"`
|
||||
MongoDbUrl string `json:"mongodb_url"`
|
||||
EventBusURL string `json:"eventbus_url"`
|
||||
EventBusExchange string `json:"eventbus_exchange"`
|
||||
EventBusQueue string `json:"eventbus_queue"`
|
||||
KVNamespace string
|
||||
}
|
||||
|
||||
func NewConfig(name string) *Config {
|
||||
c := new(Config)
|
||||
|
||||
c.ID, _ = os.Hostname()
|
||||
c.Name = name
|
||||
|
||||
c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr)
|
||||
c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword)
|
||||
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.EventBusExchange = defEbEventsExchange
|
||||
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *Config) GetAppFullName() string {
|
||||
return fmt.Sprintf("%s_%s", c.Name, c.ID)
|
||||
}
|
||||
|
||||
func (c *Config) GetIP() string {
|
||||
host, _ := os.Hostname()
|
||||
ips, _ := net.LookupIP(host)
|
||||
// for _, ip := range ips {
|
||||
// return ip.String()
|
||||
// }
|
||||
|
||||
return ips[0].String()
|
||||
}
|
||||
|
||||
func (c *Config) GetArray() map[string]string { // FIXME fix types etc
|
||||
arr := make(map[string]string)
|
||||
arr["id"] = c.ID
|
||||
arr["name"] = c.Name
|
||||
arr["appFullname"] = c.GetAppFullName()
|
||||
arr["cacheAddr"] = c.CacheAddr
|
||||
arr["cachePassword"] = c.CachePassword
|
||||
arr["dbURL"] = c.DbURL
|
||||
arr["eventBusExchange"] = c.EventBusExchange
|
||||
arr["eventBusURL"] = c.EventBusURL
|
||||
arr["kvNamespace"] = c.KVNamespace
|
||||
arr["loggerAddr"] = c.LoggerAddr
|
||||
|
||||
return arr
|
||||
}
|
133
src/internal/worker/ext.go
Normal file
133
src/internal/worker/ext.go
Normal file
@ -0,0 +1,133 @@
|
||||
package worker
|
||||
|
||||
// import (
|
||||
// "bytes"
|
||||
// "encoding/json"
|
||||
// "os"
|
||||
// "time"
|
||||
|
||||
// "git.pbiernat.io/egommerce/go-api-pkg/consul"
|
||||
// "git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
// "git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
|
||||
// cnf "git.pbiernat.io/egommerce/order-service/internal/config"
|
||||
// "git.pbiernat.io/egommerce/order-service/pkg/database"
|
||||
// "github.com/go-redis/redis/v8"
|
||||
// )
|
||||
|
||||
// func WithCache(c *cnf.Config) OptionFn {
|
||||
// return func(w *Worker) error {
|
||||
// conn := redis.NewClient(&redis.Options{
|
||||
// Addr: c.CacheAddr,
|
||||
// Password: c.CachePassword,
|
||||
// DB: 0,
|
||||
// })
|
||||
|
||||
// w.Cache = conn
|
||||
|
||||
// return nil
|
||||
// }
|
||||
// }
|
||||
|
||||
// func WithDatabase(c *cnf.Config) OptionFn {
|
||||
// return func(w *Worker) error {
|
||||
// conn, err := database.Connect(c.DbURL)
|
||||
// if err != nil {
|
||||
// w.Logger.Log("Failed to connect to Database server: %v\n", err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// w.Database = conn
|
||||
|
||||
// return nil
|
||||
// }
|
||||
// }
|
||||
|
||||
// func WithEventbus(c *cnf.Config) OptionFn {
|
||||
// return func(w *Worker) error {
|
||||
// _, chn, err := rabbitmq.Open(c.EventBusURL)
|
||||
// if err != nil {
|
||||
// w.Logger.Log("Failed to connect to EventBus server: %v\n", err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// err = rabbitmq.NewExchange(chn, c.EventBusExchange)
|
||||
// if err != nil {
|
||||
// w.Logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// _, err = chn.QueueDeclare(
|
||||
// c.EventBusQueue, // name
|
||||
// false, // durable
|
||||
// false, // delete when unused
|
||||
// false, // exclusive
|
||||
// false, // no-wait
|
||||
// nil, // arguments
|
||||
// )
|
||||
// if err != nil {
|
||||
// w.Logger.Log("Failed to declare EventBus queue: %v\n", err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// // w.bindQueues()
|
||||
// rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "basket.order.basketCheckout")
|
||||
|
||||
// w.Eventbus = chn
|
||||
|
||||
// return nil
|
||||
// }
|
||||
// }
|
||||
|
||||
// func WithLogger(c *cnf.Config) OptionFn {
|
||||
// return func(w *Worker) error {
|
||||
// logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
||||
// if err != nil {
|
||||
// w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
|
||||
// if err != nil {
|
||||
// w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// w.Logger = logger
|
||||
|
||||
// return nil
|
||||
// }
|
||||
// }
|
||||
|
||||
// func WithRegistry(c *cnf.Config) OptionFn {
|
||||
// return func(w *Worker) error {
|
||||
// registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
|
||||
// if err != nil {
|
||||
// w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
|
||||
// }
|
||||
|
||||
// w.Registry = registry
|
||||
|
||||
// go func(w *Worker) { // Fetch Consul KV config and store it in app config
|
||||
// ticker := time.NewTicker(time.Second * 15)
|
||||
// for range ticker.C {
|
||||
// fetchKVConfig(w) // FIXME: duplicated in server
|
||||
// }
|
||||
// }(w)
|
||||
|
||||
// return nil
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go
|
||||
// config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil)
|
||||
// if err != nil || config == nil {
|
||||
// return
|
||||
// }
|
||||
|
||||
// kvCnf := bytes.NewBuffer(config.Value)
|
||||
// decoder := json.NewDecoder(kvCnf)
|
||||
// if err := decoder.Decode(&w.Config); err != nil {
|
||||
// return
|
||||
// }
|
||||
// }
|
206
src/internal/worker/worker.go
Normal file
206
src/internal/worker/worker.go
Normal file
@ -0,0 +1,206 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
||||
"git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
|
||||
|
||||
"git.pbiernat.io/egommerce/order-service/internal/event"
|
||||
"git.pbiernat.io/egommerce/order-service/internal/service"
|
||||
)
|
||||
|
||||
type (
|
||||
Worker struct {
|
||||
ID string
|
||||
cnf *Config
|
||||
handlers map[string]any
|
||||
services map[string]any
|
||||
doWrkUntil chan struct{}
|
||||
}
|
||||
)
|
||||
|
||||
func New(c *Config) *Worker {
|
||||
return &Worker{
|
||||
ID: c.ID,
|
||||
cnf: c,
|
||||
handlers: make(map[string]any),
|
||||
services: make(map[string]any),
|
||||
doWrkUntil: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Start() error {
|
||||
// Init
|
||||
err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange)
|
||||
if err != nil {
|
||||
w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err)
|
||||
fmt.Printf("Failed to declare EventBus exchange: %v\n", err)
|
||||
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
_, err = w.GetEventBus().QueueDeclare(
|
||||
w.cnf.EventBusQueue, // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
if err != nil {
|
||||
w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err)
|
||||
fmt.Printf("Failed to declare EventBus queue: %v\n", err)
|
||||
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// // w.bindQueues()
|
||||
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "basket.order.basketCheckout")
|
||||
|
||||
err = w.doWork(w.doWrkUntil)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err)
|
||||
close(w.doWrkUntil)
|
||||
}
|
||||
<-w.doWrkUntil
|
||||
|
||||
return err
|
||||
|
||||
// go func() {
|
||||
// sigint := make(chan os.Signal, 1)
|
||||
// signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
// <-sigint
|
||||
|
||||
// w.Shutdown()
|
||||
// close(while)
|
||||
// }()
|
||||
|
||||
// run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker)
|
||||
// defer w.removeRunFile(run)
|
||||
|
||||
// w.Logger.Log("Waiting for messages...")
|
||||
|
||||
// return nil
|
||||
}
|
||||
|
||||
func (w *Worker) RegisterHandler(name string, fn func() any) {
|
||||
// fmt.Printf("Registering plugin( with handler): %s... OK\n", name)
|
||||
w.handlers[name] = fn()
|
||||
}
|
||||
|
||||
func (w *Worker) OnShutdown() {
|
||||
w.GetLogger().Log("Worker %s is going down...", w.ID)
|
||||
// fmt.Printf("Worker %s is going down...\n", w.ID)
|
||||
w.GetEventBus().Close()
|
||||
w.GetDatabase().Close()
|
||||
w.GetLogger().Log("Gone.")
|
||||
w.GetLogger().Close()
|
||||
|
||||
close(w.doWrkUntil)
|
||||
}
|
||||
|
||||
// Plugin helper funcitons
|
||||
func (w *Worker) GetCache() *redis.Client {
|
||||
return (w.handlers["cache"]).(*redis.Client)
|
||||
}
|
||||
|
||||
func (w *Worker) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue
|
||||
return (w.handlers["database"]).(*pgxpool.Pool)
|
||||
}
|
||||
|
||||
func (w *Worker) GetEventBus() *amqp.Channel {
|
||||
return (w.handlers["eventbus"]).(*amqp.Channel)
|
||||
}
|
||||
|
||||
func (w *Worker) GetLogger() *fluentd.Logger {
|
||||
return (w.handlers["logger"]).(*fluentd.Logger)
|
||||
}
|
||||
|
||||
func (w *Worker) doWork(while chan struct{}) error {
|
||||
w.services["order"] =
|
||||
service.NewOrderService(w.GetDatabase(), w.GetCache(), w.GetEventBus(), w.GetLogger())
|
||||
|
||||
oSrv := (w.services["order"]).(*service.OrderService)
|
||||
|
||||
msgs, err := w.GetEventBus().Consume(
|
||||
w.cnf.EventBusQueue, // queue
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
w.GetLogger().Log("Failed to register a consumer: %s", err)
|
||||
os.Exit(1)
|
||||
//close(while)
|
||||
}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
// go func(d amqp.Delivery) {
|
||||
w.processMsg(oSrv, d)
|
||||
// }(d)
|
||||
}
|
||||
}()
|
||||
<-while
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Worker) processMsg(srvc *service.OrderService, m amqp.Delivery) {
|
||||
msg, err := rabbitmq.Deserialize(m.Body)
|
||||
if err != nil {
|
||||
w.GetLogger().Log("Deserialization error: %v\n", err)
|
||||
m.Reject(false)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
name := fmt.Sprintf("%s", msg["event"])
|
||||
data := (msg["data"]).(map[string]interface{})
|
||||
// reqID := (data["request_id"]).(string) // FIXME Check input params!
|
||||
|
||||
w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data)
|
||||
|
||||
var ok = false
|
||||
switch true { // Refactor -> use case for polymorphism
|
||||
case strings.Contains(name, event.EVENT_BASKET_CHECKOUT):
|
||||
w.GetLogger().Log("Event: %s", event.EVENT_BASKET_CHECKOUT)
|
||||
}
|
||||
|
||||
rnr := NewCommandRunner(data, srvc)
|
||||
|
||||
// case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
||||
// basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||
// productID := data["product_id"] // FIXME Check input params!
|
||||
|
||||
// rnr.cmd = &AddProductToBasketCommand{srvc}
|
||||
// w.Logger.Log("Adding product #%d to basket #%s. ReqID: #%s", productID, basketID, reqID)
|
||||
// case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
||||
// basketID := data["basket_id"].(string)
|
||||
// productID := data["product_id"].(float64)
|
||||
|
||||
// rnr.cmd = &RemoveProductFromBasketCommand{srvc}
|
||||
// w.Logger.Log("Removing product #%d from basket #%s. ReqID: #%s", productID, basketID, reqID)
|
||||
// }
|
||||
|
||||
ok, _ = rnr.run(data)
|
||||
if ok {
|
||||
w.GetLogger().Log("Successful executed message \"%s\"\n", name)
|
||||
m.Ack(false)
|
||||
return
|
||||
}
|
||||
|
||||
w.GetLogger().Log("Error processing \"%s\": %s (%v)", name, err.Error(), err)
|
||||
m.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...?
|
||||
}
|
@ -3,14 +3,14 @@ package database
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
func Connect(connStr string) (*pgxpool.Pool, error) {
|
||||
conn, err := pgxpool.Connect(context.Background(), connStr)
|
||||
pool, err := pgxpool.New(context.Background(), connStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
return pool, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user