Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ The following emojis are used to highlight certain changes:

### Added

- `--records-limit` / `SOMEGUY_RECORDS_LIMIT` (default `100`) caps results for `Accept: application/json`, matching the SHOULD-cap in [HTTP Routing v1 §4.1.5](https://specs.ipfs.tech/routing/http-routing-v1/).
- `--streaming-records-limit` / `SOMEGUY_STREAMING_RECORDS_LIMIT` (default `1000`) caps results for `Accept: application/x-ndjson`. Set to `0` to disable the cap.

### Changed

- ✨ JSON responses return up to 100 providers (previously 20). NDJSON streams up to 1000 results (previously unbounded). Both caps are tunable via the new flags.
- `cachedRouter.FindProviders` now over-fetches by 3x (cap 3000) so that, after `cacheFallbackIter` drops records without addresses, the surviving count is close to the caller's limit. Before, the upstream walk's hard ceiling pre-filtered records and JSON callers received far fewer providers than they asked for.

### Removed

### Fixed
Expand Down
14 changes: 14 additions & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ The environment variables below override `someguy`'s built-in defaults.
- [`SOMEGUY_CACHED_ADDR_BOOK`](#someguy_cached_addr_book)
- [`SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL`](#someguy_cached_addr_book_recent_ttl)
- [`SOMEGUY_CACHED_ADDR_BOOK_ACTIVE_PROBING`](#someguy_cached_addr_book_active_probing)
- [`SOMEGUY_RECORDS_LIMIT`](#someguy_records_limit)
- [`SOMEGUY_STREAMING_RECORDS_LIMIT`](#someguy_streaming_records_limit)
- [`SOMEGUY_PROVIDER_ENDPOINTS`](#someguy_provider_endpoints)
- [`SOMEGUY_PEER_ENDPOINTS`](#someguy_peer_endpoints)
- [`SOMEGUY_IPNS_ENDPOINTS`](#someguy_ipns_endpoints)
Expand Down Expand Up @@ -63,6 +65,18 @@ Enables active probing of cached peers to keep their multiaddrs up to date. Appl

Default: `true`

### `SOMEGUY_RECORDS_LIMIT`

Maximum providers or peers returned per `Accept: application/json` request. [HTTP Routing v1 §4.1.5](https://specs.ipfs.tech/routing/http-routing-v1/) recommends `100`. Set to `0` to disable the cap.

Default: `100`

### `SOMEGUY_STREAMING_RECORDS_LIMIT`

Maximum providers or peers returned per `Accept: application/x-ndjson` request. Sits above `SOMEGUY_RECORDS_LIMIT` so streaming returns more results, while bounding per-connection cost (DHT walk, addr-book probing, deduplication). Set to `0` to disable the cap.

Default: `1000`

### `SOMEGUY_PROVIDER_ENDPOINTS`

Comma-separated list of [Delegated Routing V1](https://specs.ipfs.tech/routing/http-routing-v1/) endpoints for provider lookups.
Expand Down
22 changes: 22 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ func main() {
EnvVars: []string{"SOMEGUY_CACHED_ADDR_BOOK_RECENT_TTL"},
Usage: "TTL for recently connected peers' multiaddrs in the cached address book",
},
&cli.IntFlag{
Name: "records-limit",
Value: DefaultRecordsLimit,
EnvVars: []string{"SOMEGUY_RECORDS_LIMIT"},
Usage: "maximum providers or peers per `Accept: application/json` request (HTTP Routing v1 section 4.1.5 recommends 100; 0 disables the cap)",
},
&cli.IntFlag{
Name: "streaming-records-limit",
Value: DefaultStreamingRecordsLimit,
EnvVars: []string{"SOMEGUY_STREAMING_RECORDS_LIMIT"},
Usage: "maximum providers or peers per `Accept: application/x-ndjson` request (0 disables the cap)",
},
&cli.StringSliceFlag{
Name: "provider-endpoints",
Value: cli.NewStringSlice(autoconf.AutoPlaceholder),
Expand Down Expand Up @@ -179,12 +191,22 @@ func main() {
},
},
Action: func(ctx *cli.Context) error {
recordsLimit := ctx.Int("records-limit")
if recordsLimit < 0 {
return fmt.Errorf("records-limit must be non-negative, got %d (0 means unbounded)", recordsLimit)
}
streamingRecordsLimit := ctx.Int("streaming-records-limit")
if streamingRecordsLimit < 0 {
return fmt.Errorf("streaming-records-limit must be non-negative, got %d (0 means unbounded)", streamingRecordsLimit)
}
cfg := &config{
listenAddress: ctx.String("listen-address"),
dhtType: ctx.String("dht"),
cachedAddrBook: ctx.Bool("cached-addr-book"),
cachedAddrBookActiveProbing: ctx.Bool("cached-addr-book-active-probing"),
cachedAddrBookRecentTTL: ctx.Duration("cached-addr-book-recent-ttl"),
recordsLimit: recordsLimit,
streamingRecordsLimit: streamingRecordsLimit,

contentEndpoints: ctx.StringSlice("provider-endpoints"),
peerEndpoints: ctx.StringSlice("peer-endpoints"),
Expand Down
17 changes: 17 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,27 @@ func withRequestLogger(next http.Handler) http.Handler {
})
}

const (
	// DefaultRecordsLimit caps results for `Accept: application/json`
	// requests. Matches the SHOULD-cap from HTTP Routing v1 section 4.1.5.
	// Overridable via --records-limit / SOMEGUY_RECORDS_LIMIT; set to 0
	// to disable the cap.
	DefaultRecordsLimit = 100
	// DefaultStreamingRecordsLimit caps results for `Accept:
	// application/x-ndjson` requests. Sits above the JSON cap so streaming
	// returns "more results" per HTTP Routing v1 section 4.1.5, while
	// bounding per-connection cost: DHT walk, addr-book probing, and
	// peer-record deduplication. Overridable via --streaming-records-limit /
	// SOMEGUY_STREAMING_RECORDS_LIMIT; set to 0 to disable the cap.
	DefaultStreamingRecordsLimit = 1000
)

type config struct {
listenAddress string
dhtType string
cachedAddrBook bool
cachedAddrBookActiveProbing bool
cachedAddrBookRecentTTL time.Duration
recordsLimit int
streamingRecordsLimit int

contentEndpoints []string
peerEndpoints []string
Expand Down Expand Up @@ -223,6 +238,8 @@ func start(ctx context.Context, cfg *config) error {

handlerOpts := []server.Option{
server.WithPrometheusRegistry(prometheus.DefaultRegisterer),
server.WithRecordsLimit(cfg.recordsLimit),
server.WithStreamingRecordsLimit(cfg.streamingRecordsLimit),
}

handler := server.Handler(&composableRouter{
Expand Down
72 changes: 69 additions & 3 deletions server_cached_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ const (
addrQueryOriginUnknown = "unknown"

DispatchedFindPeersTimeout = time.Minute

	// cacheFallbackOverfetchMultiplier sizes the over-fetch from the
	// underlying router so that, after [cacheFallbackIter] drops records
	// without multiaddrs, enough survive to meet the caller's limit.
	// Empirically derived: at delegated-ipfs.dev, a JSON limit of 20
	// produced about 4 surfaced results (roughly 1 in 5 records reached
	// the client); a 3x over-fetch recovers most of that shortfall
	// without tripling the upstream walk for well-populated keys.
cacheFallbackOverfetchMultiplier = 3
// cacheFallbackOverfetchMax caps the over-fetched limit so a large
// caller-side limit cannot blow up the DHT walk. Sized at 3x
// DefaultStreamingRecordsLimit so the multiplier applies on the
// streaming path too. The routing timeout bounds wall-clock.
cacheFallbackOverfetchMax = 3000
)

// cachedRouter wraps a router with the cachedAddrBook to retrieve cached addresses for peers without multiaddrs in FindProviders
Expand All @@ -60,13 +73,32 @@ func NewCachedRouter(router router, cab *cachedAddrBook) cachedRouter {
}

// FindProviders looks up provider records for key. It over-fetches from
// the underlying router because cacheFallbackIter discards records whose
// Addrs are empty and uncached; without the over-fetch, the source
// iterator's hard ceiling pre-filters records and the post-filter count
// falls short of `limit`. A wrapper restores the caller-visible cap.
func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
	source, err := r.router.FindProviders(ctx, key, overfetchLimit(limit))
	if err != nil {
		return nil, err
	}

	filtered := NewCacheFallbackIter(source, r, ctx, addrQueryOriginProviders)
	if limit > 0 {
		// Re-impose the caller's cap on the post-filter stream.
		return newLimitedIter(filtered, limit), nil
	}
	// Non-positive limit means unbounded: pass the stream through.
	return filtered, nil
}

// overfetchLimit returns the count to pass to the underlying router so
// that, after cacheFallbackIter drops records without addresses, up to
// `limit` results reach the caller. A non-positive `limit` means
// unbounded and returns 0, matching the boxo convention.
func overfetchLimit(limit int) int {
if limit <= 0 {
return 0
}
return min(limit*cacheFallbackOverfetchMultiplier, cacheFallbackOverfetchMax)
}

// FindPeers uses a simpler approach than FindProviders because we're dealing with a single PeerRecord, and there's
Expand Down Expand Up @@ -304,3 +336,37 @@ func (it *cacheFallbackIter) dispatchFindPeer(record types.PeerRecord) {
sendResult(record) // pass back the record with no addrs
}
}

// limitedIter wraps an [iter.ResultIter] and stops after `limit`
// successful (non-error) values have been produced. Error results are
// forwarded unchanged and do not count toward the limit. Close is
// delegated to the wrapped iterator.
type limitedIter[T any] struct {
	src   iter.ResultIter[T]
	limit int
	seen  int
}

// newLimitedIter wraps src so that at most limit successful values are
// yielded.
func newLimitedIter[T any](src iter.ResultIter[T], limit int) *limitedIter[T] {
	return &limitedIter[T]{src: src, limit: limit}
}

// Next advances the iterator; it reports false once the cap is reached
// or the source is exhausted.
func (l *limitedIter[T]) Next() bool {
	if l.seen >= l.limit || !l.src.Next() {
		return false
	}
	// Only non-error results consume the budget.
	if l.src.Val().Err == nil {
		l.seen++
	}
	return true
}

// Val returns the current result from the wrapped iterator.
func (l *limitedIter[T]) Val() iter.Result[T] {
	return l.src.Val()
}

// Close closes the wrapped iterator.
func (l *limitedIter[T]) Close() error {
	return l.src.Close()
}
165 changes: 164 additions & 1 deletion server_cached_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -62,7 +64,8 @@ func TestCachedRouter(t *testing.T) {
mockIter := newMockResultIter([]iter.Result[types.Record]{
{Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}},
})
mr.On("FindProviders", mock.Anything, c, 10).Return(mockIter, nil)
// cachedRouter over-fetches by cacheFallbackOverfetchMultiplier to absorb addr-less drops.
mr.On("FindProviders", mock.Anything, c, overfetchLimit(10)).Return(mockIter, nil)

// Create cached address book with test addresses
cab, err := newCachedAddrBook()
Expand Down Expand Up @@ -489,3 +492,163 @@ func TestCacheFallbackIter(t *testing.T) {
})

}

// TestOverfetchLimit verifies over-fetch sizing: non-positive limits
// stay unbounded (0), positive limits are multiplied, and oversized
// limits are clamped to the cap.
func TestOverfetchLimit(t *testing.T) {
	t.Parallel()

	cases := []struct {
		name  string
		limit int
		want  int
	}{
		{"zero (unbounded) stays zero", 0, 0},
		{"negative treated as unbounded", -1, 0},
		{"small limit multiplied", 1, cacheFallbackOverfetchMultiplier},
		{"spec-default of 100 -> 300", 100, 300},
		{"limit beyond cap is clamped", 10000, cacheFallbackOverfetchMax},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			require.Equal(t, c.want, overfetchLimit(c.limit))
		})
	}
}

// TestLimitedIter_StopsAtLimit checks that the wrapper yields exactly
// `limit` values even when the source holds more.
func TestLimitedIter_StopsAtLimit(t *testing.T) {
	t.Parallel()

	pid := peer.ID("p")
	source := make([]iter.Result[types.Record], 5)
	for i := range source {
		source[i] = iter.Result[types.Record]{
			Val: &types.PeerRecord{Schema: "peer", ID: &pid},
		}
	}

	limited := newLimitedIter(newMockResultIter(source), 2)
	results, err := iter.ReadAllResults(limited)
	require.NoError(t, err)
	require.Len(t, results, 2)
	require.NoError(t, limited.Close())
}

// TestLimitedIter_ErrorsDoNotCountTowardLimit checks that error results
// are surfaced but do not consume the success budget.
func TestLimitedIter_ErrorsDoNotCountTowardLimit(t *testing.T) {
	t.Parallel()

	pid := peer.ID("p")
	okRecord := iter.Result[types.Record]{Val: &types.PeerRecord{Schema: "peer", ID: &pid}}
	inner := newMockResultIter([]iter.Result[types.Record]{
		{Err: errors.New("transient error")},
		okRecord,
		{Err: errors.New("another transient error")},
		okRecord,
		okRecord,
	})

	limited := newLimitedIter(inner, 2)
	values, errCount := 0, 0
	for limited.Next() {
		if limited.Val().Err == nil {
			values++
		} else {
			errCount++
		}
	}
	require.Equal(t, 2, values, "limit should stop at 2 successful values")
	require.Equal(t, 2, errCount, "error results before the cap should still be observed")
}

// TestCachedRouter_FindProviders_OverFetchesToAbsorbDrops checks that
// the router requests overfetchLimit(limit) records upstream and that
// the caller still receives exactly `limit` results after addr-less
// records are filtered out.
func TestCachedRouter_FindProviders_OverFetchesToAbsorbDrops(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	c := makeCID()
	publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001")

	// Build 12 records: 4 usable (with addresses) and 8 that
	// cacheFallbackIter drops (no addresses, nothing cached). With
	// limit=4 the over-fetch requests 4*multiplier=12; 4 must surface.
	var records []iter.Result[types.Record]
	for i := 0; i < 4; i++ {
		pid := peer.ID(fmt.Sprintf("with-addrs-%d", i))
		records = append(records, iter.Result[types.Record]{
			Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}},
		})
	}
	for i := 0; i < 8; i++ {
		pid := peer.ID(fmt.Sprintf("no-addrs-%d", i))
		records = append(records, iter.Result[types.Record]{
			Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil},
		})
	}

	mr := &mockRouter{}
	mr.On("FindProviders", mock.Anything, c, overfetchLimit(4)).Return(newMockResultIter(records), nil)
	// Addr-less records trigger background FindPeers dispatches; answer
	// with an empty iterator so those records never surface.
	mr.On("FindPeers", mock.Anything, mock.Anything, 1).Maybe().Return(
		newMockResultIter([]iter.Result[*types.PeerRecord]{}), nil,
	)

	cab, err := newCachedAddrBook()
	require.NoError(t, err)
	sut := NewCachedRouter(mr, cab)

	provIter, err := sut.FindProviders(ctx, c, 4)
	require.NoError(t, err)

	got, err := iter.ReadAllResults(provIter)
	require.NoError(t, err)
	require.Len(t, got, 4, "over-fetch should keep producing until limit reached or source exhausted")
	mr.AssertExpectations(t)
}

// TestCachedRouter_FindProviders_OverFetchCapApplies checks that a very
// large caller limit is clamped to cacheFallbackOverfetchMax before it
// reaches the underlying router.
func TestCachedRouter_FindProviders_OverFetchCapApplies(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	c := makeCID()

	// Use a caller limit big enough that the naive multiplication would
	// overshoot the cap, so the assertion proves clamping rather than
	// the boundary case.
	const callerLimit = cacheFallbackOverfetchMax
	require.Greater(t, callerLimit*cacheFallbackOverfetchMultiplier, cacheFallbackOverfetchMax,
		"test precondition: limit*multiplier must exceed the cap")

	mr := &mockRouter{}
	mr.On("FindProviders", mock.Anything, c, cacheFallbackOverfetchMax).
		Return(newMockResultIter([]iter.Result[types.Record]{}), nil)

	cab, err := newCachedAddrBook()
	require.NoError(t, err)
	sut := NewCachedRouter(mr, cab)

	provIter, err := sut.FindProviders(ctx, c, callerLimit)
	require.NoError(t, err)
	_, err = iter.ReadAllResults(provIter)
	require.NoError(t, err)
	mr.AssertExpectations(t)
}

// TestCachedRouter_FindProviders_UnboundedLimitPassesZero checks that a
// zero (unbounded) limit is forwarded to the underlying router as-is,
// without being multiplied by the over-fetch factor.
func TestCachedRouter_FindProviders_UnboundedLimitPassesZero(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	c := makeCID()

	mr := &mockRouter{}
	mr.On("FindProviders", mock.Anything, c, 0).
		Return(newMockResultIter([]iter.Result[types.Record]{}), nil)

	cab, err := newCachedAddrBook()
	require.NoError(t, err)
	sut := NewCachedRouter(mr, cab)

	provIter, err := sut.FindProviders(ctx, c, 0)
	require.NoError(t, err)
	_, err = iter.ReadAllResults(provIter)
	require.NoError(t, err)
	mr.AssertExpectations(t)
}
Loading
Loading