This commit is contained in:
parent
6f7952d166
commit
bf47c8645d
@ -11,3 +11,4 @@ COPY src ./
|
||||
RUN export CGO_ENABLED=0 ; export GOOS=linux ; export GOARCH=amd64 && \
|
||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/server" $GO_SERVER && \
|
||||
go build -ldflags="-w -s" -o "$BIN_OUTPUT/worker" $GO_WORKER
|
||||
|
@ -3,7 +3,8 @@ ARG BUILDER_IMAGE="git.pbiernat.dev/egommerce/catalog-builder:latest"
|
||||
FROM ${BUILDER_IMAGE} AS builder
|
||||
|
||||
# Destination image - server
|
||||
FROM gcr.io/distroless/base-debian10
|
||||
# FROM gcr.io/distroless/base-debian10
|
||||
FROM alpine:3.17
|
||||
|
||||
ARG BIN_OUTPUT
|
||||
ARG SVC_NAME
|
||||
|
@ -31,6 +31,7 @@ func main() {
|
||||
}
|
||||
|
||||
c := new(server.Config)
|
||||
c.AppID, _ = os.Hostname()
|
||||
c.AppName = config.GetEnv("APP_NAME", defAppName)
|
||||
c.AppDomain = config.GetEnv("APP_DOMAIN", defAppDomain)
|
||||
c.NetAddr = config.GetEnv("SERVER_ADDR", defNetAddr)
|
||||
@ -42,7 +43,7 @@ func main() {
|
||||
c.EventBusExchange = ebEventsExchange
|
||||
|
||||
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr)
|
||||
logger := fluentd.NewLogger(c.AppName, logHost, logPort)
|
||||
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
||||
defer logger.Close()
|
||||
|
||||
// db conn
|
||||
@ -70,5 +71,10 @@ func main() {
|
||||
|
||||
// start server
|
||||
srv := server.NewServer(c, logger, dbConn, ebCh)
|
||||
srv.StartWithGracefulShutdown()
|
||||
|
||||
forever := make(chan struct{})
|
||||
srv.StartWithGracefulShutdown(forever)
|
||||
<-forever
|
||||
|
||||
// os.Exit(1)
|
||||
}
|
||||
|
@ -4,13 +4,14 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/config"
|
||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/database"
|
||||
"git.pbiernat.dev/egommerce/catalog-service/internal/app/server"
|
||||
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
||||
amqp "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -24,22 +25,21 @@ const (
|
||||
)
|
||||
|
||||
func main() {
|
||||
id := uuid.New().String()[24:]
|
||||
if config.ErrLoadingEnvs != nil {
|
||||
log.Panicln("Error loading .env file", config.ErrLoadingEnvs)
|
||||
}
|
||||
|
||||
c := new(server.Config)
|
||||
c.AppName = config.GetEnv("APP_NAME", defAppName) + "#:" + id
|
||||
c.AppID, _ = os.Hostname()
|
||||
c.AppName = config.GetEnv("APP_NAME", defAppName)
|
||||
c.LoggerAddr = config.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
c.DbURL = config.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.EventBusURL = config.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||
c.EventBusExchange = ebEventsExchange
|
||||
c.EventBusQueue = ebEventsQueue
|
||||
// c.EventBusQueue = fmt.Sprintf("%s-%s", ebEventsQueue, id)
|
||||
|
||||
logHost, logPort := fluentd.ParseAddr(c.LoggerAddr)
|
||||
logger := fluentd.NewLogger(c.AppName, logHost, logPort)
|
||||
logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort)
|
||||
defer logger.Close()
|
||||
|
||||
// db conn
|
||||
@ -79,11 +79,11 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
err = amqp.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "warehouse.catalog.stockUpdated")
|
||||
if err != nil {
|
||||
logger.Log("Failed to prepare EventBus queue: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
amqp.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "warehouse.catalog.stockUpdated")
|
||||
// if err != nil {
|
||||
// logger.Log("Failed to bind EventBus queue: %v\n", err)
|
||||
// os.Exit(1)
|
||||
// }
|
||||
|
||||
// event consume
|
||||
msgs, err := ebCh.Consume(
|
||||
@ -96,11 +96,21 @@ func main() {
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
logger.Log("Failed to register a consumer: %s", err)
|
||||
logger.Log("Failed to register a EventBus consumer: %s", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
var forever chan struct{}
|
||||
forever := make(chan struct{})
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
logger.Log("Worker %s stopped working...\n", c.GetAppFullName())
|
||||
|
||||
close(forever)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
msg, err := amqp.Deserialize(d.Body)
|
||||
@ -109,6 +119,7 @@ func main() {
|
||||
d.Reject(false) // FIXME: how to handle erros in queue...????
|
||||
continue
|
||||
}
|
||||
|
||||
event := fmt.Sprintf("%s", msg["event"])
|
||||
data := (msg["data"]).(map[string]interface{})
|
||||
logger.Log("Message<%s>: %s\n", event, data)
|
||||
|
@ -3,9 +3,7 @@ module git.pbiernat.dev/egommerce/catalog-service
|
||||
go 1.18
|
||||
|
||||
require (
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/consul v0.0.0-20221201033742-97afd0d96662
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/fluentd v0.0.0-20221201033742-97afd0d96662
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq v0.0.0-20221201033742-97afd0d96662
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.31
|
||||
github.com/gofiber/fiber/v2 v2.40.1
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/jackc/pgx/v4 v4.17.2
|
||||
@ -41,7 +39,6 @@ require (
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/philhofer/fwd v1.1.1 // indirect
|
||||
github.com/rivo/uniseg v0.2.0 // indirect
|
||||
github.com/rogpeppe/go-internal v1.9.0 // indirect
|
||||
github.com/tinylib/msgp v1.1.6 // indirect
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
github.com/valyala/fasthttp v1.41.0 // indirect
|
||||
|
16
src/go.sum
16
src/go.sum
@ -1,9 +1,9 @@
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/consul v0.0.0-20221201033742-97afd0d96662 h1:Z6D9KDaHS/TL2jcY10M0UxqyGcaXn7jK7P6ja8+ytkg=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/consul v0.0.0-20221201033742-97afd0d96662/go.mod h1:lDctRzmIVtFNCPrXAOAOQJvi52KjfOqhgTftQ6gTE7U=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/fluentd v0.0.0-20221201033742-97afd0d96662 h1:AG5rRYaXQFyL9XtZLkKfBQGyOBbGRQAqrgtLL6KoQTo=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/fluentd v0.0.0-20221201033742-97afd0d96662/go.mod h1:/7GWyTxCHuk7y1aQtxJkTMuXNG6utr8APWjvOp51H7A=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq v0.0.0-20221201033742-97afd0d96662 h1:2skXunz8yjmYK0H5VOsXRhuyFrPo86PpL4b8heBei3I=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq v0.0.0-20221201033742-97afd0d96662/go.mod h1:gJQ6go/IGbrDkIftZO8cubLCu8Rbt5SWCh4EZMcdsW8=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.29 h1:EG6t3i0P8ENH3eRYPgjVE8UHpnif8UPYA/23e1Nm6n0=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.29/go.mod h1:nAwcw2MZtn/54YKq8VQK6RJAsiuoLUtPuazXg8JcqK8=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.30 h1:qRUGkv/TA7vO6FDnKKxZ6CfooQVfp3/PpL3vGf6lwYY=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.30/go.mod h1:nAwcw2MZtn/54YKq8VQK6RJAsiuoLUtPuazXg8JcqK8=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.31 h1:TZOdMTp++vws86ZlwtWKmpoik5YN6aOSWRpu9j0FykU=
|
||||
git.pbiernat.dev/egommerce/go-api-pkg v0.0.31/go.mod h1:nAwcw2MZtn/54YKq8VQK6RJAsiuoLUtPuazXg8JcqK8=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
|
||||
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
|
||||
@ -171,8 +171,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
@ -248,8 +248,6 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
|
||||
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
|
||||
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
||||
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
|
||||
|
@ -1,10 +1,12 @@
|
||||
package server
|
||||
|
||||
import "fmt"
|
||||
|
||||
type Config struct {
|
||||
AppName string
|
||||
AppDomain string
|
||||
NetAddr string
|
||||
// Host string
|
||||
AppID string
|
||||
AppName string
|
||||
AppDomain string
|
||||
NetAddr string
|
||||
Port int
|
||||
LoggerAddr string
|
||||
RegistryAddr string
|
||||
@ -14,3 +16,7 @@ type Config struct {
|
||||
EventBusExchange string
|
||||
EventBusQueue string
|
||||
}
|
||||
|
||||
func (c *Config) GetAppFullName() string {
|
||||
return fmt.Sprintf("%s_%s", c.AppName, c.AppID)
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
@ -23,12 +24,13 @@ type Server struct {
|
||||
addr string
|
||||
}
|
||||
|
||||
type RequestID struct {
|
||||
type Headers struct {
|
||||
RequestID string `reqHeader:"x-request-id"`
|
||||
}
|
||||
|
||||
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, ebCh *amqp.Channel) *Server {
|
||||
discovery, err := discovery.NewService(conf.RegistryAddr, conf.AppName, conf.AppDomain, conf.Port)
|
||||
logger.Log("API_ID: %s", conf.AppID)
|
||||
discovery, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.Port)
|
||||
if err != nil {
|
||||
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
|
||||
}
|
||||
@ -36,12 +38,12 @@ func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, ebCh *amq
|
||||
logger.Log("Registering service with name: %s, address: %s", discovery.Name, discovery.Address)
|
||||
err = discovery.Register()
|
||||
if err != nil {
|
||||
logger.Log(err.Error())
|
||||
logger.Log("register error: %v", err)
|
||||
}
|
||||
|
||||
cnf := fiber.Config{
|
||||
AppName: conf.AppName,
|
||||
ServerHeader: conf.AppDomain,
|
||||
ServerHeader: conf.AppName,
|
||||
ReadTimeout: time.Millisecond * 50,
|
||||
WriteTimeout: time.Millisecond * 50,
|
||||
IdleTimeout: time.Millisecond * 50,
|
||||
@ -67,33 +69,28 @@ func (s *Server) Start() {
|
||||
s.log.Log("Starting error: %v", err)
|
||||
}
|
||||
|
||||
func (s *Server) StartWithGracefulShutdown() {
|
||||
idle := make(chan struct{})
|
||||
|
||||
func (s *Server) StartWithGracefulShutdown(forever chan struct{}) {
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt)
|
||||
signal.Notify(sigint, os.Interrupt, os.Kill, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
if err := s.Shutdown(); err != nil {
|
||||
if err := s.gracefulShutdown(); err != nil {
|
||||
s.log.Log("Server is not shutting down! Reason: %v", err)
|
||||
}
|
||||
|
||||
s.log.Log("Server is going down...")
|
||||
s.discovery.Unregister()
|
||||
|
||||
close(idle)
|
||||
close(forever)
|
||||
}()
|
||||
|
||||
if err := s.Listen(s.addr); err != nil {
|
||||
s.log.Log("Server is not running! Reason: %v", err)
|
||||
}
|
||||
|
||||
<-idle
|
||||
<-forever
|
||||
}
|
||||
|
||||
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||
var hdr = new(RequestID)
|
||||
var hdr = new(Headers)
|
||||
if err := c.ReqHeaderParser(hdr); err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -101,3 +98,15 @@ func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
||||
return hdr.RequestID, nil
|
||||
|
||||
}
|
||||
|
||||
func (s *Server) gracefulShutdown() error {
|
||||
s.log.Log("Server is going down...")
|
||||
s.log.Log("Unregistering service: %s", s.discovery.GetID())
|
||||
s.discovery.Unregister()
|
||||
|
||||
s.ebCh.Close()
|
||||
s.db.Close()
|
||||
s.log.Close()
|
||||
|
||||
return s.Shutdown()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user