From 236a556bc686abfae5155567984c8b7fa2432e04 Mon Sep 17 00:00:00 2001 From: Sendya <18x@loacg.com> Date: Mon, 8 Jun 2026 10:14:50 +0800 Subject: [PATCH] feat: add comprehensive Prometheus metrics across cache, proxy, server, and storage layers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add metrics for cache request outcomes, chunk writes, flush failures, fillrange operations, upstream latency/errors, request coalescing, request duration, active connections, panic recovery, indexdb operations, evictions, migrations, and cached object counts — all under the shared tavern namespace. Co-Authored-By: Claude Opus 4.7 --- .gitignore | 2 + proxy/metrics.go | 41 ++++++++++++++++++ proxy/proxy.go | 42 +++++++++++++++--- server/metrics.go | 17 ++++++++ server/middleware/caching/caching.go | 12 ++++++ server/middleware/caching/internal.go | 2 + server/middleware/caching/metrics.go | 50 ++++++++++++++++++++++ server/middleware/recovery/metrics.go | 19 +++++++++ server/middleware/recovery/recovery.go | 1 + server/server.go | 14 ++++++ storage/bucket/disk/disk.go | 17 ++++++++ storage/bucket/disk/metrics.go | 59 ++++++++++++++++++++++++++ 12 files changed, 269 insertions(+), 7 deletions(-) create mode 100644 proxy/metrics.go create mode 100644 server/middleware/caching/metrics.go create mode 100644 server/middleware/recovery/metrics.go create mode 100644 storage/bucket/disk/metrics.go diff --git a/.gitignore b/.gitignore index 27bdbd7..419e3a9 100644 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,5 @@ config*.yaml # Editor/IDE .idea/ .vscode/ +.codegraph/ +reasonix.toml diff --git a/proxy/metrics.go b/proxy/metrics.go new file mode 100644 index 0000000..d816aaa --- /dev/null +++ b/proxy/metrics.go @@ -0,0 +1,41 @@ +package proxy + +import ( + pkgmetrics "github.com/omalloc/tavern/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // upstreamRequestDuration tracks upstream round-trip latency per upstream 
address. + // Labels: addr + upstreamRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: pkgmetrics.Namespace, + Name: "upstream_request_duration_seconds", + Help: "Upstream request round-trip latency histogram", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30}, + }, []string{"addr"}) + + // upstreamErrorsTotal counts upstream errors by upstream address and error type. + // Labels: addr, error_type (timeout/network/unknown) + upstreamErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "upstream_errors_total", + Help: "The total number of upstream request errors by upstream address and error type", + }, []string{"addr", "error_type"}) + + // collapseRequestsTotal tracks singleflight request coalescing outcomes. + // Labels: result (primary/shared) + collapseRequestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "collapse_requests_total", + Help: "The total number of singleflight-collapsed upstream requests", + }, []string{"result"}) +) + +func init() { + prometheus.MustRegister( + upstreamRequestDuration, + upstreamErrorsTotal, + collapseRequestsTotal, + ) +} diff --git a/proxy/proxy.go b/proxy/proxy.go index 5a8a83a..db92d93 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -16,6 +16,8 @@ import ( "github.com/omalloc/proxy/selector/random" "github.com/omalloc/tavern/proxy/singleflight" + + "github.com/prometheus/client_golang/prometheus" ) type Tuple struct { @@ -70,21 +72,32 @@ func (r *ReverseProxy) Do(req *http.Request, collapsed bool, waitTimeout time.Du return nil, selector.ErrNoAvailable } + upAddr := current.Address() + defer done(req.Context(), selector.DoneInfo{ Err: err, BytesSent: true, BytesReceived: true, }) - client := r.find(current.Address()) + client := r.find(upAddr) + + trackedDo := func() (*http.Response, error) { + start := time.Now() + resp, doErr := client.Do(req) +
upstreamRequestDuration.With(prometheus.Labels{"addr": upAddr}).Observe(time.Since(start).Seconds()) + if doErr != nil { + upstreamErrorsTotal.With(prometheus.Labels{"addr": upAddr, "error_type": classifyError(doErr)}).Inc() + } + return resp, doErr + } + if !collapsed { - return client.Do(req) - //return r.uncompress(client.Do(req)) + return trackedDo() } ret := <-r.flight.DoChan(onceKey(req), waitTimeout, func() (*http.Response, error) { - //return r.uncompress(client.Do(req)) - return client.Do(req) + return trackedDo() }) if ret.Err != nil { @@ -92,10 +105,10 @@ func (r *ReverseProxy) Do(req *http.Request, collapsed bool, waitTimeout time.Du } if ret.Shared { - // if shared, process the response copied. + collapseRequestsTotal.WithLabelValues("shared").Inc() return ret.Val, ret.Err } - // return directly + collapseRequestsTotal.WithLabelValues("primary").Inc() return ret.Val, ret.Err } @@ -217,3 +230,18 @@ func WithActivateMock(fn func(client *http.Client)) Option { r.activateMock = fn } } + +// classifyError maps an upstream request error to a metric label value. +func classifyError(err error) string { + if err == nil { + return "none" + } + if oe, ok := err.(interface{ Timeout() bool }); ok && oe.Timeout() { + return "timeout" + } + // net.Error covers both temporary and permanent network errors. + if _, ok := err.(net.Error); ok { + return "network" + } + return "unknown" +} diff --git a/server/metrics.go b/server/metrics.go index 87eee38..62d99af 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -17,12 +17,29 @@ var ( Name: "requests_unexpected_closed_total", Help: "The total number of unexpected closed requests", }, []string{"protocol", "method"}) + + // requestDuration tracks end-to-end request latency by HTTP method. 
+ requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: pkgmetrics.Namespace, + Name: "request_duration_seconds", + Help: "End-to-end request latency histogram by HTTP method", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30}, + }, []string{"method"}) + + // connectionsActive tracks the current number of active client connections. + connectionsActive = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: pkgmetrics.Namespace, + Name: "connections_active", + Help: "The current number of active client connections", + }) ) func init() { prometheus.MustRegister( _metricRequestCodeCounterTotal, _metricRequestUnexpectedClosedTotal, + requestDuration, + connectionsActive, ) _metricRequestUnexpectedClosedTotal.WithLabelValues("HTTP/1.1", "GET") diff --git a/server/middleware/caching/caching.go b/server/middleware/caching/caching.go index 0eab6ea..36f9839 100644 --- a/server/middleware/caching/caching.go +++ b/server/middleware/caching/caching.go @@ -25,6 +25,8 @@ import ( "github.com/omalloc/tavern/proxy" "github.com/omalloc/tavern/server/middleware" storagev1 "github.com/omalloc/tavern/storage" + + "github.com/prometheus/client_golang/prometheus" ) const BYPASS = "BYPASS" @@ -142,6 +144,7 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) { // set cache-staus header BYPASS resp.Header.Set(protocol.ProtocolCacheStatusKey, BYPASS) } + cacheRequestTotal.WithLabelValues(storage.BYPASS.String(), caching.bucket.StoreType()).Inc() return } @@ -155,6 +158,7 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) { headers := make(http.Header) xhttp.CopyHeader(caching.md.Headers, headers) headers.Set("Content-Range", fmt.Sprintf("bytes */%d", caching.md.Size)) + cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc() return nil, xhttp.NewBizError(http.StatusRequestedRangeNotSatisfiable, headers) } @@ -166,21 +170,25 @@ func 
Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) { if err != nil { // fd leak closeBody(resp) + cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc() return nil, err } // response now resp, err = caching.processor.postCacheProcessor(caching, req, resp) + cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc() return } // full MISS resp, err = caching.doProxy(req, false) if err != nil { + cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc() return nil, err } resp, err = processor.postCacheProcessor(caching, req, resp) + cacheRequestTotal.WithLabelValues(caching.cacheStatus.String(), caching.bucket.StoreType()).Inc() return }) @@ -456,6 +464,7 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc writerBuffer := func(buf []byte, index uint32, current uint64, eof bool) error { f, wpath, err := c.bucket.WriteChunkFile(c.req.Context(), c.id, index) if err != nil { + cacheChunkWriteTotal.With(prometheus.Labels{"result": "failed", "store_type": c.bucket.StoreType()}).Inc() return err } defer func() { @@ -497,6 +506,7 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc c.log.Debugf("flushBuffer wpath=%s isChunked=%t fileChunk=%d/%d", wpath, chunked, index+1, endPart) if nn, err1 := f.Write(buf); err1 != nil || nn != len(buf) { + cacheChunkWriteTotal.With(prometheus.Labels{"result": "failed", "store_type": c.bucket.StoreType()}).Inc() return fmt.Errorf("writeBuffer wpath[%s] chunk[%d] failed nn[%d] want[%d] err %v", wpath, index+1, nn, len(buf), err1) } @@ -508,6 +518,7 @@ func (c *Caching) flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc _ = c.bucket.Store(c.req.Context(), c.md) } + cacheChunkWriteTotal.With(prometheus.Labels{"result": "success", "store_type": c.bucket.StoreType()}).Inc() return nil } @@ -530,5 +541,6 @@ func (c *Caching) 
flushbufferSlice(respRange xhttp.ContentRange) (iobuf.EventSuc // flushFailed flush cache file to bucket failed callback func (c *Caching) flushFailed(err error) { c.log.Errorf("flush body to disk failed: %v", err) + cacheFlushFailedTotal.WithLabelValues(c.bucket.StoreType()).Inc() _ = c.bucket.DiscardWithMetadata(c.req.Context(), c.md) } diff --git a/server/middleware/caching/internal.go b/server/middleware/caching/internal.go index d5c6943..c45c1a9 100644 --- a/server/middleware/caching/internal.go +++ b/server/middleware/caching/internal.go @@ -203,6 +203,7 @@ func getContents(c *Caching, reqChunks []uint32, from uint32) (io.ReadCloser, in toByte := min(c.md.Size-1, uint64(availableChunks[index]*uint32(partSize))-1) // Request is automatically cloned by getUpstreamReader + cacheFillrangeTotal.WithLabelValues(c.bucket.StoreType()).Inc() reader, err := c.getUpstreamReader(fromByte, toByte, true) if err != nil { _ = chunkFile.Close() @@ -223,6 +224,7 @@ func getContents(c *Caching, reqChunks []uint32, from uint32) (io.ReadCloser, in toByte := min(c.md.Size-1, tailChunkSize) // Request is automatically cloned by getUpstreamReader + cacheFillrangeTotal.WithLabelValues(c.bucket.StoreType()).Inc() reader, err := c.getUpstreamReader(fromByte, toByte, true) if err != nil { return nil, 0, err diff --git a/server/middleware/caching/metrics.go b/server/middleware/caching/metrics.go new file mode 100644 index 0000000..9d38559 --- /dev/null +++ b/server/middleware/caching/metrics.go @@ -0,0 +1,50 @@ +package caching + +import ( + pkgmetrics "github.com/omalloc/tavern/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // cacheRequestTotal tracks cache request outcomes by cache status and store type. 
+ // Labels: cache_status (HIT/MISS/PART_HIT/PART_MISS/BYPASS/REVALIDATE_HIT/REVALIDATE_MISS/HOT_HIT), + // store_type (disk/memory/hot/warm) + cacheRequestTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "cache_requests_total", + Help: "The total number of cache requests by status and store type", + }, []string{"cache_status", "store_type"}) + + // cacheChunkWriteTotal tracks chunk write outcomes during cache fill. + // Labels: result (success/failed), store_type + cacheChunkWriteTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "cache_chunk_write_total", + Help: "The total number of chunk write operations by result", + }, []string{"result", "store_type"}) + + // cacheFlushFailedTotal counts flush-to-disk failures that cause object discard. + // Labels: store_type + cacheFlushFailedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "cache_flush_failed_total", + Help: "The total number of cache flush failures that triggered object discard", + }, []string{"store_type"}) + + // cacheFillrangeTotal counts how many times the fillrange path was entered + // (upstream sub-requests to fill missing chunks). 
Labels: store_type + cacheFillrangeTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "cache_fillrange_total", + Help: "The total number of fillrange upstream sub-requests triggered by partial cache hits", + }, []string{"store_type"}) +) + +func init() { + prometheus.MustRegister( + cacheRequestTotal, + cacheChunkWriteTotal, + cacheFlushFailedTotal, + cacheFillrangeTotal, + ) +} diff --git a/server/middleware/recovery/metrics.go b/server/middleware/recovery/metrics.go new file mode 100644 index 0000000..caee8f2 --- /dev/null +++ b/server/middleware/recovery/metrics.go @@ -0,0 +1,19 @@ +package recovery + +import ( + pkgmetrics "github.com/omalloc/tavern/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // panicTotal counts the number of panics caught by the recovery middleware. + panicTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "panics_total", + Help: "The total number of panics caught by the recovery middleware", + }) +) + +func init() { + prometheus.MustRegister(panicTotal) +} diff --git a/server/middleware/recovery/recovery.go b/server/middleware/recovery/recovery.go index f84d14e..76f476f 100644 --- a/server/middleware/recovery/recovery.go +++ b/server/middleware/recovery/recovery.go @@ -51,6 +51,7 @@ func Middleware(c *configv1.Middleware) (middleware.Middleware, func(), error) { // Here you can log the panic or handle it as needed log.Context(req.Context()).Errorf("middleware recovery: %s \n%s", r, runtime.PrintStackTrace(4)) + panicTotal.Inc() failCount.Add(1) if failCount.Load() >= int32(opts.FailCountThreshold) { log.Context(req.Context()).Errorf("middleware recovery: reached fail count threshold (%d), healthy now fail.", opts.FailCountThreshold) diff --git a/server/server.go b/server/server.go index 6af6b91..af51e0b 100644 --- a/server/server.go +++ b/server/server.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + 
"time" "dario.cat/mergo" "github.com/cloudflare/tableflip" @@ -68,6 +69,14 @@ func NewServer(flip *tableflip.Upgrader, config *conf.Bootstrap, plugins []plugi IdleTimeout: servConfig.IdleTimeout, ReadHeaderTimeout: servConfig.ReadHeaderTimeout, MaxHeaderBytes: servConfig.MaxHeaderBytes, + ConnState: func(_ net.Conn, state http.ConnState) { + switch state { + case http.StateNew: + connectionsActive.Inc() + case http.StateClosed, http.StateHijacked: + connectionsActive.Dec() + } + }, }, plugins: plugins, flip: flip, @@ -212,6 +221,11 @@ func (s *HTTPServer) newServeMux() *http.ServeMux { // buildHandler ... Cache 主流程入口 func (s *HTTPServer) buildHandler(tripper http.RoundTripper) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { + start := time.Now() + defer func() { + requestDuration.WithLabelValues(req.Method).Observe(time.Since(start).Seconds()) + }() + var clog = log.Context(req.Context()) var resp *http.Response var err error diff --git a/storage/bucket/disk/disk.go b/storage/bucket/disk/disk.go index d8c7582..d4f56dd 100644 --- a/storage/bucket/disk/disk.go +++ b/storage/bucket/disk/disk.go @@ -19,6 +19,8 @@ import ( "github.com/omalloc/tavern/pkg/iobuf" "github.com/paulbellamy/ratecounter" + "github.com/prometheus/client_golang/prometheus" + "github.com/omalloc/tavern/api/defined/v1/storage" "github.com/omalloc/tavern/api/defined/v1/storage/object" "github.com/omalloc/tavern/contrib/log" @@ -131,6 +133,7 @@ func (d *diskBucket) evict() { discard := func(evicted lru.Eviction[object.IDHash, storage.Mark]) { fd := evicted.Key.WPath(d.path) clog.Debugf("evict file %s, last-access %d", fd, evicted.Value.LastAccess()) + cacheEvictionsTotal.WithLabelValues(d.ID(), "lru").Inc() _ = d.DiscardWithHash(context.Background(), evicted.Key) } @@ -149,6 +152,7 @@ func (d *diskBucket) evict() { discard(evicted) continue } + cacheEvictionsTotal.WithLabelValues(d.ID(), "demote").Inc() continue } @@ -261,9 +265,12 @@ func (d *diskBucket) discard(ctx 
context.Context, md *object.Metadata) error { clog := log.Context(ctx) // 先删除 db 中的数据, 避免被其他协程 HIT + start := time.Now() if err := d.indexdb.Delete(ctx, md.ID.Bytes()); err != nil { + // Delete latency is observed once, right after this branch, so a failed delete is not counted twice. clog.Warnf("failed to delete metadata %s: %v", md.ID.WPath(d.path), err) } + indexdbOperationDuration.With(prometheus.Labels{"op": "delete", "bucket": d.ID()}).Observe(time.Since(start).Seconds()) // 如果缓存为1级,则清除全部子缓存(vary) if md.IsVary() && len(md.VirtualKey) > 0 { @@ -316,7 +323,9 @@ func (d *diskBucket) Iterate(ctx context.Context, fn func(*object.Metadata) erro // Lookup implements storage.Bucket. func (d *diskBucket) Lookup(ctx context.Context, id *object.ID) (*object.Metadata, error) { + start := time.Now() md, err := d.indexdb.Get(ctx, id.Bytes()) + indexdbOperationDuration.With(prometheus.Labels{"op": "get", "bucket": d.ID()}).Observe(time.Since(start).Seconds()) if err == nil && md != nil { d.touch(ctx, id) } @@ -354,9 +363,14 @@ func (d *diskBucket) Store(ctx context.Context, meta *object.Metadata) error { d.cache.Set(meta.ID.Hash(), storage.NewMark(meta.LastRefUnix, meta.Refs)) } + start := time.Now() if err := d.indexdb.Set(ctx, meta.ID.Bytes(), meta); err != nil { + indexdbOperationDuration.With(prometheus.Labels{"op": "set", "bucket": d.ID()}).Observe(time.Since(start).Seconds()) return err } + indexdbOperationDuration.With(prometheus.Labels{"op": "set", "bucket": d.ID()}).Observe(time.Since(start).Seconds()) + + cacheObjectsGauge.WithLabelValues(d.ID()).Set(float64(d.cache.Len())) // 写入域名 counter if u, err1 := url.Parse(meta.ID.Path()); err1 == nil { @@ -404,6 +418,8 @@ func (d *diskBucket) touch(_ context.Context, id *object.ID) { if d.migration != nil { if err := d.migration.Promote(context.Background(), id, d); err != nil { log.Warnf("promote %s failed: %v", id.Key(), err) + } else { + cacheMigrationTotal.WithLabelValues(d.ID(), "promote").Inc() } }
}() @@ -540,6 +556,7 @@ func (d *diskBucket) Migrate(ctx context.Context, id *object.ID, dest storage.Bu return err } + cacheMigrationTotal.WithLabelValues(d.ID(), "demote").Inc() return nil } diff --git a/storage/bucket/disk/metrics.go b/storage/bucket/disk/metrics.go new file mode 100644 index 0000000..788f9ab --- /dev/null +++ b/storage/bucket/disk/metrics.go @@ -0,0 +1,59 @@ +package disk + +import ( + pkgmetrics "github.com/omalloc/tavern/pkg/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // indexdbOperationDuration tracks indexdb operation latency by operation type and bucket. + // Labels: op (get/set/delete; iterate is not instrumented yet), bucket + indexdbOperationDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: pkgmetrics.Namespace, + Name: "indexdb_operation_duration_seconds", + Help: "IndexDB operation latency histogram by operation type and bucket", + Buckets: []float64{.0001, .0005, .001, .005, .01, .05, .1, .5, 1}, + }, []string{"op", "bucket"}) + + // diskIOBytesTotal is meant to track bytes read/written to disk by bucket; nothing in this change increments it yet. + // Labels: bucket, direction (read/write) + diskIOBytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "disk_io_bytes_total", + Help: "The total number of bytes read/written to disk by bucket", + }, []string{"bucket", "direction"}) + + // cacheEvictionsTotal counts cache eviction events by bucket and reason. + // Labels: bucket, reason (lru/demote; discard is not emitted yet) + cacheEvictionsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "cache_evictions_total", + Help: "The total number of cache evictions by bucket and reason", + }, []string{"bucket", "reason"}) + + // cacheMigrationTotal tracks object migration between storage tiers.
+ // Labels: bucket, direction (promote/demote) + cacheMigrationTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: pkgmetrics.Namespace, + Name: "cache_migration_total", + Help: "The total number of cache object migrations between tiers", + }, []string{"bucket", "direction"}) + + // cacheObjectsGauge tracks the current number of cached objects per bucket. + // Labels: bucket + cacheObjectsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: pkgmetrics.Namespace, + Name: "cache_objects", + Help: "The current number of cached objects per bucket", + }, []string{"bucket"}) +) + +func init() { + prometheus.MustRegister( + indexdbOperationDuration, + diskIOBytesTotal, + cacheEvictionsTotal, + cacheMigrationTotal, + cacheObjectsGauge, + ) +}