M README.md => README.md +5 -1
@@ 96,12 96,16 @@ alertmanager {
# When the alert-mode is set to single, ntfy-alertmanager will cache each single alert
# to avoid sending recurrences.
cache {
- # The type of cache that will be used (default is memory).
+ # The type of cache that will be used (either memory or redis; default is memory).
type memory
# How long messages stay in the cache for
duration 24h
+
+ # Memory cache settings
# Interval in which the cache is cleaned up
cleanup-interval 1h
+
+ # Redis cache settings
}
```
M cache/cache.go => cache/cache.go +2 -2
@@ 3,7 3,7 @@ package cache
// Cache is the interface that describes a cache for ntfy-alertmanager.
type Cache interface {
- Set(fingerprint string, status string)
- Contains(fingerprint string, status string) bool
+ Set(fingerprint string, status string) error
+ Contains(fingerprint string, status string) (bool, error)
Cleanup()
}
M cache/memory.go => cache/memory.go +6 -5
@@ 27,7 27,7 @@ func NewMemoryCache(d time.Duration) Cache {
}
// Set saves an alert in the cache.
-func (c *MemoryCache) Set(fingerprint string, status string) {
+func (c *MemoryCache) Set(fingerprint string, status string) error {
c.mu.Lock()
defer c.mu.Unlock()
alert := new(cachedAlert)
@@ 35,19 35,20 @@ func (c *MemoryCache) Set(fingerprint string, status string) {
alert.status = status
c.alerts[fingerprint] = alert
+ return nil
}
// Contains checks if an alert with a given fingerprint is in the cache
-// and checks if the status matches.
-func (c *MemoryCache) Contains(fingerprint string, status string) bool {
+// and if the status matches.
+func (c *MemoryCache) Contains(fingerprint string, status string) (bool, error) {
c.mu.Lock()
defer c.mu.Unlock()
alert, ok := c.alerts[fingerprint]
if ok {
- return alert.status == status
+ return alert.status == status, nil
}
- return false
+ return false, nil
}
func (a *cachedAlert) expired() bool {
A cache/redis.go => cache/redis.go +51 -0
@@ 0,0 1,51 @@
+package cache
+
+import (
+ "context"
+ "time"
+
+ "github.com/redis/go-redis/v9"
+)
+
+// RedisCache is the redis cache.
+type RedisCache struct {
+ client *redis.Client
+ duration time.Duration
+}
+
+// NewRedisCache creates a new redis cache/client.
+func NewRedisCache(redisURL string, d time.Duration) (Cache, error) {
+ c := new(RedisCache)
+ ropts, err := redis.ParseURL(redisURL)
+ if err != nil {
+ return nil, err
+ }
+
+ rdb := redis.NewClient(ropts)
+ c.client = rdb
+ c.duration = d
+
+ return c, nil
+}
+
+// Set saves an alert in the cache.
+func (c *RedisCache) Set(fingerprint string, status string) error {
+ return c.client.SetEx(context.Background(), fingerprint, status, c.duration).Err()
+}
+
+// Contains checks if an alert with a given fingerprint is in the cache
+// and if the status matches.
+func (c *RedisCache) Contains(fingerprint string, status string) (bool, error) {
+ val, err := c.client.Get(context.Background(), fingerprint).Result()
+ if err == redis.Nil {
+ return false, nil
+ } else if err != nil {
+ return false, err
+ }
+
+ return val == status, nil
+}
+
+// Cleanup is an empty function that is simply here to implement the interface.
+// Redis does its own cleanup.
+func (c *RedisCache) Cleanup() {}
M config.go => config.go +3 -0
@@ 20,6 20,7 @@ type cacheType int
const (
memory cacheType = iota
+ redis
)
type config struct {
@@ 256,6 257,8 @@ func readConfig(path string) (*config, error) {
switch strings.ToLower(cacheType) {
case "memory":
config.cache.Type = memory
+ case "redis":
+ config.cache.Type = redis
default:
return nil, fmt.Errorf("cache: illegal type %q", cacheType)
}
M config_test.go => config_test.go +6 -1
@@ 54,6 54,7 @@ alertmanager {
}
cache {
+ type redis
duration 48h
}
`
@@ 73,7 74,11 @@ cache {
"instance:example.com": {Tags: []string{"computer", "example"}},
},
},
- cache: cacheConfig{CleanupInterval: time.Hour, Duration: 48 * time.Hour},
+ cache: cacheConfig{
+ Type: redis,
+ CleanupInterval: time.Hour,
+ Duration: 48 * time.Hour,
+ },
am: alertmanagerConfig{
SilenceDuration: time.Hour * 24,
User: "user",
M go.mod => go.mod +6 -1
@@ 5,7 5,12 @@ go 1.19
require (
git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74
+ github.com/redis/go-redis/v9 v9.0.2
golang.org/x/text v0.7.0
)
-require github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
+require (
+ github.com/cespare/xxhash/v2 v2.2.0 // indirect
+ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+ github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
+)
M go.sum => go.sum +11 -0
@@ 2,9 2,20 @@ git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99 h1:1s8n5uisqkR+Bz
git.sr.ht/~emersion/go-scfg v0.0.0-20211215104734-c2c7a15d6c99/go.mod h1:t+Ww6SR24yYnXzEWiNlOY0AFo5E9B73X++10lrSpp4U=
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74 h1:t/52xLRU4IHd2O1nkb9fcUE6K95/KdBtdQYHT31szps=
git.xenrox.net/~xenrox/go-log v0.0.0-20221012231312-9e7356c29b74/go.mod h1:d98WFDHGpxaEThKue5CfGtr9OrWgbaApprt3GH+OM4s=
+github.com/bsm/ginkgo/v2 v2.5.0 h1:aOAnND1T40wEdAtkGSkvSICWeQ8L3UASX7YVCqQx+eQ=
+github.com/bsm/gomega v1.20.0 h1:JhAwLmtRzXFTx2AkALSLa8ijZafntmhSoU63Ok18Uq8=
+github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
+github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/redis/go-redis/v9 v9.0.2 h1:BA426Zqe/7r56kCcvxYLWe1mkaz71LKF77GwgFzSxfE=
+github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEtEHbBQevps=
+github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
M main.go => main.go +24 -5
@@ 59,8 59,12 @@ type notification struct {
func (br *bridge) singleAlertNotifications(p *payload) []*notification {
var notifications []*notification
for _, alert := range p.Alerts {
- if br.cache.Contains(alert.Fingerprint, alert.Status) {
- br.logger.Debugf("Alert %s skipped: Still in cache", alert.Fingerprint)
+ contains, err := br.cache.Contains(alert.Fingerprint, alert.Status)
+ if err != nil {
+ br.logger.Errorf("Failed to lookup alert %q in cache: %v", alert.Fingerprint, err)
+ }
+ if contains {
+ br.logger.Debugf("Alert %q skipped: Still in cache", alert.Fingerprint)
continue
}
@@ 324,7 328,9 @@ func (br *bridge) handleWebhooks(w http.ResponseWriter, r *http.Request) {
if err != nil {
br.logger.Errorf("Failed to publish notification: %v", err)
} else {
- br.cache.Set(n.fingerprint, n.status)
+ if err := br.cache.Set(n.fingerprint, n.status); err != nil {
+ br.logger.Errorf("Failed to set alert %q in cache: %v", n.fingerprint, err)
+ }
}
}
} else {
@@ 395,7 401,18 @@ func main() {
client := &httpClient{&http.Client{Timeout: time.Second * 3}}
- c := cache.NewMemoryCache(cfg.cache.Duration)
+ var c cache.Cache
+ switch cfg.cache.Type {
+ case memory:
+ c = cache.NewMemoryCache(cfg.cache.Duration)
+ case redis:
+ var err error
+ // TODO: Read URL from config
+ c, err = cache.NewRedisCache("redis://localhost:6379", cfg.cache.Duration)
+ if err != nil {
+ logger.Fatalf("Failed to create redis cache: %v", err)
+ }
+ }
bridge := &bridge{cfg: cfg, logger: logger, cache: c, client: client}
logger.Infof("Listening on %s, ntfy-alertmanager %s", cfg.HTTPAddress, version)
@@ 409,6 426,8 @@ func main() {
http.HandleFunc("/silences", bridge.handleSilences)
}
- go bridge.runCleanup()
+ if cfg.cache.Type == memory {
+ go bridge.runCleanup()
+ }
logger.Fatal(http.ListenAndServe(cfg.HTTPAddress, nil))
}