diff options
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 2 | ||||
-rw-r--r-- | pkg/metricsx/geohash.go | 148 | ||||
-rw-r--r-- | pkg/metricsx/geohash_test.go | 119 |
4 files changed, 270 insertions, 0 deletions
@@ -10,6 +10,7 @@ require ( github.com/jmoiron/sqlx v1.3.5 github.com/klauspost/compress v1.15.12 github.com/mattn/go-sqlite3 v1.14.16 + github.com/mmcloughlin/geohash v0.10.0 github.com/pg9182/ip2x v0.1.1 github.com/rs/zerolog v1.28.0 github.com/spf13/pflag v1.0.5 @@ -26,6 +26,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= +github.com/mmcloughlin/geohash v0.10.0 h1:9w1HchfDfdeLc+jFEf/04D27KP7E2QmpDu52wPbJWRE= +github.com/mmcloughlin/geohash v0.10.0/go.mod h1:oNZxQo5yWJh0eMQEP/8hwQuVx9Z9tjwFUqcTB1SmG0c= github.com/pg9182/ip2x v0.1.1 h1:J++3N8OV+k7zK3xAY5mSZWE73EbwgqIxpzeloL7Zqyk= github.com/pg9182/ip2x v0.1.1/go.mod h1:iJzts7yWZDWUaldqNGFVOZ0MW9uYrGk1hv9R/wTJdNQ= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/pkg/metricsx/geohash.go b/pkg/metricsx/geohash.go new file mode 100644 index 0000000..1073f97 --- /dev/null +++ b/pkg/metricsx/geohash.go @@ -0,0 +1,148 @@ +package metricsx + +import ( + "io" + "strconv" + "strings" + "sync/atomic" + + "github.com/VictoriaMetrics/metrics" + "github.com/mmcloughlin/geohash" +) + +// GeoCounter is like a *metrics.Counter, but split by location using geohashes. +type GeoCounter struct { + level uint + ctr []*metrics.Counter + unk *metrics.Counter + set *metrics.Set + base string + arg string +} + +// NewGeoCounter creates a new GeoCounter writing to metrics in set named name, +// with level chars in the geohash. +func NewGeoCounter(set *metrics.Set, name string, level uint) *GeoCounter { + if h, p := geohash.ConvertStringToInt(strings.Repeat("z", int(level))); h != 1<<(5*level)-1 || p != 5*uint(level) { + panic("wtf") // this shouldn't happen... geohashes are base32, and int encoding is 5 bits per char + } + base, arg := splitName(name) + return &GeoCounter{ + level: level, + ctr: make([]*metrics.Counter, 1<<(5*level)), + unk: set.NewCounter(formatName(base, arg, "geohash", "")), + set: set, + base: base, + arg: arg, + } +} + +// Inc increments the counter for the specified latitude and longitude. +func (c *GeoCounter) Inc(lat, lng float64) { + c.Counter(lat, lng).Inc() +} + +// Set sets the counter for the specified latitude and longitude. +func (c *GeoCounter) Set(lat, lng float64, v uint64) { + c.Counter(lat, lng).Set(v) +} + +// IncUnknown increments the unknown counter. +func (c *GeoCounter) IncUnknown() { + c.unk.Inc() +} + +// SetUnknown sets the unknown counter. +func (c *GeoCounter) SetUnknown(v uint64) { + c.unk.Set(v) +} + +// Counter gets the underlying counter for the specified latitude and longitude. +func (c *GeoCounter) Counter(lat, lng float64) *metrics.Counter { + h := geohash.EncodeIntWithPrecision(lat, lng, c.level*5) + if int(h) >= len(c.ctr) { + return nil // wtf (this shouldn't even be possible, but we don't panic here for performance reasons) + } + m := c.ctr[h] + if m == nil { + m = c.set.NewCounter(formatName(c.base, c.arg, "geohash", geohash.EncodeWithPrecision(lat, lng, c.level))) + c.ctr[h] = m + } + return m +} + +// CounterUnknown gets the underlying counter for unknown positions. +func (c *GeoCounter) CounterUnknown() *metrics.Counter { + return c.unk +} + +// GeoCounter2 is an optimized standalone level 2 geocounter metric. It must not +// be copied (it uses atomics). +type GeoCounter2 struct { + name string + ctr [1 << (5 * 2)]uint64 + unk uint64 +} + +// NewGeoCounter2 creates a new GeoCounter2 with the provided metric name. +// +// Note: The maximum cardinality of metrics produced will be 1024. +func NewGeoCounter2(name string) *GeoCounter2 { + b, a := splitName(name) + n := formatName(b, a, "geohash", "") + if !strings.HasSuffix(n, `geohash=""}`) { + panic("wtf") // should never happen + } + return &GeoCounter2{name: n} +} + +// Inc increments the counter for the specified latitude and longitude. +func (c *GeoCounter2) Inc(lat, lng float64) { + if c != nil { + // this should always be true, but we need it to satisfy the bounds checker + if h := geohash2(lat, lng); h < 1<<(5*2) { + atomic.AddUint64(&c.ctr[h], 1) + } + } +} + +// Set sets the counter for the specified latitude and longitude. +func (c *GeoCounter2) Set(lat, lng float64, v uint64) { + if c != nil { + // this should always be true, but we need it to satisfy the bounds checker + if h := geohash2(lat, lng); h < 1<<(5*2) { + atomic.StoreUint64(&c.ctr[h], 1) + } + } +} + +// IncUnknown increments the unknown counter. +func (c *GeoCounter2) IncUnknown() { + atomic.AddUint64(&c.unk, 1) +} + +// SetUnknown sets the unknown counter. +func (c *GeoCounter2) SetUnknown(v uint64) { + atomic.StoreUint64(&c.unk, v) +} + +// WritePrometheus writes the Promethus text metrics. +func (c *GeoCounter2) WritePrometheus(w io.Writer) { + n := len(c.name) + b := make([]byte, 0, n+2+1+20+1) + b = append(b, c.name...) + w.Write(append(strconv.AppendUint(append(b, ' '), atomic.LoadUint64(&c.unk), 10), '\n')) + b = append(b, `"} `...) + _ = b[n-2] // bounds check hint + for h := uint64(0); h < 1<<(5*2); h++ { + if v := atomic.LoadUint64(&c.ctr[h]); v != 0 { + b[n-1] = "0123456789bcdefghjkmnpqrstuvwxyz"[(h>>0)&0x1f] + b[n-2] = "0123456789bcdefghjkmnpqrstuvwxyz"[(h>>5)&0x1f] + w.Write(append(strconv.AppendUint(b, v, 10), '\n')) + } + } +} + +func geohash2(lat, lng float64) uint64 { + return geohash.EncodeIntWithPrecision(lat, lng, 5*2) +} diff --git a/pkg/metricsx/geohash_test.go b/pkg/metricsx/geohash_test.go new file mode 100644 index 0000000..03cd12e --- /dev/null +++ b/pkg/metricsx/geohash_test.go @@ -0,0 +1,119 @@ +package metricsx + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "io" + "strings" + "testing" + + "github.com/VictoriaMetrics/metrics" +) + +func TestGeoCounter(t *testing.T) { + var exp string + if d, err := gzip.NewReader(base64.NewDecoder(base64.StdEncoding, strings.NewReader(``+ + `H4sIAAAAAAAA/4zZSW7lVACG0TmriN6YQZzndlCLwb1s2bjvUPaOEPPKmV9dquDg/8vNVq3bP2XdlHXz67WW9VrWrz+b6u/2r7X99Xp9f3z+8fsjn5+v74/4p0PB6/vj/dOhr9f3x9dPh96v74/gp0OR/OMSuSmVP1MmN+VyUyE3VXKokUOd/Hsa5KZJblrkpk0OHXLo`+ + `kkMPHAo+4b9dEMhNYjwQ40Ekh8R4IMYDMR6I8UCMB2I8EONBJ4fEeDDJITEeiPFAjAdiPBDjX5/wf/DXl9wUyk2x3JTKoVwOlXKolkOt/O16uWmUm2a5aZVDuxw65dANh96fckg8vUM5JJ7e4uktnt7i6S2e3q0cEk/vUQ6Jp7d4eount3h6i6fwP0/hT4cCGKBQNjiU`+ + `DQ4juUk2OJQNDmWDQ9ngUDY4lA0OZYPDTv5MssHhJDfJBoeywaFscCgbHMoGR9KZkXRmJMYjMR5JZ0ZiPBLjkRiPxHgkxiMxHonxSDozEuORdGYkxiMxHonxSIxHYjwW47F0QRzKTdIFsXRBLF0QSxfE0gVxK3876YJ4lJukC2Lpgli6IJYuiKULEunMRDwl0pmJeErE`+ + `UyKeEvGUiKdEOjMRT4l0ZiKeEvGUiKdEPCXiKZWfg1N5z0wFXSobnMp7ZiobnIrMVDY4Fb6pbHAqG5zKBqfynpnKBqfynpnKBqeywalscCobnMoGZ/LNzKQzMzGeifFMOjMT45kYz8R4JsYzMZ6J8UyMZ9KZmRjPpDMzMZ6J8UyMZ2I8E+O5fMdz4ZvLe2YuXZCLzFzQ`+ + `5dIFuXRBLu+ZuXRBLu+ZuXRBLl2QSxfk0gW5dEEh38xCPBXSmYV4KsRTIZ4K8VSIp0I6sxBPhXRmIZ4K8VSIp0I8FeKplJ+DS3nPLAVdKRtcyntmKRtcisxSNrgUvqVscCkbXMoGl/KeWcoGl/KeWcoGl7LBpWxwKRtcygZX8s2spDMrMV6J8Uo6sxLjlRivxHglxisx`+ + `XonxSoxX0pmVGK+kMysxXonxSoxXYrwS47V8x2vhW8t7Zi1dUIvMWtDV0gW1dEEt75m1dEEt75m1dEEtXVBLF9TSBbV0QSPfzEY8NdKZjXhqxFMjnhrx1IinRjqzEU+NdGYjnhrx1IinRjw14qmV35u30pmt/E6xlQ1upTNb2eBWfqfYyga38jvFVja4lQ1uZYNb6cxW`+ + `NriVzmxlg1vZ4FY2uJUNbmWDO9ngTjqzE+OdGO+kMzsx3onxTox3YrwT450Y78R4J53ZifFOOrMT450Y78R4J8Y7Md6L8V66oJfO7KULeumCXrqgly7opQt66cxeuqCXzuylC3rpgl66oJcu6KULBunMQTwN0pmDeBrE0yCeBvE0iKdBOnMQT4N05iCeBvE0iKdBPA3i`+ + `aZTOHKUzR9ngUTZ4lM4cZYNH2eBRNniUDR5lg0fZ4FE2eJTOHGWDR+nMUTZ4lA0eZYNH2eBRNniSDZ6kMycxPonxSTpzEuOTGJ/E+CTGJzE+ifFJjE/SmZMYn6QzJzE+ifFJjE9ifBLjsxifpQtm6cxZumCWLpilC2bpglm6YJbOnKULZunMWbpgli6YpQtm6YJZumCR`+ + `zlzE0yKduYinRTwt4mkRT4t4WqQzF/G0SGcu4mkRT4t4WsTTIp5W+T6t0pmroFtlg1fpzFU2eBWZq2zwKnxX2eBVNniVDV6lM1fZ4FU6c5UNXmWDV9ngVTZ4lQ3e5Ju5SWduYnwT45t05ibGNzG+ifFNjG9ifBPjmxjfpDM3Mb5JZ25ifBPjmxjfxPgmxnf5ju/Cd5fO`+ + `3KULdpG5C7pdumCXLtilM3fpgl06c5cu2KULdumCXbpgly445Jt5iKdDOvMQT4d4OsTTIZ4O8XRIZx7i6ZDOPMTTIZ4O8XSIp0M8nfJ9OqUzT0F3ygaf0pmnbPApMk/Z4FP4nrLBp2zwKRt8SmeessGndOYpG3zKBp+ywads8CkbfMk385LOvMT4JcYv6cxLjF9i/BLj`+ + `lxi/xPglxi8xfklnXmL8ks68xPglxi8xfonxS4zf8h2/he8tnXlLF9wi8xZ0t3TBLV1wS2fe0gW3dOYtXXBLF9zSBbd0wS1d8Mg38xFPj3TmI54e8fSIp0c8PeLpkc58xNMjnfmIp0c8PeLpEU/P/57+DQAA//8lgllOY1MAAA==`, + ))); err != nil { + panic(err) + } else if x, err := io.ReadAll(d); err != nil { + panic(err) + } else { + exp = strings.TrimSpace(string(x)) + } + + set := metrics.NewSet() + name := `test{dfgdfg="sdfsdf"}` + gc1 := NewGeoCounter(set, name, 2) + gc2 := NewGeoCounter2(name) + + for lat := float64(-90); lat <= 90; lat += 10 { + for lng := float64(-180); lng <= 180; lng += 10 { + gc1.Inc(lat, lng) + gc2.Inc(lat, lng) + } + } + + var b1 strings.Builder + set.WritePrometheus(&b1) + + var b2 strings.Builder + gc2.WritePrometheus(&b2) + + t.Run("GeoCounter", func(t *testing.T) { + if a := strings.TrimSpace(b1.String()); a != exp { + t.Errorf("expected:\n\t%s\n, got\n\t%s", strings.ReplaceAll(exp, "\n", "\n\t"), strings.ReplaceAll(a, "\n", "\n\t")) + } + }) + + t.Run("GeoCounter2", func(t *testing.T) { + if a := strings.TrimSpace(b2.String()); a != exp { + t.Errorf("expected:\n\t%s\n, got\n\t%s", strings.ReplaceAll(exp, "\n", "\n\t"), strings.ReplaceAll(a, "\n", "\n\t")) + } + }) +} + +func BenchmarkGeoCounter2(b *testing.B) { + var pts [][2]float64 + for lat := float64(-90); lat <= 90; lat += 10 { + for lng := float64(-180); lng <= 180; lng += 10 { + pts = append(pts, [2]float64{lat, lng}) + } + } + + b.Run("GeoCounter", func(b *testing.B) { + set := metrics.NewSet() + ctr := NewGeoCounter(set, `test{dfgdfg="sdfsdf"}`, 2) + + b.Run("Inc", func(b *testing.B) { + for n := 0; n < b.N; n++ { + pt := pts[n%len(pts)] + ctr.Inc(pt[0], pt[1]) + } + }) + + b.Run("WritePrometheus", func(b *testing.B) { + var buf bytes.Buffer + set.WritePrometheus(&buf) + b.ResetTimer() + + for n := 0; n < b.N; n++ { + buf.Reset() + set.WritePrometheus(&buf) + } + }) + }) + + b.Run("GeoCounter2", func(b *testing.B) { + ctr := NewGeoCounter2(`test{dfgdfg="sdfsdf"}`) + + b.Run("Inc", func(b *testing.B) { + for n := 0; n < b.N; n++ { + pt := pts[n%len(pts)] + ctr.Inc(pt[0], pt[1]) + } + }) + + b.Run("WritePrometheus", func(b *testing.B) { + var buf bytes.Buffer + ctr.WritePrometheus(&buf) + b.ResetTimer() + + for n := 0; n < b.N; n++ { + buf.Reset() + ctr.WritePrometheus(&buf) + } + }) + }) +} |