Refactor & fixes

This commit is contained in:
Piotr Biernat 2024-04-17 19:13:59 +02:00
parent 9c40900d47
commit b3a25dee1c
30 changed files with 1102 additions and 406 deletions

View File

@ -12,5 +12,3 @@ CACHE_ADDR=api-cache:6379
CACHE_PASSWORD=12345678 CACHE_PASSWORD=12345678
MONGODB_URL=mongodb://mongodb:12345678@mongo-db:27017 MONGODB_URL=mongodb://mongodb:12345678@mongo-db:27017
EVENTBUS_URL=amqp://guest:guest@api-eventbus:5672 EVENTBUS_URL=amqp://guest:guest@api-eventbus:5672

View File

@ -5,6 +5,7 @@ ARG BIN_OUTPUT=/go/bin
ARG GO_SERVER=cmd/server/main.go ARG GO_SERVER=cmd/server/main.go
ARG GO_MIGRATE=cmd/migrate/main.go ARG GO_MIGRATE=cmd/migrate/main.go
ARG GO_WORKER=cmd/worker/main.go ARG GO_WORKER=cmd/worker/main.go
ARG GO_HEALTH=cmd/health/main.go
WORKDIR /go/src/app WORKDIR /go/src/app
COPY src ./ COPY src ./
@ -12,4 +13,5 @@ COPY src ./
RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \ RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \ go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/migrate" $GO_MIGRATE && \ go build -ldflags="-w -s" -o "$BIN_OUTPUT/migrate" $GO_MIGRATE && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER && \
go build -ldflags="-w -s" -o "$BIN_OUTPUT/health" $GO_HEALTH

View File

@ -19,12 +19,17 @@ LABEL dev.egommerce.image.build_time=${BUILD_TIME}
WORKDIR / WORKDIR /
COPY --from=builder $BIN_OUTPUT /app COPY --from=builder $BIN_OUTPUT /app
COPY --from=builder /go/bin/migrate /bin/go_migrate COPY --from=builder /go/bin/migrate /bin/migrate
COPY .env.dist /.env COPY --from=builder /go/bin/health /bin/health
COPY .env.docker /.env
COPY ./bin /bin COPY ./bin /bin
RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh
RUN apk add curl
EXPOSE 80 EXPOSE 80
CMD ["sh", "-c", "/app"]
ENTRYPOINT ["entrypoint.sh"] ENTRYPOINT ["entrypoint.sh"]
CMD ["sh", "-c", "/app"]
HEALTHCHECK --interval=5s --timeout=1s --retries=20 CMD health >/dev/null || exit 1

View File

@ -14,15 +14,21 @@ waitForService()
done done
} }
waitForService "postgres-db:5432" update-resolv # provided by stack - better approach - single copy
waitForService "api-eventbus:5672" update-ca-certificates
waitForService "api-logger:24224"
waitForService "api-registry:8500" waitForService "api-registry:8500"
waitForService "postgres-db:5432"
# waitForService "postgres-db.service.ego.io:5432"
waitForService "api-eventbus:5672"
# waitForService "esb.service.ego.io:5672"
waitForService "api-logger:24224"
# waitForService "logger.service.ego.io:24224"
waitForService "pricing-svc:80" waitForService "pricing-svc:80"
# waitForService "pricing.service.ego.io:80"
# run migrations # run migrations
migrate.sh migrate.sh
# set -euo pipefail # set -euo pipefail
exec "$@" exec "$@"

View File

@ -1,17 +1,17 @@
#!/usr/bin/env sh #!/usr/bin/env sh
# ensure migrate env is initialized # ensure migrate env is initialized
$(go_migrate version >/dev/null 2>&1) $(migrate version >/dev/null 2>&1)
version=$? version=$?
if [ $version != "0" ] if [ $version != "0" ]
then then
echo "Creating base table..." echo "Creating base table..."
$(go_migrate init >/dev/null 2>&1) $(migrate init >/dev/null 2>&1)
init=$? init=$?
fi fi
# check again # check again
$(go_migrate version >/dev/null 2>&1) $(migrate version >/dev/null 2>&1)
version=$? version=$?
if [ $version != "0" ] if [ $version != "0" ]
then then
@ -20,7 +20,6 @@ then
fi fi
# run migrations # run migrations
go_migrate up migrate up
echo "Done."
exit $version exit $version

View File

@ -1,5 +1,7 @@
#!/usr/bin/env sh #!/usr/bin/env sh
# Use this script to test if a given TCP host/port are available
# https://github.com/vishnubob/wait-for-it/blob/master/wait-for-it.sh
# Use this script to test if a given TCP host/port are available
set -e set -e

View File

@ -11,3 +11,7 @@ echo $DOCKER_PASSWORD | docker login git.pbiernat.dev -u $DOCKER_USERNAME --pass
docker push "$SERVER_IMAGE:$TARGET" docker push "$SERVER_IMAGE:$TARGET"
docker push "$WORKER_IMAGE:$TARGET" docker push "$WORKER_IMAGE:$TARGET"
# Restart container
curl -X POST http://127.0.0.1:9001/api/webhooks/70892c11-b14e-4fb5-b813-ed0daade00d8
curl -X POST http://127.0.0.1:9001/api/webhooks/309e753d-641d-48ea-9256-a267f4f8b5b4

1
src/.gitignore vendored
View File

@ -13,5 +13,6 @@
*.out *.out
.env .env
# Dependency directories (remove the comment below to include it) # Dependency directories (remove the comment below to include it)
vendor/ vendor/

1
src/app.run Normal file
View File

@ -0,0 +1 @@
954751

39
src/cmd/health/main.go Normal file
View File

@ -0,0 +1,39 @@
package main
import (
"flag"
"fmt"
"os"
)
const usageText = `This program runs healthcheck on the app.
Usage:
go run cmd/health/main.go
`
func init() {
flag.Usage = func() {
fmt.Print(usageText)
flag.PrintDefaults()
os.Exit(2)
}
flag.Parse()
}
func main() {
var exitCode = 1
if isOk := healthCheck(); isOk {
exitCode = 0
}
os.Exit(exitCode)
}
func healthCheck() bool {
run, err := os.Open("/app.run")
if err != nil {
return false
}
defer run.Close()
return true
}

View File

@ -11,9 +11,8 @@ import (
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
baseCnf "git.pbiernat.dev/egommerce/basket-service/pkg/config"
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
baseCnf "git.pbiernat.dev/egommerce/go-api-pkg/config"
) )
const ( const (
@ -36,6 +35,13 @@ Usage:
` `
func main() { func main() {
flag.Usage = func() {
fmt.Print(usageText)
flag.PrintDefaults()
os.Exit(2)
}
flag.Parse()
if baseCnf.ErrLoadingEnvs != nil { if baseCnf.ErrLoadingEnvs != nil {
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
} }
@ -53,10 +59,7 @@ func main() {
if err != nil { if err != nil {
log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err) log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err)
} }
// defer logger.Close() defer logger.Close()
flag.Usage = usage
flag.Parse()
db := pg.Connect(&pg.Options{ // FIXME db := pg.Connect(&pg.Options{ // FIXME
Addr: "postgres-db:5432", Addr: "postgres-db:5432",
@ -64,6 +67,7 @@ func main() {
Password: "12345678", Password: "12345678",
Database: "egommerce", Database: "egommerce",
}) })
defer db.Close()
mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName) mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName)
mig := migrations.NewCollection() mig := migrations.NewCollection()
@ -82,10 +86,5 @@ func main() {
} else { } else {
logger.Log("version is %d\n", oldVersion) logger.Log("version is %d\n", oldVersion)
} }
} // os.Exit(0)
func usage() {
fmt.Print(usageText)
flag.PrintDefaults()
os.Exit(2)
} }

View File

@ -4,7 +4,7 @@ import (
"log" "log"
"os" "os"
baseCnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" baseCnf "git.pbiernat.dev/egommerce/go-api-pkg/config"
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
svr "git.pbiernat.dev/egommerce/basket-service/internal/server" svr "git.pbiernat.dev/egommerce/basket-service/internal/server"
@ -15,7 +15,7 @@ func main() {
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
} }
c := cnf.NewConfig("basket-server") c := cnf.NewConfig("basket")
srv := svr.New( srv := svr.New(
c, c,
svr.WithCache(c), svr.WithCache(c),
@ -26,7 +26,7 @@ func main() {
) )
while := make(chan struct{}) while := make(chan struct{})
err := srv.Base.Start(while, srv.Shutdown()) err := srv.Base.Start(while)
<-while <-while
if err != nil { if err != nil {

View File

@ -2,8 +2,9 @@ package main
import ( import (
"log" "log"
"os"
"git.pbiernat.dev/egommerce/basket-service/pkg/config" "git.pbiernat.dev/egommerce/go-api-pkg/config"
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
"git.pbiernat.dev/egommerce/basket-service/internal/worker" "git.pbiernat.dev/egommerce/basket-service/internal/worker"
@ -25,6 +26,13 @@ func main() {
) )
while := make(chan struct{}) while := make(chan struct{})
wrk.Start(while) err := wrk.Start(while)
<-while <-while
if err != nil {
log.Fatalf("Failed to start worker. Reason: %v\n", err)
os.Exit(1)
}
os.Exit(0)
} }

View File

@ -4,48 +4,95 @@ go 1.18
require ( require (
git.pbiernat.dev/egommerce/api-entities v0.0.26 git.pbiernat.dev/egommerce/api-entities v0.0.26
git.pbiernat.dev/egommerce/go-api-pkg v0.0.153 git.pbiernat.dev/egommerce/go-api-pkg v0.1.66
github.com/georgysavva/scany/v2 v2.0.0 github.com/georgysavva/scany/v2 v2.0.0
github.com/go-pg/migrations/v8 v8.1.0 github.com/go-pg/migrations/v8 v8.1.0
github.com/go-pg/pg/v10 v10.10.7 github.com/go-pg/pg/v10 v10.10.7
github.com/go-redis/redis/v8 v8.11.5 github.com/go-redis/redis/v8 v8.11.5
github.com/gofiber/fiber/v2 v2.40.1 github.com/gofiber/fiber/v2 v2.40.1
github.com/jackc/pgx/v5 v5.1.1 github.com/jackc/pgx/v5 v5.1.1
github.com/joho/godotenv v1.4.0
github.com/streadway/amqp v1.0.0 github.com/streadway/amqp v1.0.0
) )
require ( require (
github.com/DataDog/datadog-go v3.2.0+incompatible // indirect
github.com/andybalholm/brotli v1.0.4 // indirect github.com/andybalholm/brotli v1.0.4 // indirect
github.com/armon/go-metrics v0.4.1 // indirect github.com/armon/go-metrics v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/armon/go-radix v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.42.34 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible // indirect
github.com/circonus-labs/circonusllhist v0.1.3 // indirect
github.com/cncf/xds/go v0.0.0-20230310173818-32f1caf87195 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.13.0 // indirect github.com/envoyproxy/go-control-plane v0.11.0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.10.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/fluent/fluent-logger-golang v1.9.0 // indirect github.com/fluent/fluent-logger-golang v1.9.0 // indirect
github.com/go-pg/zerochecker v0.2.0 // indirect github.com/go-pg/zerochecker v0.2.0 // indirect
github.com/hashicorp/consul/api v1.18.0 // indirect github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/hashicorp/consul v1.16.0 // indirect
github.com/hashicorp/consul-net-rpc v0.0.0-20221205195236-156cfab66a69 // indirect
github.com/hashicorp/consul/api v1.22.0 // indirect
github.com/hashicorp/consul/envoyextensions v0.3.0 // indirect
github.com/hashicorp/consul/sdk v0.14.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-bexpr v0.1.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.3.1 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.6.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-syslog v1.0.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/go-version v1.2.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/memberlist v0.5.0 // indirect
github.com/hashicorp/raft v1.5.0 // indirect
github.com/hashicorp/raft-autopilot v0.1.6 // indirect
github.com/hashicorp/serf v0.10.1 // indirect github.com/hashicorp/serf v0.10.1 // indirect
github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.13.0 // indirect github.com/jackc/pgtype v1.13.0 // indirect
github.com/jackc/puddle/v2 v2.1.2 // indirect github.com/jackc/puddle/v2 v2.1.2 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/klauspost/compress v1.15.9 // indirect github.com/klauspost/compress v1.15.9 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.41 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
github.com/mitchellh/hashstructure v0.0.0-20170609045927-2bca23e0e452 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/philhofer/fwd v1.1.1 // indirect github.com/philhofer/fwd v1.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.3 // indirect
github.com/tinylib/msgp v1.1.6 // indirect github.com/tinylib/msgp v1.1.6 // indirect
github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc // indirect
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.41.0 // indirect github.com/valyala/fasthttp v1.41.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect
@ -54,9 +101,15 @@ require (
github.com/vmihailenco/tagparser v0.1.2 // indirect github.com/vmihailenco/tagparser v0.1.2 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.uber.org/atomic v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect
golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 // indirect golang.org/x/crypto v0.1.0 // indirect
golang.org/x/sync v0.0.0-20220923202941-7f9b1623fab7 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/sys v0.2.0 // indirect golang.org/x/net v0.10.0 // indirect
golang.org/x/text v0.3.8 // indirect golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
mellium.im/sasl v0.2.1 // indirect mellium.im/sasl v0.2.1 // indirect
) )

File diff suppressed because it is too large Load Diff

View File

@ -3,17 +3,19 @@ package common
import ( import (
"os" "os"
cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config"
srv "git.pbiernat.dev/egommerce/basket-service/pkg/server" srv "git.pbiernat.dev/egommerce/basket-service/pkg/server"
cnf "git.pbiernat.dev/egommerce/go-api-pkg/config"
) )
const ( const (
// defAppDomain = "basket-svc" // defAppDomain = "basket-svc"
// defEventBusURL = "amqp://guest:guest@esb.service.ego.io:5672"
defAppName = "basket-svc" defAppName = "basket-svc"
defAppDomain = "basket-svc"
defCacheAddr = "api-cache:6379" defCacheAddr = "api-cache:6379"
defCachePassword = "12345678" defCachePassword = "12345678"
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce"
defEventBusURL = "amqp://guest:guest@api-gateway:5672" defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
defKVNmspc = "dev.egommerce/service/basket" defKVNmspc = "dev.egommerce/service/basket"
defLoggerAddr = "api-logger:24224" defLoggerAddr = "api-logger:24224"
defNetAddr = ":80" defNetAddr = ":80"
@ -27,13 +29,14 @@ const (
type Config struct { type Config struct {
Base *srv.Config Base *srv.Config
LoggerAddr string `json:"logger_addr"`
DbURL string `json:"db_url"` DbURL string `json:"db_url"`
CacheAddr string `json:"cache_addr"` CacheAddr string `json:"cache_addr"`
CachePassword string `json:"cache_password"` CachePassword string `json:"cache_password"`
MongoDbUrl string `json:"mongodb_url"`
EventBusURL string `json:"eventbus_url"`
EventBusExchange string `json:"eventbus_exchange"` EventBusExchange string `json:"eventbus_exchange"`
EventBusQueue string `json:"eventbus_queue"` EventBusQueue string `json:"eventbus_queue"`
EventBusURL string `json:"eventbus_url"`
LoggerAddr string `json:"logger_addr"`
KVNamespace string KVNamespace string
RegistryAddr string RegistryAddr string
@ -46,6 +49,7 @@ func NewConfig(name string) *Config {
c.Base.AppID, _ = os.Hostname() c.Base.AppID, _ = os.Hostname()
c.Base.AppName = name c.Base.AppName = name
c.Base.AppDomain = cnf.GetEnv("APP_DOMAIN", defAppDomain)
c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr) c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix) c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)

View File

@ -1,11 +1,13 @@
package event package event
type Event struct { type Event struct {
Command string `json:"command"`
RequestID string `json:"request_id"` RequestID string `json:"request_id"`
} }
func NewEvent(reqID string) *Event { func NewEvent(command, reqID string) *Event {
em := new(Event) em := new(Event)
em.Command = command
em.RequestID = reqID em.RequestID = reqID
return em return em

View File

@ -0,0 +1,10 @@
package server
// REFACTOR: UNIVERSAL SERVER CODE
import (
"github.com/gofiber/fiber/v2"
)
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
return c.JSON(s.Config)
}

View File

@ -2,30 +2,12 @@ package server
// REFACTOR: UNIVERSAL SERVER CODE // REFACTOR: UNIVERSAL SERVER CODE
import ( import (
def "git.pbiernat.dev/egommerce/api-entities/http"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"git.pbiernat.dev/egommerce/api-entities/http"
) )
func (s *Server) HealthHandler(c *fiber.Ctx) error { func (s *Server) HealthHandler(c *fiber.Ctx) error {
// ctx := context.Background() return c.JSON(&def.HealthResponse{
if s.Cache.Ping(c.Context()).Val() != "PONG" {
// TODO: log cache connection error...
return fiber.ErrBadRequest
}
if s.Database.Ping(c.Context()) != nil {
// TODO: log database connection error...
return fiber.ErrBadRequest
}
// TODO add rest of services checks
return c.JSON(&http.HealthResponse{
Status: "OK", Status: "OK",
}) })
} }
func (s *Server) ConfigHandler(c *fiber.Ctx) error {
return c.JSON(s.Config)
}

View File

@ -12,7 +12,6 @@ import (
// "github.com/gofiber/fiber/v2/middleware/cors" // "github.com/gofiber/fiber/v2/middleware/cors"
func SetupMiddleware(s *Server) { func SetupMiddleware(s *Server) {
s.Base.Use(defaultCORS)
s.Base.Use(LoggingMiddleware(s.Logger)) s.Base.Use(LoggingMiddleware(s.Logger))
} }

View File

@ -16,6 +16,7 @@ var (
func SetupRouter(s *Server) { func SetupRouter(s *Server) {
s.Base.Options("*", defaultCORS) s.Base.Options("*", defaultCORS)
s.Base.Use(defaultCORS)
s.Base.Get("/health", s.HealthHandler) s.Base.Get("/health", s.HealthHandler)
s.Base.Get("/config", s.ConfigHandler) s.Base.Get("/config", s.ConfigHandler)
@ -26,12 +27,3 @@ func SetupRouter(s *Server) {
v1.Get("/basket/:basketId/items", s.GetBasketItemsHandler) v1.Get("/basket/:basketId/items", s.GetBasketItemsHandler)
v1.Post("/checkout", s.CheckoutHandler) v1.Post("/checkout", s.CheckoutHandler)
} }
// func CORSPreflightMiddleware(c *fiber.Ctx) error {
// if string(c.Request().Header.Method()) == http.MethodOptions {
// c.Response().SetStatusCode(http.StatusOK)
// c.Next()
// }
// return c.Next()
// }

View File

@ -2,14 +2,13 @@ package server
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"log" "log"
"os" "os"
"strconv" "strconv"
"time" "time"
"github.com/go-redis/redis/v8" redis "github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/streadway/amqp" "github.com/streadway/amqp"
@ -17,29 +16,27 @@ import (
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd" "git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
db "git.pbiernat.dev/egommerce/basket-service/pkg/database" db "git.pbiernat.dev/egommerce/basket-service/pkg/database"
srv "git.pbiernat.dev/egommerce/basket-service/pkg/server" p "git.pbiernat.dev/egommerce/basket-service/pkg/server"
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
) )
type ( type (
Server struct { Server struct {
Base *srv.Server Base *p.Server
Config *cnf.Config Config *cnf.Config
Cache *redis.Client Cache *redis.Client
Database *pgxpool.Pool Database *pgxpool.Pool
Eventbus *amqp.Channel Eventbus *amqp.Channel
Logger *fluentd.Logger Logger *fluentd.Logger
Registry *consul.Service Registry *consul.Service
} }
OptionFn func(*Server) error // FIXME: similar in worker OptionFn func(*Server) error // FIXME: similar in worker
) )
func New(c *cnf.Config, opts ...OptionFn) *Server { func New(c *cnf.Config, opts ...OptionFn) *Server {
svr := &Server{ svr := &Server{
Base: srv.New(c.Base), Base: p.New(c.Base),
Config: c, Config: c,
} }
@ -49,21 +46,66 @@ func New(c *cnf.Config, opts ...OptionFn) *Server {
} }
} }
svr.Base.ShutdownFn = svr.Shutdown()
SetupMiddleware(svr) SetupMiddleware(svr)
SetupRouter(svr) SetupRouter(svr)
return svr return svr
} }
func WithCache(c *cnf.Config) OptionFn { func (s *Server) Shutdown() p.ShutdownFn {
redis := redis.NewClient(&redis.Options{ return func() error {
Addr: c.CacheAddr, s.Logger.Log("Server %s is going down...", s.Base.AppID)
Password: c.CachePassword,
DB: 0,
})
s.Registry.Unregister()
// s.clearMetadataCache()
s.Eventbus.Close()
s.Database.Close()
s.Logger.Log("Gone.")
s.Logger.Close()
return s.Base.Shutdown()
}
}
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
go func() {
ticker := time.NewTicker(time.Second * 10)
for range ticker.C {
config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
if err != nil || config == nil {
return
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&s.Config); err != nil {
return
}
}
}()
}
// func (s *Server) clearMetadataCache() {
// ctx := context.Background()
// key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
// s.Cache.LRem(ctx, key, 0, address)
// }
// func (s *Server) getMetadataIPsKey() string {
// return "internal__" + s.Base.Config.AppName + "__ips"
// }
func WithCache(c *cnf.Config) OptionFn {
return func(s *Server) error { return func(s *Server) error {
s.Cache = redis s.Cache = redis.NewClient(&redis.Options{
Addr: s.Config.CacheAddr,
Password: s.Config.CachePassword,
DB: 0,
})
return nil return nil
} }
@ -84,17 +126,17 @@ func WithDatabase(c *cnf.Config) OptionFn {
} }
func WithEventbus(c *cnf.Config) OptionFn { func WithEventbus(c *cnf.Config) OptionFn {
conn, err := amqp.Dial(c.EventBusURL)
if err != nil {
log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", c.EventBusURL, err)
}
chn, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err)
}
return func(s *Server) error { return func(s *Server) error {
conn, err := amqp.Dial(s.Config.EventBusURL)
if err != nil {
log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", s.Config.EventBusURL, err)
}
chn, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err)
}
s.Eventbus = chn s.Eventbus = chn
return nil return nil
@ -122,7 +164,8 @@ func WithLogger(c *cnf.Config) OptionFn {
func WithRegistry(c *cnf.Config) OptionFn { func WithRegistry(c *cnf.Config) OptionFn {
return func(s *Server) error { return func(s *Server) error {
port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port) log.Printf("Consul retrieved port: %v", port)
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.GetIP(), c.Base.AppDomain, c.Base.PathPrefix, port)
if err != nil { if err != nil {
log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err) log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err)
} }
@ -132,74 +175,31 @@ func WithRegistry(c *cnf.Config) OptionFn {
log.Fatalf("Failed to register in the Consul service. Err: %v", err) log.Fatalf("Failed to register in the Consul service. Err: %v", err)
} }
registry.RegisterHealthChecks()
s.registerKVUpdater()
s.Registry = registry s.Registry = registry
go func() { // Consul KV updater // svc, _ := registry.Connect()
ticker := time.NewTicker(time.Second * 15) // tlsCnf := svc.ServerTLSConfig()
for range ticker.C { // s.Base.App.Server().TLSConfig = tlsCnf
fetchKVConfig(s) // FIXME: duplicated in worker // fmt.Println("Podmiana configa TLS")
} // defer svc.Close()
}()
go func() { // Server metadata cache updater // go func() { // Consul KV updater
ticker := time.NewTicker(time.Second * 5) // ticker := time.NewTicker(time.Second * 15)
for range ticker.C { // for range ticker.C {
s.cacheMetadata() // fetchKVConfig(s) // FIXME: duplicated in worker
} // }
}() // }()
// go func() { // Server metadata cache updater
// ticker := time.NewTicker(time.Second * 5)
// for range ticker.C {
// s.cacheMetadata()
// }
// }()
return nil return nil
} }
} }
func (s *Server) Shutdown() srv.PurgeFn {
return func(srv *srv.Server) error {
s.Logger.Log("Server %s is going down...", s.Base.AppID)
s.Registry.Unregister()
s.clearMetadataCache()
s.Eventbus.Close()
s.Database.Close()
s.Logger.Log("Gone.")
s.Logger.Close()
return s.Base.Shutdown()
}
}
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
func fetchKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
if err != nil || config == nil {
return
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&s.Config); err != nil {
return
}
}
func (s *Server) cacheMetadata() {
ctx := context.Background()
key, address := s.getMetadataIPsKey(), s.Base.Config.AppID
pos := s.Cache.LPos(ctx, key, address, redis.LPosArgs{}).Val()
if pos >= 0 {
s.Cache.LRem(ctx, key, 0, address)
}
s.Cache.LPush(ctx, key, address).Err()
}
func (s *Server) clearMetadataCache() {
ctx := context.Background()
key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
s.Cache.LRem(ctx, key, 0, address)
}
func (s *Server) getMetadataIPsKey() string {
return "internal__" + s.Base.Config.AppName + "__ips"
}

View File

@ -2,6 +2,7 @@ package service
import ( import (
"context" "context"
"time"
"github.com/georgysavva/scany/v2/pgxscan" "github.com/georgysavva/scany/v2/pgxscan"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
@ -114,27 +115,29 @@ func (s *BasketService) RemoveItem(ctx context.Context, itemID int, basketID str
func (s *BasketService) UpdateItem(ctx context.Context, item *model.BasketItemModel, qty int) error { func (s *BasketService) UpdateItem(ctx context.Context, item *model.BasketItemModel, qty int) error {
var price float64 = 0 var price float64 = 0
pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis) pricingAPI := api.NewPricingAPI(ServiceUserAgent, s.redis) // FIXME
productPrice, err := pricingAPI.GetProductPrice(item.ProductID) productPrice, err := pricingAPI.GetProductPrice(item.ProductID)
if err == nil { if err == nil {
price = productPrice.Price price = productPrice.Price
} }
now := time.Now()
sql := `UPDATE basket.basket_item SET quantity=$1, price=$2 WHERE basket_id=$3 AND product_id=$4` sql := `UPDATE basket.basket_item SET quantity=$1, price=$2, updated_at=$3 WHERE basket_id=$4 AND product_id=$5`
if _, err := s.dbConn.Exec(ctx, sql, qty, price, item.BasketID, item.ProductID); err != nil { if _, err := s.dbConn.Exec(ctx, sql, qty, price, now, item.BasketID, item.ProductID); err != nil {
return err return err
} }
// update basket updated_at field...
return nil return nil
} }
func (s *BasketService) Checkout(reqID, basketID string) (string, error) { func (s *BasketService) CheckoutBasket(reqID, basketID string) (string, error) {
s.log.Log("Creating initial order from basket#:%s", basketID) s.log.Log("Checkout basket#:%s", basketID)
msg := &event.BasketCheckoutEvent{Event: event.NewEvent(reqID), BasketID: basketID} // FIXME: send more info... msg := &event.BasketCheckoutEvent{
Event: event.NewEvent("CheckoutBasket", reqID),
BasketID: basketID,
} // FIXME: send more info...
rabbitmq.Publish(s.ebCh, "api-events", "basket.order.basketCheckout", msg) rabbitmq.Publish(s.ebCh, "api-events", "basket.order.basketCheckout", msg)
return basketID, nil return basketID, nil

View File

@ -78,7 +78,7 @@ func RemoveProductFromBasket(srv *service.BasketService, productID, qty int, bas
func CheckoutBasket(srv *service.BasketService, basketID, reqID string) (*entity.BasketCheckoutResponse, error) { func CheckoutBasket(srv *service.BasketService, basketID, reqID string) (*entity.BasketCheckoutResponse, error) {
res := &entity.BasketCheckoutResponse{} res := &entity.BasketCheckoutResponse{}
basketID, err := srv.Checkout(reqID, basketID) basketID, err := srv.CheckoutBasket(reqID, basketID)
if err != nil { if err != nil {
return res, err return res, err
} }

View File

@ -20,15 +20,36 @@ type CommandRunner struct {
cmd Command cmd Command
} }
func NewCommandRunner(cmd string, srvc *service.BasketService) *CommandRunner {
rnr := &CommandRunner{}
rnr.cmd = getCommand(cmd, srvc)
return rnr
}
func getCommand(cmd string, srvc *service.BasketService) Command {
// fmt.Printf("getCommand: %v\n", cmd)
var c Command
switch cmd { // FIXME
case "AddProductToBasket":
c = &AddProductToBasketCommand{srvc}
case "RemoveProductFromBasket":
c = &RemoveProductFromBasketCommand{srvc}
}
return c
}
func (r *CommandRunner) run(data CommandData) (bool, any) { func (r *CommandRunner) run(data CommandData) (bool, any) {
return r.cmd.run(data) return r.cmd.run(data)
} }
type AddProductToBasketCommand struct { type AddProductToBasketCommand struct {
srvc *service.BasketService srvc *service.BasketService // FIXME: Remove service dep
} }
func (c *AddProductToBasketCommand) run(data CommandData) (bool, any) { func (c *AddProductToBasketCommand) run(data CommandData) (bool, any) { // FIXME: Move run func to WorkerPool
reqID := data["request_id"].(string) // FIXME Check input params! reqID := data["request_id"].(string) // FIXME Check input params!
productID := int(data["product_id"].(float64)) // FIXME Check input params! productID := int(data["product_id"].(float64)) // FIXME Check input params!
basketID := data["basket_id"].(string) // FIXME Check input params! basketID := data["basket_id"].(string) // FIXME Check input params!
@ -40,10 +61,10 @@ func (c *AddProductToBasketCommand) run(data CommandData) (bool, any) {
} }
type RemoveProductFromBasketCommand struct { type RemoveProductFromBasketCommand struct {
srvc *service.BasketService srvc *service.BasketService // FIXME: Remove service dep
} }
func (c *RemoveProductFromBasketCommand) run(data CommandData) (bool, any) { func (c *RemoveProductFromBasketCommand) run(data CommandData) (bool, any) { // FIXME: Move run func to WorkerPool
reqID := data["request_id"].(string) // FIXME Check input params! reqID := data["request_id"].(string) // FIXME Check input params!
productID := int(data["product_id"].(float64)) // FIXME Check input params! productID := int(data["product_id"].(float64)) // FIXME Check input params!
basketID := data["basket_id"].(string) // FIXME Check input params! basketID := data["basket_id"].(string) // FIXME Check input params!

135
src/internal/worker/ext.go Normal file
View File

@ -0,0 +1,135 @@
package worker
import (
"bytes"
"encoding/json"
"os"
"time"
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
"git.pbiernat.dev/egommerce/basket-service/pkg/database"
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
"github.com/go-redis/redis/v8"
)
func WithCache(c *cnf.Config) OptionFn {
return func(w *Worker) error {
conn := redis.NewClient(&redis.Options{
Addr: c.CacheAddr,
Password: c.CachePassword,
DB: 0,
})
w.Cache = conn
return nil
}
}
func WithDatabase(c *cnf.Config) OptionFn {
return func(w *Worker) error {
conn, err := database.Connect(c.DbURL)
if err != nil {
w.Logger.Log("Failed to connect to Database server: %v\n", err)
os.Exit(1)
}
w.Database = conn
return nil
}
}
func WithEventbus(c *cnf.Config) OptionFn {
return func(w *Worker) error {
_, chn, err := rabbitmq.Open(c.EventBusURL)
if err != nil {
w.Logger.Log("Failed to connect to EventBus server: %v\n", err)
os.Exit(1)
}
err = rabbitmq.NewExchange(chn, c.EventBusExchange)
if err != nil {
w.Logger.Log("Failed to declare EventBus exchange: %v\n", err)
os.Exit(1)
}
_, err = chn.QueueDeclare(
c.EventBusQueue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
w.Logger.Log("Failed to declare EventBus queue: %v\n", err)
os.Exit(1)
}
// w.bindQueues()
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket")
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket")
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity")
w.Eventbus = chn
return nil
}
}
func WithLogger(c *cnf.Config) OptionFn {
return func(w *Worker) error {
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
if err != nil {
w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
os.Exit(1)
}
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
if err != nil {
w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
os.Exit(1)
}
w.Logger = logger
return nil
}
}
func WithRegistry(c *cnf.Config) OptionFn {
return func(w *Worker) error {
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
if err != nil {
w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
}
w.Registry = registry
go func(w *Worker) { // Fetch Consul KV config and store it in app config
ticker := time.NewTicker(time.Second * 15)
for range ticker.C {
fetchKVConfig(w) // FIXME: duplicated in server
}
}(w)
return nil
}
}
func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go
config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil)
if err != nil || config == nil {
return
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&w.Config); err != nil {
return
}
}

View File

@ -1,14 +1,13 @@
package worker package worker
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"strconv"
"strings"
"syscall" "syscall"
"time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
@ -19,8 +18,8 @@ import (
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
"git.pbiernat.dev/egommerce/basket-service/internal/event"
"git.pbiernat.dev/egommerce/basket-service/internal/service" "git.pbiernat.dev/egommerce/basket-service/internal/service"
"git.pbiernat.dev/egommerce/basket-service/pkg/database"
) )
type ( type (
@ -36,137 +35,17 @@ type (
) )
func New(c *cnf.Config, opts ...OptionFn) *Worker { func New(c *cnf.Config, opts ...OptionFn) *Worker {
wrk := &Worker{ w := &Worker{
Config: c, Config: c,
} }
for _, opt := range opts { for _, opt := range opts {
if err := opt(wrk); err != nil { if err := opt(w); err != nil {
log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err) log.Fatalf("Failed to attach extension to the Worker. Err: %v\n", err)
} }
} }
return wrk return w
}
func WithCache(c *cnf.Config) OptionFn {
return func(w *Worker) error {
conn := redis.NewClient(&redis.Options{
Addr: c.CacheAddr,
Password: c.CachePassword,
DB: 0,
})
w.Cache = conn
return nil
}
}
func WithDatabase(c *cnf.Config) OptionFn {
return func(w *Worker) error {
conn, err := database.Connect(c.DbURL)
if err != nil {
w.Logger.Log("Failed to connect to Database server: %v\n", err)
os.Exit(1)
}
w.Database = conn
return nil
}
}
func WithEventbus(c *cnf.Config) OptionFn {
return func(w *Worker) error {
_, chn, err := rabbitmq.Open(c.EventBusURL)
if err != nil {
w.Logger.Log("Failed to connect to EventBus server: %v\n", err)
os.Exit(1)
}
err = rabbitmq.NewExchange(chn, c.EventBusExchange)
if err != nil {
w.Logger.Log("Failed to declare EventBus exchange: %v\n", err)
os.Exit(1)
}
_, err = chn.QueueDeclare(
c.EventBusQueue, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
w.Logger.Log("Failed to declare EventBus queue: %v\n", err)
os.Exit(1)
}
// w.bindQueues()
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket")
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket")
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity")
w.Eventbus = chn
return nil
}
}
func WithLogger(c *cnf.Config) OptionFn {
return func(w *Worker) error {
logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
if err != nil {
w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
os.Exit(1)
}
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
if err != nil {
w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
os.Exit(1)
}
w.Logger = logger
return nil
}
}
func WithRegistry(c *cnf.Config) OptionFn {
return func(w *Worker) error {
registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
if err != nil {
w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
}
w.Registry = registry
go func(w *Worker) { // Fetch Consul KV config and store it in app config
ticker := time.NewTicker(time.Second * 15)
for range ticker.C {
fetchKVConfig(w) // FIXME: duplicated in server
}
}(w)
return nil
}
}
func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go
config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil)
if err != nil || config == nil {
return
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&w.Config); err != nil {
return
}
} }
func (w *Worker) Start(while chan struct{}) error { func (w *Worker) Start(while chan struct{}) error {
@ -176,10 +55,12 @@ func (w *Worker) Start(while chan struct{}) error {
<-sigint <-sigint
w.Shutdown() w.Shutdown()
close(while) close(while)
}() }()
run := w.createRunFile("/app.run") // TODO move to common library (shared between server and worker)
defer w.removeRunFile(run)
err := w.doWork() err := w.doWork()
if err != nil { if err != nil {
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err) log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err)
@ -203,6 +84,21 @@ func (w *Worker) Shutdown() error {
return nil return nil
} }
func (w *Worker) createRunFile(path string) *os.File {
run, err := os.Create(path)
if err != nil {
log.Fatalf("Failed to create run file. Reason: %v\n", err)
os.Exit(1)
}
run.WriteString(strconv.Itoa(os.Getpid()))
return run
}
func (w *Worker) removeRunFile(f *os.File) error {
return f.Close()
}
func (w *Worker) doWork() error { func (w *Worker) doWork() error {
msgs, err := w.Eventbus.Consume( msgs, err := w.Eventbus.Consume(
w.Config.EventBusQueue, // queue w.Config.EventBusQueue, // queue
@ -221,10 +117,10 @@ func (w *Worker) doWork() error {
go func() { go func() {
basketSrvc := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger) basketSrvc := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger)
for m := range msgs { for d := range msgs {
go func(m amqp.Delivery) { go func(d amqp.Delivery) {
w.processMsg(basketSrvc, m) w.processMsg(basketSrvc, d)
}(m) }(d)
} }
}() }()
@ -240,8 +136,6 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
return return
} }
rnr := &CommandRunner{}
name := fmt.Sprintf("%s", msg["event"]) name := fmt.Sprintf("%s", msg["event"])
data := (msg["data"]).(map[string]interface{}) data := (msg["data"]).(map[string]interface{})
// reqID := (data["request_id"]).(string) // FIXME Check input params! // reqID := (data["request_id"]).(string) // FIXME Check input params!
@ -249,9 +143,15 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data) w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data)
var ok = false var ok = false
w.Logger.Log("CMD:") switch true { // Refactor -> use case for polymorphism
rnr.cmd = (msg["command"]).(Command) case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
w.Logger.Log(": %v", rnr.cmd) w.Logger.Log("Event: %s", event.EVENT_PRODUCT_ADDED_TO_BASKET)
case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
w.Logger.Log("Event: %s", event.EVENT_PRODUCT_REMOVED_FROM_BASKET)
// case strings.Contains(name, event.EVENT_BASKET_CHECKOUT):
// w.Logger.Log("Event: %s", event.EVENT_BASKET_CHECKOUT)
}
// case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET): // case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
// basketID := data["basket_id"].(string) // FIXME Check input params! // basketID := data["basket_id"].(string) // FIXME Check input params!
// productID := data["product_id"] // FIXME Check input params! // productID := data["product_id"] // FIXME Check input params!
@ -266,6 +166,7 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
// w.Logger.Log("Removing product #%d from basket #%s. ReqID: #%s", productID, basketID, reqID) // w.Logger.Log("Removing product #%d from basket #%s. ReqID: #%s", productID, basketID, reqID)
// } // }
rnr := NewCommandRunner((data["command"]).(string), srvc)
ok, _ = rnr.run(data) ok, _ = rnr.run(data)
if ok { if ok {
w.Logger.Log("Successful executed message \"%s\"\n", name) w.Logger.Log("Successful executed message \"%s\"\n", name)

View File

@ -1,22 +0,0 @@
package config
import (
"os"
"github.com/joho/godotenv"
)
var ErrLoadingEnvs error
func init() {
ErrLoadingEnvs = godotenv.Load()
}
func GetEnv(name string, defVal string) string {
env := os.Getenv(name)
if env == "" {
return defVal
}
return env
}

View File

@ -2,12 +2,15 @@ package server
import ( import (
"fmt" "fmt"
"net"
"os"
"time" "time"
) )
type Config struct { type Config struct {
AppID string AppID string
AppName string AppName string
AppDomain string
NetAddr string NetAddr string
PathPrefix string PathPrefix string
@ -19,3 +22,13 @@ type Config struct {
func (c *Config) GetAppFullName() string { func (c *Config) GetAppFullName() string {
return fmt.Sprintf("%s_%s", c.AppName, c.AppID) return fmt.Sprintf("%s_%s", c.AppName, c.AppID)
} }
func (c *Config) GetIP() string {
host, _ := os.Hostname()
ips, _ := net.LookupIP(host)
for _, ip := range ips {
return ip.String()
}
return host
}

View File

@ -2,8 +2,10 @@ package server
import ( import (
"log" "log"
"net"
"os" "os"
"os/signal" "os/signal"
"strconv"
"syscall" "syscall"
"time" "time"
@ -17,22 +19,22 @@ type (
*fiber.App *fiber.App
*Config *Config
addr string // e.g. "127.0.0.1:8080" addr string // e.g. "127.0.0.1:80"
// name string // e.g. "awesome-rest-api"
// kvNmspc string ShutdownFn
} }
HeaderRequestID struct { HeaderRequestID struct {
RequestID string `reqHeader:"x-request-id"` RequestID string `reqHeader:"x-request-id"`
} }
OptionFn func(*Server) error OptionFn func(*Server) error
PurgeFn func(*Server) error ShutdownFn func() error
) )
func New(conf *Config) *Server { func New(conf *Config) *Server {
return &Server{ return &Server{
App: fiber.New(fiber.Config{ App: fiber.New(fiber.Config{
AppName: conf.AppID, AppName: conf.AppID,
ServerHeader: conf.AppName, ServerHeader: conf.AppName + ":" + conf.AppID,
ReadTimeout: conf.ReadTimeout * time.Millisecond, ReadTimeout: conf.ReadTimeout * time.Millisecond,
WriteTimeout: conf.WriteTimeout * time.Millisecond, WriteTimeout: conf.WriteTimeout * time.Millisecond,
IdleTimeout: conf.IdleTimeout * time.Millisecond, IdleTimeout: conf.IdleTimeout * time.Millisecond,
@ -42,20 +44,25 @@ func New(conf *Config) *Server {
} }
} }
func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error { func (s *Server) Start(while chan struct{}) error {
go func() { go func() {
sigint := make(chan os.Signal, 1) sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint <-sigint
if err := prgFn(s); err != nil { if err := s.ShutdownFn(); err != nil {
log.Fatalf("Failed to shutdown server. Reason: %v\n", err) log.Fatalf("Failed to shutdown server. Reason: %v\n", err)
} }
close(while) close(while)
}() }()
err := s.Listen(s.addr) run := s.createRunFile("./app.run") // FIXME path... TODO move to common library (shared between server and worker)
defer s.removeRunFile(run)
ln, _ := net.Listen("tcp", s.addr)
// ln = tls.NewListener(ln, s.App.Server().TLSConfig)
err := s.Listener(ln)
if err != nil { if err != nil {
log.Fatalf("Failed to start server: %s. Reason: %v\n", s.Config.AppID, err) log.Fatalf("Failed to start server: %s. Reason: %v\n", s.Config.AppID, err)
close(while) close(while)
@ -77,3 +84,18 @@ func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
func (s *Server) Error(c *fiber.Ctx, code int, msg string) error { func (s *Server) Error(c *fiber.Ctx, code int, msg string) error {
return c.Status(code).JSON(http.ErrorResponse{Error: msg}) return c.Status(code).JSON(http.ErrorResponse{Error: msg})
} }
func (s *Server) createRunFile(path string) *os.File {
run, err := os.Create(path)
if err != nil {
log.Fatalf("Failed to create run file. Reason: %v\n", err)
os.Exit(1)
}
run.WriteString(strconv.Itoa(os.Getpid()))
return run
}
func (s *Server) removeRunFile(f *os.File) error {
return f.Close()
}