From 6188022ad612f37f14c12e60542c573b0c20f5d3 Mon Sep 17 00:00:00 2001 From: Piotr Biernat Date: Thu, 1 Dec 2022 17:56:11 +0100 Subject: [PATCH] update --- consul/discovery.go | 132 ++++++++++++++++++++++++++++++++++++++++++++ fluentd/config.go | 14 +++++ fluentd/logger.go | 41 ++++++++++++++ rabbitmq/connect.go | 0 rabbitmq/queue.go | 85 ++++++++++++++++++++++++++++ 5 files changed, 272 insertions(+) create mode 100644 consul/discovery.go create mode 100644 fluentd/config.go create mode 100644 fluentd/logger.go create mode 100644 rabbitmq/connect.go create mode 100644 rabbitmq/queue.go diff --git a/consul/discovery.go b/consul/discovery.go new file mode 100644 index 0000000..369e29d --- /dev/null +++ b/consul/discovery.go @@ -0,0 +1,132 @@ +package consul + +import ( + "fmt" + "log" + "net/http" + "time" + + consul "github.com/hashicorp/consul/api" +) + +type Service struct { + AppID string + Name string + Address string + IP string + Port int + TTL time.Duration + ConsulAgent *consul.Agent +} + +var ErrServiceUnavailable = fmt.Errorf("Service is unavailable") + +func NewService(serverAddr, appID, appName, ip, domain string, appPort int) (*Service, error) { + s := new(Service) + s.AppID = appID + s.Name = appName + s.Address = domain + s.IP = ip + s.Port = appPort + s.TTL = time.Second * 15 + + client, err := consul.NewClient(newClientConfig(serverAddr)) + if err != nil { + return nil, err + } + s.ConsulAgent = client.Agent() + + return s, nil +} + +func newClientConfig(serverAddr string) *consul.Config { + conf := consul.DefaultConfig() + conf.Address = serverAddr + + return conf +} + +func (s *Service) Register() error { + def := &consul.AgentServiceRegistration{ + ID: s.Name + "_" + s.AppID, + Name: s.Name, + Address: s.IP, + Port: s.Port, + Tags: s.getTags(), + Check: &consul.AgentServiceCheck{ + TTL: s.TTL.String(), + }, + } + + if err := s.ConsulAgent.ServiceRegister(def); err != nil { + return err + } + go s.UpdateTTL(def) + + return nil +} +func (s *Service) Unregister() error { + return s.ConsulAgent.ServiceDeregister(s.Name + "_" + s.AppID) +} + +func (s *Service) UpdateTTL(service *consul.AgentServiceRegistration) { + ticker := time.NewTicker(s.TTL / 2) + for range ticker.C { + ok, err := s.check() + if !ok { + if err := s.ConsulAgent.FailTTL("service:"+s.Name+"_"+s.AppID, err.Error()); err != nil { + log.Println(err) + } + } else { + if err := s.ConsulAgent.PassTTL("service:"+s.AppID, "OK"); err != nil { + log.Println(err) + } + } + } +} + +func (s *Service) check() (bool, error) { + client := &http.Client{} + healthUrl := fmt.Sprintf("http://%s:%d/health", s.IP, s.Port) + req, err := http.NewRequest(http.MethodGet, healthUrl, nil) + if err != nil { + return false, ErrServiceUnavailable + } + req.Header.Set("User-Agent", "Health Check") + + resp, err := client.Do(req) + if err != nil { + return false, ErrServiceUnavailable + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + return true, nil + } + + return false, ErrServiceUnavailable +} + +func (s *Service) getTags() []string { + fullName := fmt.Sprintf("%s-%s", s.Name, s.AppID) + bFullAddr := fmt.Sprintf("http://%s:%d/", s.IP, s.Port) // FIXME: declare one once - dont need to refresh.... + + tags := []string{ + "traefik.enable=true", + "traefik.http.routers." + s.Name + ".rule=Host(`" + s.Address + "`)", + "traefik.http.routers." + s.Name + ".entryPoints=https", + "traefik.http.routers." + s.Name + ".service=" + s.Name, + "traefik.http.routers." + s.Name + ".middlewares=compress,requestid", + "traefik.http.routers." + s.Name + ".tls=true", + // "traefik.http.services." + s.Name + ".loadbalancer.server.scheme=http", + // "traefik.http.services." + s.Name + ".loadbalancer.server.port=" + port, + "traefik.http.services." + s.Name + ".loadbalancer.passhostheader=false", + "traefik.http.services." + s.Name + ".loadbalancer.servers." + fullName + ".url=" + bFullAddr, + "traefik.http.middlewares.compress.compress=true", + "traefik.http.middlewares.requestid.plugin.requestid.headerName=X-Request-ID", + // "traefik.http.services." + fullName + ".loadbalancer.healthcheck.path=/health", + // "traefik.http.services." + fullName + ".loadbalancer.healthcheck.interval=10s", + } + + return tags +} diff --git a/fluentd/config.go b/fluentd/config.go new file mode 100644 index 0000000..48255d2 --- /dev/null +++ b/fluentd/config.go @@ -0,0 +1,14 @@ +package fluentd + +import ( + "strconv" + "strings" +) + +func ParseAddr(addr string) (string, int) { + p := strings.Split(addr, ":") + fHost := p[0] + fPort, _ := strconv.Atoi(p[1]) + + return fHost, fPort +} diff --git a/fluentd/logger.go b/fluentd/logger.go new file mode 100644 index 0000000..2ffa50d --- /dev/null +++ b/fluentd/logger.go @@ -0,0 +1,41 @@ +package fluentd + +import ( + "fmt" + "log" + + "github.com/fluent/fluent-logger-golang/fluent" +) + +type Logger struct { + fluent *fluent.Fluent + appName string +} + +func NewLogger(appName, fHost string, fPort int) *Logger { + config := fluent.Config{ + FluentHost: fHost, + FluentPort: fPort, + // WriteTimeout: -1, + } + fluent, err := fluent.New(config) + if err != nil { + log.Panicf("Error connecting to %s: %v", fHost, err) + } + + return &Logger{fluent, appName} +} + +func (l *Logger) Log(format string, v ...any) { + mapData := map[string]string{ + "message": fmt.Sprintf(format, v...), + } + err := l.fluent.Post(l.appName, mapData) + if err != nil { + log.Println("Error sending log: ", err) + } +} + +func (l *Logger) Close() error { + return l.fluent.Close() +} diff --git a/rabbitmq/connect.go b/rabbitmq/connect.go new file mode 100644 index 0000000..e69de29 diff --git a/rabbitmq/queue.go b/rabbitmq/queue.go new file mode 100644 index 0000000..f382a5a --- /dev/null +++ b/rabbitmq/queue.go @@ -0,0 +1,85 @@ +package rabbitmq + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + + "github.com/streadway/amqp" +) + +type Message map[string]interface{} + +func Serialize(msg any) (string, error) { + var b bytes.Buffer + encoder := json.NewEncoder(&b) + err := encoder.Encode(msg) + + return b.String(), err +} + +func Deserialize(b []byte) (Message, error) { + var msg Message + buf := bytes.NewBuffer(b) + decoder := json.NewDecoder(buf) + err := decoder.Decode(&msg) + + return msg, err +} + +func NewExchange(chn *amqp.Channel, name string) error { + err := chn.ExchangeDeclare( + name, + "direct", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + + if err != nil { + return err + } + + return nil +} + +func Publish(chn *amqp.Channel, name, routingKey string, msg any) error { + jsonData, err := Serialize(msg) + if err != nil { + return err + } + + msgBody := fmt.Sprintf(`{"event":"%T","data":%s}`, msg, jsonData) + chn.Publish( + name, // exchange name + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: []byte(msgBody), + }, + ) + + return nil +} + +func BindQueueToExchange(chn *amqp.Channel, queueName, exchName, routingKey string) error { + err := chn.QueueBind( + queueName, // queue name + routingKey, // routing key + exchName, // exchange name + false, + nil, + ) + if err != nil { + log.Printf("Failed to bind a queue: %s\n", queueName) + + return err + } + + return nil +}