~xenrox/ntfy-alertmanager

054c163ffb744b9cf8e4681bbb1980210cf35f62 — Thorben Günther 1 year, 1 month ago 652d46d
cache: Support redis

Closes: https://todo.xenrox.net/~xenrox/ntfy-alertmanager/11
9 files changed, 114 insertions(+), 15 deletions(-)

M README.md
M cache/cache.go
M cache/memory.go
A cache/redis.go
M config.go
M config_test.go
M go.mod
M go.sum
M main.go
M README.md => README.md +5 -1
@@ 96,12 96,16 @@ alertmanager {
# When the alert-mode is set to single, ntfy-alertmanager will cache each single alert
# to avoid sending recurrences.
cache {
    # The type of cache that will be used (default is memory).
    # The type of cache that will be used (either memory or redis; default is memory).
    type memory
    # How long messages stay in the cache for
    duration 24h

    # Memory cache settings
    # Interval in which the cache is cleaned up
    cleanup-interval 1h

    # Redis cache settings
}
```


M cache/cache.go => cache/cache.go +2 -2
@@ 3,7 3,7 @@ package cache

// Cache is the interface that describes a cache for ntfy-alertmanager.
type Cache interface {
	Set(fingerprint string, status string)
	Contains(fingerprint string, status string) bool
	Set(fingerprint string, status string) error
	Contains(fingerprint string, status string) (bool, error)
	Cleanup()
}

M cache/memory.go => cache/memory.go +6 -5
@@ 27,7 27,7 @@ func NewMemoryCache(d time.Duration) Cache {
}

// Set saves an alert in the cache.
func (c *MemoryCache) Set(fingerprint string, status string) {
func (c *MemoryCache) Set(fingerprint string, status string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	alert := new(cachedAlert)


@@ 35,19 35,20 @@ func (c *MemoryCache) Set(fingerprint string, status string) {
	alert.status = status

	c.alerts[fingerprint] = alert
	return nil
}

// Contains checks if an alert with a given fingerprint is in the cache
// and checks if the status matches.
func (c *MemoryCache) Contains(fingerprint string, status string) bool {
// and if the status matches.
func (c *MemoryCache) Contains(fingerprint string, status string) (bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	alert, ok := c.alerts[fingerprint]
	if ok {
		return alert.status == status
		return alert.status == status, nil
	}

	return false
	return false, nil
}

func (a *cachedAlert) expired() bool {

A cache/redis.go => cache/redis.go +51 -0
@@ 0,0 1,51 @@
package cache

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// RedisCache is the redis cache.
type RedisCache struct {
	client   *redis.Client
	duration time.Duration
}

// NewRedisCache creates a new redis cache/client.
func NewRedisCache(redisURL string, d time.Duration) (Cache, error) {
	c := new(RedisCache)
	ropts, err := redis.ParseURL(redisURL)
	if err != nil {
		return nil, err
	}

	rdb := redis.NewClient(ropts)
	c.client = rdb
	c.duration = d

	return c, nil
}

// Set saves an alert in the cache.
func (c *RedisCache) Set(fingerprint string, status string) error {
	return c.client.SetEx(context.Background(), fingerprint, status, c.duration).Err()
}

// Contains checks if an alert with a given fingerprint is in the cache
// and if the status matches.
func (c *RedisCache) Contains(fingerprint string, status string) (bool, error) {
	val, err := c.client.Get(context.Background(), fingerprint).Result()
	if err == redis.Nil {
		return false, nil
	} else if err != nil {
		return false, err
	}

	return val == status, nil
}

// Cleanup is an empty function that is simply here to implement the interface.
// Redis does its own cleanup.
func (c *RedisCache) Cleanup() {}

M config.go => config.go +3 -0
@@ 20,6 20,7 @@ type cacheType int

const (
	memory cacheType = iota
	redis
)

type config struct {


@@ 256,6 257,8 @@ func readConfig(path string) (*config, error) {
			switch strings.ToLower(cacheType) {
			case "memory":
				config.cache.Type = memory
			case "redis":
				config.cache.Type = redis
			default:
				return nil, fmt.Errorf("cache: illegal type %q", cacheType)
			}

M config_test.go => config_test.go +6 -1
@@ 54,6 54,7 @@ alertmanager {
}

cache {
    type redis
    duration 48h
}
`


@@ 73,7 74,11 @@ cache {
				"instance:example.com": {Tags: []string{"computer", "example"}},
			},
		},
		cache: cacheConfig{CleanupInterval: time.Hour, Duration: 48 * time.Hour},
		cache: cacheConfig{
			Type:            redis,
			CleanupInterval: time.Hour,
			Duration:        48 * time.Hour,
		},
		am: alertmanagerConfig{
			SilenceDuration: time.Hour * 24,
			User:            "user",

M go.mod => go.mod +6 -1
@@ 5,7 5,12 @@ go 1.19
require (
	git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99
	git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74
	github.com/redis/go-redis/v9 v9.0.2
	golang.org/x/text v0.7.0
)

require github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
require (
	github.com/cespare/xxhash/v2 v2.2.0 // indirect
	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
	github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
)

M go.sum => go.sum +11 -0
@@ 2,9 2,20 @@ git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99 h1:1s8n5uisqkR+Bz
git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99/go.mod h1:t+Ww6SR24yYnXzEWiNlOY0AFo5E9B73X++10lrSpp4U=
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74 h1:t/52xLRU4IHd2O1nkb9fcUE6K95/KdBtdQYHT31szps=
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74/go.mod h1:d98WFDHGpxaEThKue5CfGtr9OrWgbaApprt3GH+OM4s=
github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

M main.go => main.go +24 -5
@@ 59,8 59,12 @@ type notification struct {
func (br *bridge) singleAlertNotifications(p *payload) []*notification {
	var notifications []*notification
	for _, alert := range p.Alerts {
		if br.cache.Contains(alert.Fingerprint, alert.Status) {
			br.logger.Debugf("Alert %s skipped: Still in cache", alert.Fingerprint)
		contains, err := br.cache.Contains(alert.Fingerprint, alert.Status)
		if err != nil {
			br.logger.Errorf("Failed to lookup alert %q in cache: %v", alert.Fingerprint, err)
		}
		if contains {
			br.logger.Debugf("Alert %q skipped: Still in cache", alert.Fingerprint)
			continue
		}



@@ 324,7 328,9 @@ func (br *bridge) handleWebhooks(w http.ResponseWriter, r *http.Request) {
			if err != nil {
				br.logger.Errorf("Failed to publish notification: %v", err)
			} else {
				br.cache.Set(n.fingerprint, n.status)
				if err := br.cache.Set(n.fingerprint, n.status); err != nil {
					br.logger.Errorf("Failed to set alert %q in cache: %v", n.fingerprint, err)
				}
			}
		}
	} else {


@@ 395,7 401,18 @@ func main() {

	client := &httpClient{&http.Client{Timeout: time.Second * 3}}

	c := cache.NewMemoryCache(cfg.cache.Duration)
	var c cache.Cache
	switch cfg.cache.Type {
	case memory:
		c = cache.NewMemoryCache(cfg.cache.Duration)
	case redis:
		var err error
		// TODO: Read URL from config
		c, err = cache.NewRedisCache("redis://localhost:6379", cfg.cache.Duration)
		if err != nil {
			logger.Fatalf("Failed to create redis cache: %v", err)
		}
	}
	bridge := &bridge{cfg: cfg, logger: logger, cache: c, client: client}

	logger.Infof("Listening on %s, ntfy-alertmanager %s", cfg.HTTPAddress, version)


@@ 409,6 426,8 @@ func main() {
		http.HandleFunc("/silences", bridge.handleSilences)
	}

	go bridge.runCleanup()
	if cfg.cache.Type == memory {
		go bridge.runCleanup()
	}
	logger.Fatal(http.ListenAndServe(cfg.HTTPAddress, nil))
}