diff --git a/CHANGELOG.md b/CHANGELOG.md index 7078c57..2b27c34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,8 +15,14 @@ The following emojis are used to highlight certain changes: ### Added +- `--records-limit` / `SOMEGUY_RECORDS_LIMIT` (default `100`) caps results for `Accept: application/json`, matching the SHOULD-cap in [HTTP Routing v1 §4.1.5](https://specs.ipfs.tech/routing/http-routing-v1/). Set to `0` to disable the cap. +- `--streaming-records-limit` / `SOMEGUY_STREAMING_RECORDS_LIMIT` (default `1000`) caps results for `Accept: application/x-ndjson`. Set to `0` to disable the cap. + ### Changed +- ✨ JSON responses return up to 100 providers (previously 20). NDJSON streams up to 1000 results (previously unbounded). Both caps are tunable via the new flags. +- `cachedRouter.FindProviders` now over-fetches by 3x (cap 3000) so that, after `cacheFallbackIter` drops records without addresses, the surviving count is close to the caller's limit. Before, the upstream walk's hard ceiling pre-filtered records and JSON callers received far fewer providers than they asked for. + ### Removed ### Fixed diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 71ec26b..4542f43 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -8,6 +8,8 @@ The environment variables below override `someguy`'s built-in defaults. - [`SOMEGUY_CACHED_ADDR_BOOK`](#someguy_cached_addr_book) - [`SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL`](#someguy_cached_addr_book_recent_ttl) - [`SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING`](#someguy_cached_addr_book_active_probing) + - [`SOMEGUY_RECORDS_LIMIT`](#someguy_records_limit) + - [`SOMEGUY_STREAMING_RECORDS_LIMIT`](#someguy_streaming_records_limit) - [`SOMEGUY_PROVIDER_ENDPOINTS`](#someguy_provider_endpoints) - [`SOMEGUY_PEER_ENDPOINTS`](#someguy_peer_endpoints) - [`SOMEGUY_IPNS_ENDPOINTS`](#someguy_ipns_endpoints) @@ -63,6 +65,18 @@ Enables active probing of cached peers to keep their multiaddrs up to date. 
Appl Default: `true` +### `SOMEGUY_RECORDS_LIMIT` + +Maximum providers or peers returned per `Accept: application/json` request. [HTTP Routing v1 §4.1.5](https://specs.ipfs.tech/routing/http-routing-v1/) recommends `100`. Set to `0` to disable the cap. + +Default: `100` + +### `SOMEGUY_STREAMING_RECORDS_LIMIT` + +Maximum providers or peers returned per `Accept: application/x-ndjson` request. Sits above `SOMEGUY_RECORDS_LIMIT` so streaming returns more results, while bounding per-connection cost (DHT walk, addr-book probing, deduplication). Set to `0` to disable the cap. + +Default: `1000` + ### `SOMEGUY_PROVIDER_ENDPOINTS` Comma-separated list of [Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) endpoints for provider lookups. diff --git a/main.go b/main.go index b20a304..17cedc0 100644 --- a/main.go +++ b/main.go @@ -60,6 +60,18 @@ func main() { EnvVars: []string{"SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL"}, Usage: "TTL for recently connected peers' multiaddrs in the cached address book", }, + &cli.IntFlag{ + Name: "records-limit", + Value: DefaultRecordsLimit, + EnvVars: []string{"SOMEGUY_RECORDS_LIMIT"}, + Usage: "maximum providers or peers per 'Accept: application/json' request (HTTP Routing v1 section 4.1.5 recommends 100; 0 disables the cap)", + }, + &cli.IntFlag{ + Name: "streaming-records-limit", + Value: DefaultStreamingRecordsLimit, + EnvVars: []string{"SOMEGUY_STREAMING_RECORDS_LIMIT"}, + Usage: "maximum providers or peers per 'Accept: application/x-ndjson' request (0 disables the cap)", + }, &cli.StringSliceFlag{ Name: "provider-endpoints", Value: cli.NewStringSlice(autoconf.AutoPlaceholder), @@ -179,12 +191,22 @@ func main() { }, }, Action: func(ctx *cli.Context) error { + recordsLimit := ctx.Int("records-limit") + if recordsLimit < 0 { + return fmt.Errorf("records-limit must be non-negative, got %d (0 means unbounded)", recordsLimit) + } + streamingRecordsLimit := ctx.Int("streaming-records-limit") + if streamingRecordsLimit < 0 { + 
return fmt.Errorf("streaming-records-limit must be non-negative, got %d (0 means unbounded)", streamingRecordsLimit) + } cfg := &config{ listenAddress: ctx.String("listen-address"), dhtType: ctx.String("dht"), cachedAddrBook: ctx.Bool("cached-addr-book"), cachedAddrBookActiveProbing: ctx.Bool("cached-addr-book-active-probing"), cachedAddrBookRecentTTL: ctx.Duration("cached-addr-book-recent-ttl"), + recordsLimit: recordsLimit, + streamingRecordsLimit: streamingRecordsLimit, contentEndpoints: ctx.StringSlice("provider-endpoints"), peerEndpoints: ctx.StringSlice("peer-endpoints"), diff --git a/server.go b/server.go index 518974b..294485c 100644 --- a/server.go +++ b/server.go @@ -74,12 +74,27 @@ func withRequestLogger(next http.Handler) http.Handler { }) } +const ( + // DefaultRecordsLimit caps results for `Accept: application/json` + // requests. Matches the SHOULD-cap from HTTP Routing v1 section 4.1.5. + DefaultRecordsLimit = 100 + // DefaultStreamingRecordsLimit caps results for `Accept: + // application/x-ndjson` requests. Sits above the JSON cap so streaming + // returns "more results" per HTTP Routing v1 section 4.1.5, while + // bounding per-connection cost: DHT walk, addr-book probing, and + // peer-record deduplication. Set SOMEGUY_STREAMING_RECORDS_LIMIT=0 to + // disable the cap. 
+ DefaultStreamingRecordsLimit = 1000 +) + type config struct { listenAddress string dhtType string cachedAddrBook bool cachedAddrBookActiveProbing bool cachedAddrBookRecentTTL time.Duration + recordsLimit int + streamingRecordsLimit int contentEndpoints []string peerEndpoints []string @@ -223,6 +238,8 @@ func start(ctx context.Context, cfg *config) error { handlerOpts := []server.Option{ server.WithPrometheusRegistry(prometheus.DefaultRegisterer), + server.WithRecordsLimit(cfg.recordsLimit), + server.WithStreamingRecordsLimit(cfg.streamingRecordsLimit), } handler := server.Handler(&composableRouter{ diff --git a/server_cached_router.go b/server_cached_router.go index 7edc0b7..d97719c 100644 --- a/server_cached_router.go +++ b/server_cached_router.go @@ -46,6 +46,19 @@ const ( addrQueryOriginUnknown = "unknown" DispatchedFindPeersTimeout = time.Minute + + // cacheFallbackOverfetchMultiplier sizes the over-fetch from the + // underlying router so that, after [cacheFallbackIter] drops records + // without multiaddrs, enough survive to meet the caller's limit. + // Empirically derived: at delegated-ipfs.dev, a JSON limit of 20 + // produced about 4 surfaced results, so roughly 1 in 3 records + // reaches the client. NOTE(review): 4 of 20 is 1 in 5, not 1 in 3; confirm the figures or the multiplier. + cacheFallbackOverfetchMultiplier = 3 + // cacheFallbackOverfetchMax caps the over-fetched limit so a large + // caller-side limit cannot blow up the DHT walk. Sized at 3x + // DefaultStreamingRecordsLimit so the multiplier applies on the + // streaming path too. The routing timeout bounds wall-clock. 
+ cacheFallbackOverfetchMax = 3000 ) // cachedRouter wraps a router with the cachedAddrBook to retrieve cached addresses for peers without multiaddrs in FindProviders @@ -60,13 +73,32 @@ func NewCachedRouter(router router, cab *cachedAddrBook) cachedRouter { } func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { - it, err := r.router.FindProviders(ctx, key, limit) + // Over-fetch from the underlying router: cacheFallbackIter drops + // records whose Addrs are empty and uncached. Without over-fetching, + // the source iterator's hard ceiling pre-filters records and the + // post-filter count falls short of `limit`. + overfetch := overfetchLimit(limit) + it, err := r.router.FindProviders(ctx, key, overfetch) if err != nil { return nil, err } - iter := NewCacheFallbackIter(it, r, ctx, addrQueryOriginProviders) - return iter, nil + fallback := NewCacheFallbackIter(it, r, ctx, addrQueryOriginProviders) + if limit <= 0 { + return fallback, nil + } + return newLimitedIter(fallback, limit), nil +} + +// overfetchLimit returns the count to pass to the underlying router so +// that, after cacheFallbackIter drops records without addresses, up to +// `limit` results reach the caller. A non-positive `limit` means +// unbounded and returns 0, matching the boxo convention. +func overfetchLimit(limit int) int { + if limit <= 0 { + return 0 + } + return min(limit*cacheFallbackOverfetchMultiplier, cacheFallbackOverfetchMax) } // FindPeers uses a simpler approach than FindProviders because we're dealing with a single PeerRecord, and there's @@ -304,3 +336,37 @@ func (it *cacheFallbackIter) dispatchFindPeer(record types.PeerRecord) { sendResult(record) // pass back the record with no addrs } } + +// limitedIter caps an [iter.ResultIter] at the first `limit` successful +// (non-error) values. Errors pass through and do not count toward the +// limit. Close cascades to the wrapped iterator. 
+type limitedIter[T any] struct { + inner iter.ResultIter[T] + limit int + count int +} + +func newLimitedIter[T any](inner iter.ResultIter[T], limit int) *limitedIter[T] { + return &limitedIter[T]{inner: inner, limit: limit} +} + +func (l *limitedIter[T]) Next() bool { + if l.count >= l.limit { + return false + } + if !l.inner.Next() { + return false + } + if l.inner.Val().Err == nil { + l.count++ + } + return true +} + +func (l *limitedIter[T]) Val() iter.Result[T] { + return l.inner.Val() +} + +func (l *limitedIter[T]) Close() error { + return l.inner.Close() +} diff --git a/server_cached_router_test.go b/server_cached_router_test.go index e43b703..1be71f2 100644 --- a/server_cached_router_test.go +++ b/server_cached_router_test.go @@ -2,6 +2,8 @@ package main import ( "context" + "errors" + "fmt" "testing" "time" @@ -62,7 +64,8 @@ func TestCachedRouter(t *testing.T) { mockIter := newMockResultIter([]iter.Result[types.Record]{ {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, }) - mr.On("FindProviders", mock.Anything, c, 10).Return(mockIter, nil) + // cachedRouter over-fetches by cacheFallbackOverfetchMultiplier to absorb addr-less drops. 
+ mr.On("FindProviders", mock.Anything, c, overfetchLimit(10)).Return(mockIter, nil) // Create cached address book with test addresses cab, err := newCachedAddrBook() @@ -489,3 +492,163 @@ func TestCacheFallbackIter(t *testing.T) { }) } + +func TestOverfetchLimit(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + limit int + want int + }{ + {"zero (unbounded) stays zero", 0, 0}, + {"negative treated as unbounded", -1, 0}, + {"small limit multiplied", 1, cacheFallbackOverfetchMultiplier}, + {"spec-default of 100 -> 300", 100, 300}, + {"limit beyond cap is clamped", 10000, cacheFallbackOverfetchMax}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.want, overfetchLimit(tc.limit)) + }) + } +} + +func TestLimitedIter_StopsAtLimit(t *testing.T) { + t.Parallel() + + pid := peer.ID("p") + inner := newMockResultIter([]iter.Result[types.Record]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + }) + + limited := newLimitedIter(inner, 2) + results, err := iter.ReadAllResults(limited) + require.NoError(t, err) + require.Len(t, results, 2) + require.NoError(t, limited.Close()) +} + +func TestLimitedIter_ErrorsDoNotCountTowardLimit(t *testing.T) { + t.Parallel() + + pid := peer.ID("p") + inner := newMockResultIter([]iter.Result[types.Record]{ + {Err: errors.New("transient error")}, + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + {Err: errors.New("another transient error")}, + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + {Val: &types.PeerRecord{Schema: "peer", ID: &pid}}, + }) + + limited := newLimitedIter(inner, 2) + var values, errs int + for limited.Next() { + if limited.Val().Err != nil { + errs++ + } else { + values++ + } + } + require.Equal(t, 2, values, "limit should stop 
at 2 successful values") + require.Equal(t, 2, errs, "error results before the cap should still be observed") +} + +func TestCachedRouter_FindProviders_OverFetchesToAbsorbDrops(t *testing.T) { + t.Parallel() + + ctx := context.Background() + c := makeCID() + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // 12 records: 4 with addresses, 8 without (uncached, so + // cacheFallbackIter drops them). The caller asks for limit=4; the + // over-fetch requests 4*multiplier=12, and the wrapper surfaces 4. + records := make([]iter.Result[types.Record], 0, 12) + for i := range 4 { + pid := peer.ID(fmt.Sprintf("with-addrs-%d", i)) + records = append(records, iter.Result[types.Record]{ + Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}, + }) + } + for i := range 8 { + pid := peer.ID(fmt.Sprintf("no-addrs-%d", i)) + records = append(records, iter.Result[types.Record]{ + Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}, + }) + } + + mr := &mockRouter{} + mr.On("FindProviders", mock.Anything, c, overfetchLimit(4)).Return(newMockResultIter(records), nil) + // cacheFallbackIter dispatches background FindPeers for addr-less + // records. Stub it as "no addrs found" so it does not surface them. 
+ mr.On("FindPeers", mock.Anything, mock.Anything, 1).Maybe().Return( + newMockResultIter([]iter.Result[*types.PeerRecord]{}), nil, + ) + + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + it, err := cr.FindProviders(ctx, c, 4) + require.NoError(t, err) + + results, err := iter.ReadAllResults(it) + require.NoError(t, err) + require.Len(t, results, 4, "over-fetch should keep producing until limit reached or source exhausted") + mr.AssertExpectations(t) +} + +func TestCachedRouter_FindProviders_OverFetchCapApplies(t *testing.T) { + t.Parallel() + + ctx := context.Background() + c := makeCID() + + // Pick a caller limit large enough that limit*multiplier exceeds the + // cap, so the assertion proves clamping rather than the boundary case. + const callerLimit = cacheFallbackOverfetchMax + require.Greater(t, callerLimit*cacheFallbackOverfetchMultiplier, cacheFallbackOverfetchMax, + "test precondition: limit*multiplier must exceed the cap") + + mr := &mockRouter{} + mr.On("FindProviders", mock.Anything, c, cacheFallbackOverfetchMax).Return( + newMockResultIter([]iter.Result[types.Record]{}), nil, + ) + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + it, err := cr.FindProviders(ctx, c, callerLimit) + require.NoError(t, err) + _, err = iter.ReadAllResults(it) + require.NoError(t, err) + mr.AssertExpectations(t) +} + +func TestCachedRouter_FindProviders_UnboundedLimitPassesZero(t *testing.T) { + t.Parallel() + + ctx := context.Background() + c := makeCID() + + mr := &mockRouter{} + // limit == 0 means unbounded; cachedRouter must pass 0 through, not multiply it. 
+ mr.On("FindProviders", mock.Anything, c, 0).Return( + newMockResultIter([]iter.Result[types.Record]{}), nil, + ) + cab, err := newCachedAddrBook() + require.NoError(t, err) + cr := NewCachedRouter(mr, cab) + + it, err := cr.FindProviders(ctx, c, 0) + require.NoError(t, err) + _, err = iter.ReadAllResults(it) + require.NoError(t, err) + mr.AssertExpectations(t) +} diff --git a/server_delegated_routing_test.go b/server_delegated_routing_test.go index 78acd08..3f716fc 100644 --- a/server_delegated_routing_test.go +++ b/server_delegated_routing_test.go @@ -1,8 +1,22 @@ package main import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" "testing" + "github.com/ipfs/boxo/ipns" + "github.com/ipfs/boxo/routing/http/server" + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -114,3 +128,132 @@ func TestCollectEndpoints(t *testing.T) { assert.Empty(t, endpoints) }) } + +// providersRouterFunc adapts a function to the router interface, returning +// records only for FindProviders so other methods of composableRouter do +// not need a full mock for these focused tests. 
+type providersRouterFunc func(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) + +func (f providersRouterFunc) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + return f(ctx, key, limit) +} +func (f providersRouterFunc) FindPeers(context.Context, peer.ID, int) (iter.ResultIter[*types.PeerRecord], error) { + return nil, fmt.Errorf("not implemented") +} +func (f providersRouterFunc) GetClosestPeers(context.Context, cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + return nil, fmt.Errorf("not implemented") +} +func (f providersRouterFunc) GetIPNS(context.Context, ipns.Name) (*ipns.Record, error) { + return nil, fmt.Errorf("not implemented") +} +func (f providersRouterFunc) PutIPNS(context.Context, ipns.Name, *ipns.Record) error { + return fmt.Errorf("not implemented") +} + +func TestProvidersLimitsHonorSpecCap(t *testing.T) { + t.Parallel() + + // Supply more records than either cap so both the JSON and NDJSON + // assertions prove their cap is the binding limit, not the mock size. + supplied := DefaultStreamingRecordsLimit + 500 + require.Greater(t, supplied, DefaultRecordsLimit, "mock must exceed both caps") + makeRecords := func(t *testing.T) []iter.Result[types.Record] { + recs := make([]iter.Result[types.Record], 0, supplied) + for i := range supplied { + _, p := makeEd25519PeerID(t) + ma, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/10.0.0.%d/tcp/4001", (i%254)+1)) + require.NoError(t, err) + recs = append(recs, iter.Result[types.Record]{ + Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p, + Addrs: []types.Multiaddr{{Multiaddr: ma}}, + }, + }) + } + return recs + } + + // Mock router honors the limit like a real DHT router: it stops + // emitting records after producing `limit` of them. The boxo server + // passes the configured limit down as a hint, which is what the spec + // asks the server to do. 
+ makeHandler := func(records []iter.Result[types.Record], gotLimit *int) http.Handler { + var providers router = providersRouterFunc(func(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + if gotLimit != nil { + *gotLimit = limit + } + if limit > 0 && limit < len(records) { + return iter.FromSlice(records[:limit]), nil + } + return iter.FromSlice(records), nil + }) + return server.Handler( + &composableRouter{providers: providers}, + server.WithRecordsLimit(DefaultRecordsLimit), + server.WithStreamingRecordsLimit(DefaultStreamingRecordsLimit), + ) + } + + // Use a real CID so the path parsing inside the boxo server accepts it. + c, err := cid.Decode("bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi") + require.NoError(t, err) + + t.Run("JSON honors spec cap of 100", func(t *testing.T) { + t.Parallel() + var gotLimit int + srv := httptest.NewServer(makeHandler(makeRecords(t), &gotLimit)) + t.Cleanup(srv.Close) + + req, err := http.NewRequest(http.MethodGet, srv.URL+"/routing/v1/providers/"+c.String(), nil) + require.NoError(t, err) + req.Header.Set("Accept", "application/json") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + t.Cleanup(func() { _ = resp.Body.Close() }) + + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, "application/json", resp.Header.Get("Content-Type")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + // Count records by their opening token. The boxo JSON serializer + // emits each provider record as `{"Addrs": ...}`. 
+ got := strings.Count(string(body), `{"Addrs":`) + require.Equal(t, DefaultRecordsLimit, got, + "JSON path should cap at DefaultRecordsLimit (HTTP routing v1 spec section 4.1.5)") + require.Equal(t, DefaultRecordsLimit, gotLimit, + "the limit hint passed to the underlying router must be DefaultRecordsLimit") + }) + + t.Run("NDJSON caps at DefaultStreamingRecordsLimit", func(t *testing.T) { + t.Parallel() + var gotLimit int + srv := httptest.NewServer(makeHandler(makeRecords(t), &gotLimit)) + t.Cleanup(srv.Close) + + req, err := http.NewRequest(http.MethodGet, srv.URL+"/routing/v1/providers/"+c.String(), nil) + require.NoError(t, err) + req.Header.Set("Accept", "application/x-ndjson") + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + t.Cleanup(func() { _ = resp.Body.Close() }) + + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, "application/x-ndjson", resp.Header.Get("Content-Type")) + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + var count int + for scanner.Scan() { + if strings.TrimSpace(scanner.Text()) != "" { + count++ + } + } + require.NoError(t, scanner.Err()) + require.Equal(t, DefaultStreamingRecordsLimit, count, + "NDJSON should cap at DefaultStreamingRecordsLimit when more records are available") + require.Equal(t, DefaultStreamingRecordsLimit, gotLimit, + "the limit hint passed to the underlying router must be DefaultStreamingRecordsLimit") + }) +}