aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpg9182 <96569817+pg9182@users.noreply.github.com>2022-11-27 00:56:27 -0500
committerpg9182 <96569817+pg9182@users.noreply.github.com>2022-11-27 02:48:10 -0500
commit1bcdc2f5491afe98240b3a823d518cf610b1deed (patch)
treead565c7e1493cc5aada661d2f492bbd2b6ab30c4
parente3cdce7ada2be1fdf77a5e15130b4fcef3045c26 (diff)
downloadAtlas-1bcdc2f5491afe98240b3a823d518cf610b1deed.tar.gz
Atlas-1bcdc2f5491afe98240b3a823d518cf610b1deed.zip
pkg/metricsx: Implement GeoCounter
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rw-r--r--pkg/metricsx/geohash.go148
-rw-r--r--pkg/metricsx/geohash_test.go119
4 files changed, 270 insertions, 0 deletions
diff --git a/go.mod b/go.mod
index 5d5081d..d0341a4 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/klauspost/compress v1.15.12
github.com/mattn/go-sqlite3 v1.14.16
+ github.com/mmcloughlin/geohash v0.10.0
github.com/pg9182/ip2x v0.1.1
github.com/rs/zerolog v1.28.0
github.com/spf13/pflag v1.0.5
diff --git a/go.sum b/go.sum
index 96e9de0..0cd9a81 100644
--- a/go.sum
+++ b/go.sum
@@ -26,6 +26,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
+github.com/mmcloughlin/geohash v0.10.0 h1:9w1HchfDfdeLc+jFEf/04D27KP7E2QmpDu52wPbJWRE=
+github.com/mmcloughlin/geohash v0.10.0/go.mod h1:oNZxQo5yWJh0eMQEP/8hwQuVx9Z9tjwFUqcTB1SmG0c=
github.com/pg9182/ip2x v0.1.1 h1:J++3N8OV+k7zK3xAY5mSZWE73EbwgqIxpzeloL7Zqyk=
github.com/pg9182/ip2x v0.1.1/go.mod h1:iJzts7yWZDWUaldqNGFVOZ0MW9uYrGk1hv9R/wTJdNQ=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/pkg/metricsx/geohash.go b/pkg/metricsx/geohash.go
new file mode 100644
index 0000000..1073f97
--- /dev/null
+++ b/pkg/metricsx/geohash.go
@@ -0,0 +1,148 @@
+package metricsx
+
+import (
+ "io"
+ "strconv"
+ "strings"
+ "sync/atomic"
+
+ "github.com/VictoriaMetrics/metrics"
+ "github.com/mmcloughlin/geohash"
+)
+
+// GeoCounter is like a *metrics.Counter, but split by location using geohashes.
+type GeoCounter struct {
+ level uint
+ ctr []*metrics.Counter
+ unk *metrics.Counter
+ set *metrics.Set
+ base string
+ arg string
+}
+
+// NewGeoCounter creates a new GeoCounter writing to metrics in set named name,
+// with level chars in the geohash.
+func NewGeoCounter(set *metrics.Set, name string, level uint) *GeoCounter {
+ if h, p := geohash.ConvertStringToInt(strings.Repeat("z", int(level))); h != 1<<(5*level)-1 || p != 5*uint(level) {
+ panic("wtf") // this shouldn't happen... geohashes are base32, and int encoding is 5 bits per char
+ }
+ base, arg := splitName(name)
+ return &GeoCounter{
+ level: level,
+ ctr: make([]*metrics.Counter, 1<<(5*level)),
+ unk: set.NewCounter(formatName(base, arg, "geohash", "")),
+ set: set,
+ base: base,
+ arg: arg,
+ }
+}
+
+// Inc increments the counter for the specified latitude and longitude.
+func (c *GeoCounter) Inc(lat, lng float64) {
+ c.Counter(lat, lng).Inc()
+}
+
+// Set sets the counter for the specified latitude and longitude.
+func (c *GeoCounter) Set(lat, lng float64, v uint64) {
+ c.Counter(lat, lng).Set(v)
+}
+
+// IncUnknown increments the unknown counter.
+func (c *GeoCounter) IncUnknown() {
+ c.unk.Inc()
+}
+
+// SetUnknown sets the unknown counter.
+func (c *GeoCounter) SetUnknown(v uint64) {
+ c.unk.Set(v)
+}
+
+// Counter gets the underlying counter for the specified latitude and longitude.
+func (c *GeoCounter) Counter(lat, lng float64) *metrics.Counter {
+ h := geohash.EncodeIntWithPrecision(lat, lng, c.level*5)
+ if int(h) >= len(c.ctr) {
+ return nil // wtf (this shouldn't even be possible, but we don't panic here for performance reasons)
+ }
+ m := c.ctr[h]
+ if m == nil {
+ m = c.set.NewCounter(formatName(c.base, c.arg, "geohash", geohash.EncodeWithPrecision(lat, lng, c.level)))
+ c.ctr[h] = m
+ }
+ return m
+}
+
+// CounterUnknown gets the underlying counter for unknown positions.
+func (c *GeoCounter) CounterUnknown() *metrics.Counter {
+ return c.unk
+}
+
+// GeoCounter2 is an optimized standalone level 2 geocounter metric. It must not
+// be copied (it uses atomics).
+type GeoCounter2 struct {
+ name string
+ ctr [1 << (5 * 2)]uint64
+ unk uint64
+}
+
+// NewGeoCounter2 creates a new GeoCounter2 with the provided metric name.
+//
+// Note: The maximum cardinality of metrics produced will be 1024.
+func NewGeoCounter2(name string) *GeoCounter2 {
+ b, a := splitName(name)
+ n := formatName(b, a, "geohash", "")
+ if !strings.HasSuffix(n, `geohash=""}`) {
+ panic("wtf") // should never happen
+ }
+ return &GeoCounter2{name: n}
+}
+
+// Inc increments the counter for the specified latitude and longitude.
+func (c *GeoCounter2) Inc(lat, lng float64) {
+ if c != nil {
+ // this should always be true, but we need it to satisfy the bounds checker
+ if h := geohash2(lat, lng); h < 1<<(5*2) {
+ atomic.AddUint64(&c.ctr[h], 1)
+ }
+ }
+}
+
+// Set sets the counter for the specified latitude and longitude.
+func (c *GeoCounter2) Set(lat, lng float64, v uint64) {
+ if c != nil {
+ // this should always be true, but we need it to satisfy the bounds checker
+ if h := geohash2(lat, lng); h < 1<<(5*2) {
+ atomic.StoreUint64(&c.ctr[h], 1)
+ }
+ }
+}
+
+// IncUnknown increments the unknown counter.
+func (c *GeoCounter2) IncUnknown() {
+ atomic.AddUint64(&c.unk, 1)
+}
+
+// SetUnknown sets the unknown counter.
+func (c *GeoCounter2) SetUnknown(v uint64) {
+ atomic.StoreUint64(&c.unk, v)
+}
+
+// WritePrometheus writes the Promethus text metrics.
+func (c *GeoCounter2) WritePrometheus(w io.Writer) {
+ n := len(c.name)
+ b := make([]byte, 0, n+2+1+20+1)
+ b = append(b, c.name...)
+ w.Write(append(strconv.AppendUint(append(b, ' '), atomic.LoadUint64(&c.unk), 10), '\n'))
+ b = append(b, `"} `...)
+ _ = b[n-2] // bounds check hint
+ for h := uint64(0); h < 1<<(5*2); h++ {
+ if v := atomic.LoadUint64(&c.ctr[h]); v != 0 {
+ b[n-1] = "0123456789bcdefghjkmnpqrstuvwxyz"[(h>>0)&0x1f]
+ b[n-2] = "0123456789bcdefghjkmnpqrstuvwxyz"[(h>>5)&0x1f]
+ w.Write(append(strconv.AppendUint(b, v, 10), '\n'))
+ }
+ }
+}
+
+func geohash2(lat, lng float64) uint64 {
+ return geohash.EncodeIntWithPrecision(lat, lng, 5*2)
+}
diff --git a/pkg/metricsx/geohash_test.go b/pkg/metricsx/geohash_test.go
new file mode 100644
index 0000000..03cd12e
--- /dev/null
+++ b/pkg/metricsx/geohash_test.go
@@ -0,0 +1,119 @@
+package metricsx
+
+import (
+ "bytes"
+ "compress/gzip"
+ "encoding/base64"
+ "io"
+ "strings"
+ "testing"
+
+ "github.com/VictoriaMetrics/metrics"
+)
+
+func TestGeoCounter(t *testing.T) {
+ var exp string
+ if d, err := gzip.NewReader(base64.NewDecoder(base64.StdEncoding, strings.NewReader(``+
+ `H4sIAAAAAAAA/4zZSW7lVACG0TmriN6YQZzndlCLwb1s2bjvUPaOEPPKmV9dquDg/8vNVq3bP2XdlHXz67WW9VrWrz+b6u/2r7X99Xp9f3z+8fsjn5+v74/4p0PB6/vj/dOhr9f3x9dPh96v74/gp0OR/OMSuSmVP1MmN+VyUyE3VXKokUOd/Hsa5KZJblrkpk0OHXLo`+
+ `kkMPHAo+4b9dEMhNYjwQ40Ekh8R4IMYDMR6I8UCMB2I8EONBJ4fEeDDJITEeiPFAjAdiPBDjX5/wf/DXl9wUyk2x3JTKoVwOlXKolkOt/O16uWmUm2a5aZVDuxw65dANh96fckg8vUM5JJ7e4uktnt7i6S2e3q0cEk/vUQ6Jp7d4eount3h6i6fwP0/hT4cCGKBQNjiU`+
+ `DQ4juUk2OJQNDmWDQ9ngUDY4lA0OZYPDTv5MssHhJDfJBoeywaFscCgbHMoGR9KZkXRmJMYjMR5JZ0ZiPBLjkRiPxHgkxiMxHonxSDozEuORdGYkxiMxHonxSIxHYjwW47F0QRzKTdIFsXRBLF0QSxfE0gVxK3876YJ4lJukC2Lpgli6IJYuiKULEunMRDwl0pmJeErE`+
+ `UyKeEvGUiKdEOjMRT4l0ZiKeEvGUiKdEPCXiKZWfg1N5z0wFXSobnMp7ZiobnIrMVDY4Fb6pbHAqG5zKBqfynpnKBqfynpnKBqeywalscCobnMoGZ/LNzKQzMzGeifFMOjMT45kYz8R4JsYzMZ6J8UyMZ9KZmRjPpDMzMZ6J8UyMZ2I8E+O5fMdz4ZvLe2YuXZCLzFzQ`+
+ `5dIFuXRBLu+ZuXRBLu+ZuXRBLl2QSxfk0gW5dEEh38xCPBXSmYV4KsRTIZ4K8VSIp0I6sxBPhXRmIZ4K8VSIp0I8FeKplJ+DS3nPLAVdKRtcyntmKRtcisxSNrgUvqVscCkbXMoGl/KeWcoGl/KeWcoGl7LBpWxwKRtcygZX8s2spDMrMV6J8Uo6sxLjlRivxHglxisx`+
+ `XonxSoxX0pmVGK+kMysxXonxSoxXYrwS47V8x2vhW8t7Zi1dUIvMWtDV0gW1dEEt75m1dEEt75m1dEEtXVBLF9TSBbV0QSPfzEY8NdKZjXhqxFMjnhrx1IinRjqzEU+NdGYjnhrx1IinRjw14qmV35u30pmt/E6xlQ1upTNb2eBWfqfYyga38jvFVja4lQ1uZYNb6cxW`+
+ `NriVzmxlg1vZ4FY2uJUNbmWDO9ngTjqzE+OdGO+kMzsx3onxTox3YrwT450Y78R4J53ZifFOOrMT450Y78R4J8Y7Md6L8V66oJfO7KULeumCXrqgly7opQt66cxeuqCXzuylC3rpgl66oJcu6KULBunMQTwN0pmDeBrE0yCeBvE0iKdBOnMQT4N05iCeBvE0iKdBPA3i`+
+ `aZTOHKUzR9ngUTZ4lM4cZYNH2eBRNniUDR5lg0fZ4FE2eJTOHGWDR+nMUTZ4lA0eZYNH2eBRNniSDZ6kMycxPonxSTpzEuOTGJ/E+CTGJzE+ifFJjE/SmZMYn6QzJzE+ifFJjE9ifBLjsxifpQtm6cxZumCWLpilC2bpglm6YJbOnKULZunMWbpgli6YpQtm6YJZumCR`+
+ `zlzE0yKduYinRTwt4mkRT4t4WqQzF/G0SGcu4mkRT4t4WsTTIp5W+T6t0pmroFtlg1fpzFU2eBWZq2zwKnxX2eBVNniVDV6lM1fZ4FU6c5UNXmWDV9ngVTZ4lQ3e5Ju5SWduYnwT45t05ibGNzG+ifFNjG9ifBPjmxjfpDM3Mb5JZ25ifBPjmxjfxPgmxnf5ju/Cd5fO`+
+ `3KULdpG5C7pdumCXLtilM3fpgl06c5cu2KULdumCXbpgly445Jt5iKdDOvMQT4d4OsTTIZ4O8XRIZx7i6ZDOPMTTIZ4O8XSIp0M8nfJ9OqUzT0F3ygaf0pmnbPApMk/Z4FP4nrLBp2zwKRt8SmeessGndOYpG3zKBp+ywads8CkbfMk385LOvMT4JcYv6cxLjF9i/BLj`+
+ `lxi/xPglxi8xfklnXmL8ks68xPglxi8xfonxS4zf8h2/he8tnXlLF9wi8xZ0t3TBLV1wS2fe0gW3dOYtXXBLF9zSBbd0wS1d8Mg38xFPj3TmI54e8fSIp0c8PeLpkc58xNMjnfmIp0c8PeLpEU/P/57+DQAA//8lgllOY1MAAA==`,
+ ))); err != nil {
+ panic(err)
+ } else if x, err := io.ReadAll(d); err != nil {
+ panic(err)
+ } else {
+ exp = strings.TrimSpace(string(x))
+ }
+
+ set := metrics.NewSet()
+ name := `test{dfgdfg="sdfsdf"}`
+ gc1 := NewGeoCounter(set, name, 2)
+ gc2 := NewGeoCounter2(name)
+
+ for lat := float64(-90); lat <= 90; lat += 10 {
+ for lng := float64(-180); lng <= 180; lng += 10 {
+ gc1.Inc(lat, lng)
+ gc2.Inc(lat, lng)
+ }
+ }
+
+ var b1 strings.Builder
+ set.WritePrometheus(&b1)
+
+ var b2 strings.Builder
+ gc2.WritePrometheus(&b2)
+
+ t.Run("GeoCounter", func(t *testing.T) {
+ if a := strings.TrimSpace(b1.String()); a != exp {
+ t.Errorf("expected:\n\t%s\n, got\n\t%s", strings.ReplaceAll(exp, "\n", "\n\t"), strings.ReplaceAll(a, "\n", "\n\t"))
+ }
+ })
+
+ t.Run("GeoCounter2", func(t *testing.T) {
+ if a := strings.TrimSpace(b2.String()); a != exp {
+ t.Errorf("expected:\n\t%s\n, got\n\t%s", strings.ReplaceAll(exp, "\n", "\n\t"), strings.ReplaceAll(a, "\n", "\n\t"))
+ }
+ })
+}
+
+func BenchmarkGeoCounter2(b *testing.B) {
+ var pts [][2]float64
+ for lat := float64(-90); lat <= 90; lat += 10 {
+ for lng := float64(-180); lng <= 180; lng += 10 {
+ pts = append(pts, [2]float64{lat, lng})
+ }
+ }
+
+ b.Run("GeoCounter", func(b *testing.B) {
+ set := metrics.NewSet()
+ ctr := NewGeoCounter(set, `test{dfgdfg="sdfsdf"}`, 2)
+
+ b.Run("Inc", func(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ pt := pts[n%len(pts)]
+ ctr.Inc(pt[0], pt[1])
+ }
+ })
+
+ b.Run("WritePrometheus", func(b *testing.B) {
+ var buf bytes.Buffer
+ set.WritePrometheus(&buf)
+ b.ResetTimer()
+
+ for n := 0; n < b.N; n++ {
+ buf.Reset()
+ set.WritePrometheus(&buf)
+ }
+ })
+ })
+
+ b.Run("GeoCounter2", func(b *testing.B) {
+ ctr := NewGeoCounter2(`test{dfgdfg="sdfsdf"}`)
+
+ b.Run("Inc", func(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ pt := pts[n%len(pts)]
+ ctr.Inc(pt[0], pt[1])
+ }
+ })
+
+ b.Run("WritePrometheus", func(b *testing.B) {
+ var buf bytes.Buffer
+ ctr.WritePrometheus(&buf)
+ b.ResetTimer()
+
+ for n := 0; n < b.N; n++ {
+ buf.Reset()
+ ctr.WritePrometheus(&buf)
+ }
+ })
+ })
+}