From 3cdde219473f9b33e389034e5e8ca56b90509427 Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Fri, 24 Apr 2026 14:30:57 +0200 Subject: [PATCH 1/4] feat(pinner): add Close with ErrClosed lifecycle Pinner gains Close() error. Close waits for every in-flight operation, including streaming goroutines from RecursiveKeys, DirectKeys, and InternalPins, to finish. After Close, every other Pinner method fails fast with a new ErrClosed sentinel; streaming methods surface it as the Err of a single entry on the returned channel, which is then closed. dspinner gains the matching implementation: Close is idempotent, admission is serialised with sync.Mutex + sync.WaitGroup, and stream sends select on a shutdown channel so a parked consumer cannot stall Close. The panic-recovery and context guards added in #1146 stay in place as defence in depth for hosts that do not wire Close correctly. Hosts that own the backing datastore (e.g. kubo) should call Close on the pinner before closing the datastore to avoid use-after-close panics in pebble and similar stores. --- CHANGELOG.md | 3 + pinning/pinner/dspinner/pin.go | 135 +++++++++++++++++-- pinning/pinner/dspinner/pin_test.go | 198 ++++++++++++++++++++++++++++ pinning/pinner/pin.go | 18 +++ 4 files changed, 344 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a3ec532ac..085c73e36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes: ### Added +- 🛠 `pinning/pinner`: `Pinner` now has a `Close() error` method. Close waits for every in-flight operation, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, to return before unblocking. After Close, every other method fails fast with the new `ErrClosed` sentinel; streaming methods surface it as the `Err` field of a single entry on the returned channel. Close is idempotent and safe to call from any goroutine. 
Downstream implementations of `Pinner` must add a `Close` method. +- `pinning/pinner/dspinner`: implements the new `Close`. Stream goroutines now also select on the pinner shutdown signal when they send, so Close never stalls on a parked consumer. Hosts that own the backing datastore should call `Close` on the pinner before closing the datastore to avoid the panic-on-use-after-close path in datastores such as pebble. + ### Changed ### Removed diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 3dc44863d..22ddb56f5 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -100,6 +100,15 @@ type pinner struct { rootsProvider provider.MultihashProvider pinnedProvider provider.MultihashProvider + + // Lifecycle state. closedMu serialises the isClosed check with + // wg.Add so Close reliably waits for every admitted operation. + // done is closed by Close and unblocks streaming goroutines parked + // on a send. + closedMu sync.Mutex + isClosed bool + done chan struct{} + wg sync.WaitGroup } var _ ipfspinner.Pinner = (*pinner)(nil) @@ -166,6 +175,7 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . nameIndex: dsindex.New(dstore, ds.NewKey(pinNameIndexPath)), dserv: dserv, dstore: dstore, + done: make(chan struct{}), } for _, o := range opts { @@ -191,9 +201,56 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . return p, nil } +// begin admits a caller. It returns [ipfspinner.ErrClosed] if the +// pinner has been closed; otherwise it increments the in-flight +// counter so that Close waits for the caller to finish. Successful +// callers MUST pair begin with a deferred p.wg.Done. 
+func (p *pinner) begin() error { + p.closedMu.Lock() + defer p.closedMu.Unlock() + if p.isClosed { + return ipfspinner.ErrClosed + } + p.wg.Add(1) + return nil +} + +// errClosedChan returns a pre-filled buffered channel carrying a single +// [ipfspinner.StreamedPin] with err, followed by close. Buffered so +// callers that never read the channel and never cancel their context +// do not leak the send goroutine. +func errClosedChan(err error) <-chan ipfspinner.StreamedPin { + out := make(chan ipfspinner.StreamedPin, 1) + out <- ipfspinner.StreamedPin{Err: err} + close(out) + return out +} + +// Close releases resources held by the pinner and blocks until every +// admitted operation has returned. Close does not close the backing +// datastore. After Close returns, every other method fails fast with +// [ipfspinner.ErrClosed]. Close is idempotent. +func (p *pinner) Close() error { + p.closedMu.Lock() + if p.isClosed { + p.closedMu.Unlock() + return nil + } + p.isClosed = true + close(p.done) + p.closedMu.Unlock() + + p.wg.Wait() + return nil +} + // SetAutosync allows auto-syncing to be enabled or disabled during runtime. // This may be used to turn off autosync before doing many repeated pinning // operations, and then turn it on after. Returns the previous value. +// +// SetAutosync is not part of the [ipfspinner.Pinner] interface and is +// not gated by Close: it mutates an in-memory flag only, never touches +// the datastore, and so is safe to call on a closed pinner. 
func (p *pinner) SetAutosync(auto bool) bool { p.lock.Lock() defer p.lock.Unlock() @@ -204,6 +261,11 @@ func (p *pinner) SetAutosync(auto bool) bool { // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, name string) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + err := p.dserv.Add(ctx, node) if err != nil { return err @@ -440,6 +502,11 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error { // Unpin a given key func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + cidKey := c.KeyString() p.lock.Lock() @@ -485,6 +552,11 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { // IsPinned returns whether or not the given key is pinned // and an explanation of why its pinned func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) { + if err := p.begin(); err != nil { + return "", false, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, ipfspinner.Any) @@ -493,6 +565,11 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) // IsPinnedWithType returns whether or not the given cid is pinned with the // given pin type, as well as returning the type of pin its pinned with. func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) { + if err := p.begin(); err != nil { + return "", false, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, mode) @@ -606,6 +683,11 @@ func (p *pinner) loadPinName(ctx context.Context, pin *ipfspinner.Pinned, pinID // CheckIfPinnedWithType implements the Pinner interface, checking specific pin types. // This method is optimized to only check the requested pin type(s). 
func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode, includeNames bool, cids ...cid.Cid) ([]ipfspinner.Pinned, error) { + if err := p.begin(); err != nil { + return nil, err + } + defer p.wg.Done() + p.lock.RLock() defer p.lock.RUnlock() @@ -944,29 +1026,38 @@ func (p *pinner) snapshotIndex(ctx context.Context, index dsindex.Indexer) ([]in } func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin { + if err := p.begin(); err != nil { + return errClosedChan(err) + } + out := make(chan ipfspinner.StreamedPin) go func() { + defer p.wg.Done() defer close(out) + // send delivers sp and reports whether the consumer is still + // listening. A closed pinner unblocks send via p.done so that + // Close can wait on wg without stalling on a parked send. send := func(sp ipfspinner.StreamedPin) (ok bool) { select { case <-ctx.Done(): return false + case <-p.done: + return false case out <- sp: return true } } - // If the backing datastore panics during enumeration, - // recover and surface the panic as an error on the output - // channel instead of crashing the process. This is - // datastore-implementation agnostic: any datastore may - // panic on use after Close (pebble being the prominent - // case), and the pinner does not own the datastore's - // lifecycle. The wording below gives the caller enough - // context to treat it as an expected shutdown-time - // interruption rather than a real failure. + // Defense in depth: if the backing datastore panics during + // enumeration, recover and surface the panic as an error on + // the output channel. Close on this pinner runs before the + // datastore is closed by the caller's lifecycle, so this path + // should no longer fire in practice. 
It remains here for + // callers that do not wire Close correctly; any datastore may + // panic on use after Close (pebble being the prominent case), + // and the pinner does not own the datastore's lifecycle. defer func() { if r := recover(); r != nil { send(ipfspinner.StreamedPin{Err: fmt.Errorf("pin stream interrupted by datastore panic (likely shutdown): %v", r)}) @@ -1020,8 +1111,17 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } // InternalPins returns all cids kept pinned for the internal state of the -// pinner +// pinner. dspinner does not keep internal pins, so the returned channel +// is always empty; it carries a single [ipfspinner.ErrClosed] entry if +// Close has been called. func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin { + if err := p.begin(); err != nil { + return errClosedChan(err) + } + // Not tracked by p.wg: the channel is closed synchronously before + // we return, so there is no background work for Close to wait on. 
+ defer p.wg.Done() + c := make(chan ipfspinner.StreamedPin) close(c) return c @@ -1032,6 +1132,11 @@ func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspin // // TODO: This will not work when multiple pins are supported func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + p.lock.Lock() defer p.lock.Unlock() @@ -1112,6 +1217,11 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error { // Flush encodes and writes pinner keysets to the datastore func (p *pinner) Flush(ctx context.Context) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + p.lock.Lock() defer p.lock.Unlock() @@ -1126,6 +1236,11 @@ func (p *pinner) Flush(ctx context.Context) error { // PinWithMode allows the user to have fine grained control over pin // counts func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) error { + if err := p.begin(); err != nil { + return err + } + defer p.wg.Done() + // TODO: remove his to support multiple pins per CID switch mode { case ipfspinner.Recursive: diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index b7a254657..df4d8acb2 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "path" + "sync" "sync/atomic" "testing" "testing/synctest" @@ -1590,3 +1591,200 @@ func TestStreamIndexDoesNotBlockWriters(t *testing.T) { }) } } + +// TestCloseIdempotent asserts Close can be called repeatedly and from +// multiple goroutines without error or panic. 
+func TestCloseIdempotent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + require.NoError(t, p.Close()) + require.NoError(t, p.Close()) + + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, p.Close()) + }() + } + wg.Wait() +} + +// TestCloseErrClosedAllMethods asserts every public Pinner method +// returns ErrClosed after Close has returned. Streaming methods +// surface ErrClosed as StreamedPin.Err on the first (and only) +// channel entry. +func TestCloseErrClosedAllMethods(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed one pin so the index is not empty; Close still succeeds + // but we want post-close streaming methods to emit ErrClosed + // rather than an empty stream, which would be indistinguishable + // from "no pins". + n, k := randNode() + require.NoError(t, dserv.Add(ctx, n)) + require.NoError(t, p.Pin(ctx, n, true, "")) + + require.NoError(t, p.Close()) + + // Scalar methods. + require.ErrorIs(t, p.Pin(ctx, n, true, ""), ipfspin.ErrClosed) + require.ErrorIs(t, p.Unpin(ctx, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Update(ctx, k, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) + require.ErrorIs(t, p.PinWithMode(ctx, k, ipfspin.Recursive, ""), ipfspin.ErrClosed) + + _, _, err = p.IsPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, _, err = p.IsPinnedWithType(ctx, k, ipfspin.Recursive) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + _, err = p.CheckIfPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, err = p.CheckIfPinnedWithType(ctx, ipfspin.Recursive, false, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + // Streaming methods. 
+ assertStreamedErrClosed := func(t *testing.T, name string, ch <-chan ipfspin.StreamedPin) { + t.Helper() + got, ok := <-ch + require.True(t, ok, "%s: expected one entry before close, channel already closed", name) + require.ErrorIs(t, got.Err, ipfspin.ErrClosed, "%s: want ErrClosed on first entry", name) + _, ok = <-ch + require.False(t, ok, "%s: channel must be closed after ErrClosed entry", name) + } + assertStreamedErrClosed(t, "RecursiveKeys", p.RecursiveKeys(ctx, false)) + assertStreamedErrClosed(t, "DirectKeys", p.DirectKeys(ctx, false)) + assertStreamedErrClosed(t, "InternalPins", p.InternalPins(ctx, false)) +} + +// TestCloseWaitsForInFlightOperation asserts Close blocks until an +// operation admitted before Close was called has finished. We hold +// the pinner's read lock from the test to stall an in-flight Pin +// past its begin() admission, then verify Close does not return +// while the Pin is still running. +func TestCloseWaitsForInFlightOperation(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + n, _ := randNode() + require.NoError(t, dserv.Add(ctx, n)) + + // Acquire the index read lock so Pin's internal Lock() call + // blocks after begin() has already incremented wg. + p.lock.RLock() + + pinStarted := make(chan struct{}) + pinReturned := make(chan error, 1) + go func() { + close(pinStarted) + pinReturned <- p.Pin(ctx, n, true, "") + }() + <-pinStarted + // Give Pin time to pass begin() and block on p.lock.Lock(). + time.Sleep(20 * time.Millisecond) + + closeReturned := make(chan error, 1) + go func() { + closeReturned <- p.Close() + }() + + select { + case <-closeReturned: + t.Fatal("Close returned while Pin was still in flight") + case <-time.After(50 * time.Millisecond): + } + + // Releasing the RLock lets Pin acquire Lock and finish; Close + // then unblocks. 
+ p.lock.RUnlock() + + require.NoError(t, <-pinReturned) + require.NoError(t, <-closeReturned) +} + +// TestCloseUnblocksParkedStream asserts Close unblocks a streamIndex +// goroutine that is parked on a send because no consumer is reading. +// Without the p.done signal in send(), Close would stall on wg.Wait +// for the lifetime of the caller's context. +func TestCloseUnblocksParkedStream(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed a few pins so streamIndex has entries to emit (and + // will therefore park on the first send against an absent + // consumer). + pinNodes(makeNodes(4, dserv), p, true) + + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + kch := p.RecursiveKeys(streamCtx, false) + + // Wait for the streamIndex goroutine to park on send. + synctest.Wait() + + require.NoError(t, p.Close()) + + // Channel must close after the goroutine exits. + for range kch { + } + }) +} + +// TestCloseConcurrent hammers Pin, RecursiveKeys, and Close from many +// goroutines. Run with -race to catch lifecycle races. Every admitted +// operation completes and every post-close caller sees ErrClosed. 
+func TestCloseConcurrent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + nodes := makeNodes(4, dserv) + + var wg sync.WaitGroup + for i := 0; i < 32; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + _ = p.Pin(ctx, nodes[i%len(nodes)], true, "") + }(i) + } + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + defer wg.Done() + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + for range p.RecursiveKeys(streamCtx, false) { + } + }() + } + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = p.Close() + }() + } + wg.Wait() + + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) +} diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index efcb1c771..2d4a91dcc 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -79,6 +79,12 @@ func StringToMode(s string) (Mode, bool) { // ErrNotPinned is returned when trying to unpin items that are not pinned. var ErrNotPinned = errors.New("not pinned or pinned indirectly") +// ErrClosed is returned by [Pinner] methods after [Pinner.Close] has been +// called. Streaming methods ([Pinner.DirectKeys], [Pinner.RecursiveKeys], +// [Pinner.InternalPins]) surface it as the [StreamedPin.Err] of a single +// entry on the returned channel, which is then closed. +var ErrClosed = errors.New("pinner closed") + // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. 
In practice, a Pinner is in // charge of keeping the list of items from the local storage that should @@ -148,6 +154,18 @@ type Pinner interface { // InternalPins returns all cids kept pinned for the internal state of the // pinner InternalPins(ctx context.Context, detailed bool) <-chan StreamedPin + + // Close releases resources held by the pinner and blocks until every + // in-flight operation, including streaming goroutines from DirectKeys, + // RecursiveKeys, and InternalPins, has returned. + // + // Close does not close the backing datastore; the caller owns that + // lifecycle and must keep the datastore open until Close returns. + // + // After Close returns, every other Pinner method fails fast with + // [ErrClosed]. Close is idempotent and safe to call from any + // goroutine. + Close() error } // Pinned represents CID which has been pinned with a pinning strategy. From 70ffcfaf5262ca69442b672f4d192459f15818bc Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Wed, 13 May 2026 23:24:10 +0200 Subject: [PATCH 2/4] docs(pinner): tighten close lifecycle wording fix the errClosedChan rationale (buffering is needed because the send is synchronous and has no reader; there is no goroutine to leak) and the InternalPins comment that claimed the call was not tracked by p.wg despite begin() doing wg.Add. 
- changelog: condense the two close bullets, promote the downstream-impl break to an action-required callout, add #1150 refs - pinner.Pinner.Close / ErrClosed godoc: lead with the load-bearing verb, drop restatement - dspinner: errClosedChan, InternalPins, streamIndex send/recover, begin, Close, SetAutosync comments shortened or corrected --- CHANGELOG.md | 4 +-- pinning/pinner/dspinner/pin.go | 57 ++++++++++++++-------------------- pinning/pinner/pin.go | 18 +++++------ 3 files changed, 34 insertions(+), 45 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 085c73e36..d54316761 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,8 +16,8 @@ The following emojis are used to highlight certain changes: ### Added -- 🛠 `pinning/pinner`: `Pinner` now has a `Close() error` method. Close waits for every in-flight operation, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, to return before unblocking. After Close, every other method fails fast with the new `ErrClosed` sentinel; streaming methods surface it as the `Err` field of a single entry on the returned channel. Close is idempotent and safe to call from any goroutine. Downstream implementations of `Pinner` must add a `Close` method. -- `pinning/pinner/dspinner`: implements the new `Close`. Stream goroutines now also select on the pinner shutdown signal when they send, so Close never stalls on a parked consumer. Hosts that own the backing datastore should call `Close` on the pinner before closing the datastore to avoid the panic-on-use-after-close path in datastores such as pebble. +- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close drains all in-flight operations, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, before returning. After Close, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. 
**Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150) +- `pinning/pinner/dspinner`: implements `Close`. Stream sends also select on the shutdown signal, so a parked consumer cannot stall Close. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150) ### Changed diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 22ddb56f5..27b2ad03f 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -101,10 +101,9 @@ type pinner struct { rootsProvider provider.MultihashProvider pinnedProvider provider.MultihashProvider - // Lifecycle state. closedMu serialises the isClosed check with - // wg.Add so Close reliably waits for every admitted operation. - // done is closed by Close and unblocks streaming goroutines parked - // on a send. + // Lifecycle state. closedMu serializes the isClosed check with + // wg.Add so Close waits for every admitted operation. done is + // closed by Close to unblock stream goroutines parked on a send. closedMu sync.Mutex isClosed bool done chan struct{} @@ -201,9 +200,8 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . return p, nil } -// begin admits a caller. It returns [ipfspinner.ErrClosed] if the -// pinner has been closed; otherwise it increments the in-flight -// counter so that Close waits for the caller to finish. Successful +// begin admits a caller. Returns [ipfspinner.ErrClosed] if the pinner +// is closed; otherwise increments the in-flight counter. Successful // callers MUST pair begin with a deferred p.wg.Done. 
func (p *pinner) begin() error { p.closedMu.Lock() @@ -215,10 +213,9 @@ func (p *pinner) begin() error { return nil } -// errClosedChan returns a pre-filled buffered channel carrying a single -// [ipfspinner.StreamedPin] with err, followed by close. Buffered so -// callers that never read the channel and never cancel their context -// do not leak the send goroutine. +// errClosedChan returns a closed channel carrying a single +// [ipfspinner.StreamedPin] with err. The channel is buffered so the +// synchronous send below does not block when no reader is present. func errClosedChan(err error) <-chan ipfspinner.StreamedPin { out := make(chan ipfspinner.StreamedPin, 1) out <- ipfspinner.StreamedPin{Err: err} @@ -226,10 +223,8 @@ func errClosedChan(err error) <-chan ipfspinner.StreamedPin { return out } -// Close releases resources held by the pinner and blocks until every -// admitted operation has returned. Close does not close the backing -// datastore. After Close returns, every other method fails fast with -// [ipfspinner.ErrClosed]. Close is idempotent. +// Close implements [ipfspinner.Pinner.Close]. It does not close the +// backing datastore; the caller owns that lifecycle. func (p *pinner) Close() error { p.closedMu.Lock() if p.isClosed { @@ -248,9 +243,8 @@ func (p *pinner) Close() error { // This may be used to turn off autosync before doing many repeated pinning // operations, and then turn it on after. Returns the previous value. // -// SetAutosync is not part of the [ipfspinner.Pinner] interface and is -// not gated by Close: it mutates an in-memory flag only, never touches -// the datastore, and so is safe to call on a closed pinner. +// SetAutosync is not part of the [ipfspinner.Pinner] interface. It +// mutates an in-memory flag only and is safe to call after Close. 
func (p *pinner) SetAutosync(auto bool) bool { p.lock.Lock() defer p.lock.Unlock() @@ -1037,8 +1031,8 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile defer close(out) // send delivers sp and reports whether the consumer is still - // listening. A closed pinner unblocks send via p.done so that - // Close can wait on wg without stalling on a parked send. + // listening. The p.done case lets Close drain a stream whose + // consumer has stopped reading. send := func(sp ipfspinner.StreamedPin) (ok bool) { select { case <-ctx.Done(): @@ -1050,14 +1044,11 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } } - // Defense in depth: if the backing datastore panics during - // enumeration, recover and surface the panic as an error on - // the output channel. Close on this pinner runs before the - // datastore is closed by the caller's lifecycle, so this path - // should no longer fire in practice. It remains here for - // callers that do not wire Close correctly; any datastore may - // panic on use after Close (pebble being the prominent case), - // and the pinner does not own the datastore's lifecycle. + // Defense in depth: surface a datastore panic as a stream + // error rather than crashing the process. Hosts that wire + // Close correctly will never trip this, but datastores like + // pebble panic on use-after-close and the pinner does not own + // the datastore lifecycle. defer func() { if r := recover(); r != nil { send(ipfspinner.StreamedPin{Err: fmt.Errorf("pin stream interrupted by datastore panic (likely shutdown): %v", r)}) @@ -1111,15 +1102,15 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } // InternalPins returns all cids kept pinned for the internal state of the -// pinner. dspinner does not keep internal pins, so the returned channel -// is always empty; it carries a single [ipfspinner.ErrClosed] entry if -// Close has been called. +// pinner. 
dspinner has no internal pins, so the returned channel is +// always empty (or carries a single [ipfspinner.ErrClosed] entry after +// Close). func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin { if err := p.begin(); err != nil { return errClosedChan(err) } - // Not tracked by p.wg: the channel is closed synchronously before - // we return, so there is no background work for Close to wait on. + // No background work: balance begin()'s wg.Add before returning the + // pre-closed channel. defer p.wg.Done() c := make(chan ipfspinner.StreamedPin) diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index 2d4a91dcc..00e5fe97d 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -79,10 +79,9 @@ func StringToMode(s string) (Mode, bool) { // ErrNotPinned is returned when trying to unpin items that are not pinned. var ErrNotPinned = errors.New("not pinned or pinned indirectly") -// ErrClosed is returned by [Pinner] methods after [Pinner.Close] has been -// called. Streaming methods ([Pinner.DirectKeys], [Pinner.RecursiveKeys], -// [Pinner.InternalPins]) surface it as the [StreamedPin.Err] of a single -// entry on the returned channel, which is then closed. +// ErrClosed is returned by [Pinner] methods after [Pinner.Close]. Streaming +// methods ([Pinner.DirectKeys], [Pinner.RecursiveKeys], [Pinner.InternalPins]) +// deliver it as [StreamedPin.Err] on a single entry, then close the channel. var ErrClosed = errors.New("pinner closed") // A Pinner provides the necessary methods to keep track of Nodes which are @@ -155,16 +154,15 @@ type Pinner interface { // pinner InternalPins(ctx context.Context, detailed bool) <-chan StreamedPin - // Close releases resources held by the pinner and blocks until every - // in-flight operation, including streaming goroutines from DirectKeys, - // RecursiveKeys, and InternalPins, has returned. 
+ // Close drains all in-flight operations, including streaming + // goroutines from DirectKeys, RecursiveKeys, and InternalPins, and + // blocks until they return. After Close, every other Pinner method + // returns [ErrClosed]. // // Close does not close the backing datastore; the caller owns that // lifecycle and must keep the datastore open until Close returns. // - // After Close returns, every other Pinner method fails fast with - // [ErrClosed]. Close is idempotent and safe to call from any - // goroutine. + // Close is idempotent and goroutine-safe. Close() error } From 75481f4110119cc840d6742ff60dcd9ec34f80fa Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 14 May 2026 00:07:26 +0200 Subject: [PATCH 3/4] feat(pinner): close cancels in-flight contexts Close fires context.AfterFunc on every admitted op's derived ctx, so in-flight Pin/Unpin/streams that honor ctx bail out promptly instead of forcing kubo's shutdown to wait or trip the watchdog. - stopCtx + AfterFunc fan-out replaces the chan-based done signal - begin returns the derived ctx; admitted methods thread it through - streamIndex send watches one ctx.Done; Close-driven cancel from snapshotIndex surfaces as ErrClosed on the stream --- CHANGELOG.md | 4 +- pinning/pinner/dspinner/pin.go | 113 +++++++++++++++++----------- pinning/pinner/dspinner/pin_test.go | 91 +++++++++++++++++++++- pinning/pinner/pin.go | 15 +++- 4 files changed, 173 insertions(+), 50 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d54316761..a75314a32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,8 +16,8 @@ The following emojis are used to highlight certain changes: ### Added -- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close drains all in-flight operations, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, before returning. 
After Close, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150) -- `pinning/pinner/dspinner`: implements `Close`. Stream sends also select on the shutdown signal, so a parked consumer cannot stall Close. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150) +- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close cancels every in-flight operation's context, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, and waits for them to return. A scalar method that observes the cancellation may return `context.Canceled`; a stream interrupted by Close may surface `ErrClosed` on the channel before it closes. After Close returns, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150) +- `pinning/pinner/dspinner`: implements `Close`. Close cancels the contexts of in-flight operations, so snapshot iteration in `RecursiveKeys`/`DirectKeys` and DAG fetches in `Pin` bail out promptly instead of draining to completion. Close returns as soon as those operations honor their ctx. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. 
[#1150](https://github.com/ipfs/boxo/pull/1150) ### Changed diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 27b2ad03f..fbede8491 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -102,12 +102,16 @@ type pinner struct { pinnedProvider provider.MultihashProvider // Lifecycle state. closedMu serializes the isClosed check with - // wg.Add so Close waits for every admitted operation. done is - // closed by Close to unblock stream goroutines parked on a send. - closedMu sync.Mutex - isClosed bool - done chan struct{} - wg sync.WaitGroup + // wg.Add so Close waits for every admitted operation. Close + // cancels stopCtx; context.AfterFunc fans that cancellation into + // every admitted op's derived ctx, so in-flight work returns + // promptly through paths that already honor ctx (datastore + // queries, DAG fetches, index iteration). + closedMu sync.Mutex + isClosed bool + stopCtx context.Context + stopCancel context.CancelFunc + wg sync.WaitGroup } var _ ipfspinner.Pinner = (*pinner)(nil) @@ -174,8 +178,8 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . nameIndex: dsindex.New(dstore, ds.NewKey(pinNameIndexPath)), dserv: dserv, dstore: dstore, - done: make(chan struct{}), } + p.stopCtx, p.stopCancel = context.WithCancel(context.Background()) for _, o := range opts { o.f(p) @@ -200,17 +204,25 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . return p, nil } -// begin admits a caller. Returns [ipfspinner.ErrClosed] if the pinner -// is closed; otherwise increments the in-flight counter. Successful -// callers MUST pair begin with a deferred p.wg.Done. -func (p *pinner) begin() error { +// begin admits a caller and returns a context cancelled by either +// the parent ctx or [pinner.Close]. Successful callers MUST defer +// the returned cleanup. Returns [ipfspinner.ErrClosed] if the pinner +// is already closed. 
+func (p *pinner) begin(parent context.Context) (context.Context, func(), error) { p.closedMu.Lock() defer p.closedMu.Unlock() if p.isClosed { - return ipfspinner.ErrClosed + return parent, func() {}, ipfspinner.ErrClosed } p.wg.Add(1) - return nil + ctx, cancel := context.WithCancel(parent) + stop := context.AfterFunc(p.stopCtx, cancel) + cleanup := func() { + stop() + cancel() + p.wg.Done() + } + return ctx, cleanup, nil } // errClosedChan returns a closed channel carrying a single @@ -232,7 +244,9 @@ func (p *pinner) Close() error { return nil } p.isClosed = true - close(p.done) + // Fires every AfterFunc registered by begin, cancelling each + // admitted op's derived ctx, and closes stopCtx.Done(). + p.stopCancel() p.closedMu.Unlock() p.wg.Wait() @@ -255,12 +269,13 @@ func (p *pinner) SetAutosync(auto bool) bool { // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, name string) error { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return err } - defer p.wg.Done() + defer cleanup() - err := p.dserv.Add(ctx, node) + err = p.dserv.Add(ctx, node) if err != nil { return err } @@ -496,10 +511,11 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error { // Unpin a given key func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return err } - defer p.wg.Done() + defer cleanup() cidKey := c.KeyString() @@ -546,10 +562,11 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { // IsPinned returns whether or not the given key is pinned // and an explanation of why its pinned func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return "", false, err } - defer p.wg.Done() + defer cleanup() 
p.lock.RLock() defer p.lock.RUnlock() @@ -559,10 +576,11 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) // IsPinnedWithType returns whether or not the given cid is pinned with the // given pin type, as well as returning the type of pin its pinned with. func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return "", false, err } - defer p.wg.Done() + defer cleanup() p.lock.RLock() defer p.lock.RUnlock() @@ -677,10 +695,11 @@ func (p *pinner) loadPinName(ctx context.Context, pin *ipfspinner.Pinned, pinID // CheckIfPinnedWithType implements the Pinner interface, checking specific pin types. // This method is optimized to only check the requested pin type(s). func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode, includeNames bool, cids ...cid.Cid) ([]ipfspinner.Pinned, error) { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return nil, err } - defer p.wg.Done() + defer cleanup() p.lock.RLock() defer p.lock.RUnlock() @@ -1019,26 +1038,26 @@ func (p *pinner) snapshotIndex(ctx context.Context, index dsindex.Indexer) ([]in return entries, err } -func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin { - if err := p.begin(); err != nil { +func (p *pinner) streamIndex(parent context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin { + ctx, cleanup, err := p.begin(parent) + if err != nil { return errClosedChan(err) } out := make(chan ipfspinner.StreamedPin) go func() { - defer p.wg.Done() + defer cleanup() defer close(out) - // send delivers sp and reports whether the consumer is still - // listening. The p.done case lets Close drain a stream whose - // consumer has stopped reading. 
+ // send delivers sp and reports whether to keep going. The + // derived ctx is cancelled by the caller or by Close, so one + // ctx.Done() case covers both "consumer wandered off" and + // "shutting down" and the goroutine never parks. send := func(sp ipfspinner.StreamedPin) (ok bool) { select { case <-ctx.Done(): return false - case <-p.done: - return false case out <- sp: return true } @@ -1057,6 +1076,12 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile entries, err := p.snapshotIndex(ctx, index) if err != nil { + // Map a Close-driven cancel into ErrClosed so the + // consumer sees a meaningful error; a caller-driven + // cancel keeps its original ctx.Err(). + if p.stopCtx.Err() != nil && parent.Err() == nil { + err = ipfspinner.ErrClosed + } send(ipfspinner.StreamedPin{Err: err}) return } @@ -1106,12 +1131,13 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile // always empty (or carries a single [ipfspinner.ErrClosed] entry after // Close). func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin { - if err := p.begin(); err != nil { + _, cleanup, err := p.begin(ctx) + if err != nil { return errClosedChan(err) } // No background work: balance begin()'s wg.Add before returning the // pre-closed channel. 
- defer p.wg.Done() + defer cleanup() c := make(chan ipfspinner.StreamedPin) close(c) @@ -1123,10 +1149,11 @@ func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspin // // TODO: This will not work when multiple pins are supported func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return err } - defer p.wg.Done() + defer cleanup() p.lock.Lock() defer p.lock.Unlock() @@ -1208,15 +1235,16 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error { // Flush encodes and writes pinner keysets to the datastore func (p *pinner) Flush(ctx context.Context) error { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return err } - defer p.wg.Done() + defer cleanup() p.lock.Lock() defer p.lock.Unlock() - err := p.flushDagService(ctx, true) + err = p.flushDagService(ctx, true) if err != nil { return err } @@ -1227,10 +1255,11 @@ func (p *pinner) Flush(ctx context.Context) error { // PinWithMode allows the user to have fine grained control over pin // counts func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) error { - if err := p.begin(); err != nil { + ctx, cleanup, err := p.begin(ctx) + if err != nil { return err } - defer p.wg.Done() + defer cleanup() // TODO: remove his to support multiple pins per CID switch mode { diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index df4d8acb2..8ff7be417 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -1672,6 +1672,10 @@ func TestCloseErrClosedAllMethods(t *testing.T) { // the pinner's write lock from the test to stall an in-flight Pin // past its begin() admission, then verify Close does not return // while the Pin is still running. 
+// TestCloseWaitsForInFlightOperation asserts Close cannot return +// while an admitted operation is still running. Pin is blocked on a +// pinner lock that is not ctx-aware, so the cancellation fan-out +// cannot unstick it; Close must still wait until Pin actually exits. func TestCloseWaitsForInFlightOperation(t *testing.T) { ctx := t.Context() @@ -1708,10 +1712,12 @@ func TestCloseWaitsForInFlightOperation(t *testing.T) { } // Releasing the RLock lets Pin acquire Lock and finish; Close - // then unblocks. + // then unblocks. Pin sees its ctx already cancelled by the time + // it acquires the lock (because Close fired stopCancel while Pin + // was waiting), so Pin returns context.Canceled. p.lock.RUnlock() - require.NoError(t, <-pinReturned) + require.ErrorIs(t, <-pinReturned, context.Canceled) require.NoError(t, <-closeReturned) } @@ -1788,3 +1794,84 @@ func TestCloseConcurrent(t *testing.T) { require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) } + +// blockingDAGService blocks Add until its context is cancelled. The +// first Add call signals via `entered` so a test goroutine can +// synchronize with it. +type blockingDAGService struct { + ipld.DAGService + entered chan struct{} +} + +func (b *blockingDAGService) Add(ctx context.Context, n ipld.Node) error { + select { + case b.entered <- struct{}{}: + default: + } + <-ctx.Done() + return ctx.Err() +} + +// TestCloseCancelsInFlightPin asserts that Close cancels the +// contexts of admitted in-flight operations. A Pin blocked inside a +// slow downstream call must return promptly with context.Canceled +// once Close fires, rather than stalling Close until the call +// finishes on its own. 
+func TestCloseCancelsInFlightPin(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + dstore, baseDserv := makeStore() + dserv := &blockingDAGService{DAGService: baseDserv, entered: make(chan struct{}, 1)} + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + n, _ := randNode() + pinErr := make(chan error, 1) + go func() { + pinErr <- p.Pin(ctx, n, true, "") + }() + + // Pin is blocked inside dserv.Add; the cancel signal it + // observes must come from the pinner's stopCtx fan-out. + <-dserv.entered + synctest.Wait() + + require.NoError(t, p.Close()) + + require.ErrorIs(t, <-pinErr, context.Canceled) + }) +} + +// TestCloseDuringStreamYieldsErrClosed asserts that when Close +// interrupts a stream goroutine, any error delivered on the channel +// surfaces as ErrClosed rather than the raw context.Canceled from the +// derived ctx. Delivery is best-effort (the send select races between +// ctx.Done and the consumer); the assertion is on the *type* of error +// when one is delivered. +func TestCloseDuringStreamYieldsErrClosed(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + pinNodes(makeNodes(4, dserv), p, true) + + // Caller ctx outlives Close, so any error observed is from + // the pinner's stopCtx, not parent cancellation. 
+ streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + kch := p.RecursiveKeys(streamCtx, false) + + synctest.Wait() + + require.NoError(t, p.Close()) + + for sc := range kch { + if sc.Err != nil { + require.ErrorIs(t, sc.Err, ipfspin.ErrClosed, + "want ErrClosed (not %v) when Close interrupts a stream", sc.Err) + } + } + }) +} diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index 00e5fe97d..6dd586ef9 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -154,13 +154,20 @@ type Pinner interface { // pinner InternalPins(ctx context.Context, detailed bool) <-chan StreamedPin - // Close drains all in-flight operations, including streaming - // goroutines from DirectKeys, RecursiveKeys, and InternalPins, and - // blocks until they return. After Close, every other Pinner method - // returns [ErrClosed]. + // Close shuts the pinner down. It cancels every in-flight + // operation's context, including streaming goroutines from + // DirectKeys, RecursiveKeys, and InternalPins, and waits for them + // to return. A scalar method that observes the cancellation may + // return [context.Canceled]; a stream interrupted by Close may + // surface [ErrClosed] on the channel before it closes. After Close + // returns, every other Pinner method returns [ErrClosed]. // // Close does not close the backing datastore; the caller owns that // lifecycle and must keep the datastore open until Close returns. + // Close itself returns promptly as long as in-flight operations + // honor their context. An operation that ignores ctx (a downstream + // bug) can still block Close, so hosts that need a hard shutdown + // deadline should bound Close at the call site. // // Close is idempotent and goroutine-safe. 
Close() error From 121e7adc00fc965375a53c9d55e52a3901e0c15d Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Thu, 14 May 2026 17:37:56 +0200 Subject: [PATCH 4/4] test(pinner): synctest close-waits-for-inflight drop the 20ms/50ms timing in TestCloseWaitsForInFlightOperation and move it into a synctest bubble (gammazero review). Go 1.26 does not treat sync.RWMutex.Lock waiting behind a durably-held RLock as durably blocked, so swap the non-ctx-aware blocker from the pinner's RWMutex to a tiny ctxIgnoringDAGService whose Add parks on a channel and ignores ctx. Same behavioral assertion (Close must wait on wg when the cancellation fan-out cannot unstick the in-flight op), deterministic and ~50x faster. also drop the duplicated doc paragraph above the test. --- pinning/pinner/dspinner/pin_test.go | 106 ++++++++++++++++------------ 1 file changed, 62 insertions(+), 44 deletions(-) diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index 8ff7be417..2deff46c3 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -1667,58 +1667,58 @@ func TestCloseErrClosedAllMethods(t *testing.T) { assertStreamedErrClosed(t, "InternalPins", p.InternalPins(ctx, false)) } -// TestCloseWaitsForInFlightOperation asserts Close blocks until an -// operation admitted before Close was called has finished. We hold -// the pinner's write lock from the test to stall an in-flight Pin -// past its begin() admission, then verify Close does not return -// while the Pin is still running. // TestCloseWaitsForInFlightOperation asserts Close cannot return -// while an admitted operation is still running. Pin is blocked on a -// pinner lock that is not ctx-aware, so the cancellation fan-out -// cannot unstick it; Close must still wait until Pin actually exits. +// while an admitted operation is still running, even when the +// operation is parked in a non-ctx-aware blocker that the pinner's +// stopCtx fan-out cannot interrupt. 
Pin sits inside a ctx-ignoring +// DAG service; Close must wait on wg until Pin actually exits. +// +// Runs inside a synctest bubble so the "Pin is parked", "Close is +// parked on wg.Wait", and "Pin exits after release" handoffs are +// deterministic instead of timed. func TestCloseWaitsForInFlightOperation(t *testing.T) { - ctx := t.Context() + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() - dstore, dserv := makeStore() - p, err := New(ctx, dstore, dserv) - require.NoError(t, err) + dstore, baseDserv := makeStore() + dserv := &ctxIgnoringDAGService{ + DAGService: baseDserv, + entered: make(chan struct{}, 1), + release: make(chan struct{}), + } + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) - n, _ := randNode() - require.NoError(t, dserv.Add(ctx, n)) + n, _ := randNode() + pinReturned := make(chan error, 1) + go func() { + pinReturned <- p.Pin(ctx, n, true, "") + }() + // Wait for Pin to park in dserv.Add. + <-dserv.entered + synctest.Wait() - // Acquire the index read lock so Pin's internal Lock() call - // blocks after begin() has already incremented wg. - p.lock.RLock() - - pinStarted := make(chan struct{}) - pinReturned := make(chan error, 1) - go func() { - close(pinStarted) - pinReturned <- p.Pin(ctx, n, true, "") - }() - <-pinStarted - // Give Pin time to pass begin() and block on p.lock.Lock(). - time.Sleep(20 * time.Millisecond) - - closeReturned := make(chan error, 1) - go func() { - closeReturned <- p.Close() - }() + closeReturned := make(chan error, 1) + go func() { + closeReturned <- p.Close() + }() + // Close fires stopCancel (Pin is in a non-ctx-aware blocker + // and ignores it) and parks on wg.Wait. 
+ synctest.Wait() - select { - case <-closeReturned: - t.Fatal("Close returned while Pin was still in flight") - case <-time.After(50 * time.Millisecond): - } + select { + case <-closeReturned: + t.Fatal("Close returned while Pin was still in flight") + default: + } - // Releasing the RLock lets Pin acquire Lock and finish; Close - // then unblocks. Pin sees its ctx already cancelled by the time - // it acquires the lock (because Close fired stopCancel while Pin - // was waiting), so Pin returns context.Canceled. - p.lock.RUnlock() + // Let Pin out of dserv.Add. Its ctx is already cancelled, so + // the next ctx-aware step in Pin returns context.Canceled. + close(dserv.release) - require.ErrorIs(t, <-pinReturned, context.Canceled) - require.NoError(t, <-closeReturned) + require.ErrorIs(t, <-pinReturned, context.Canceled) + require.NoError(t, <-closeReturned) + }) } // TestCloseUnblocksParkedStream asserts Close unblocks a streamIndex @@ -1812,6 +1812,24 @@ func (b *blockingDAGService) Add(ctx context.Context, n ipld.Node) error { return ctx.Err() } +// ctxIgnoringDAGService blocks Add on release and never observes +// ctx, modeling an in-flight downstream call that the pinner's +// stopCtx fan-out cannot interrupt. +type ctxIgnoringDAGService struct { + ipld.DAGService + entered chan struct{} + release chan struct{} +} + +func (s *ctxIgnoringDAGService) Add(_ context.Context, n ipld.Node) error { + select { + case s.entered <- struct{}{}: + default: + } + <-s.release + return s.DAGService.Add(context.Background(), n) +} + // TestCloseCancelsInFlightPin asserts that Close cancels the // contexts of admitted in-flight operations. A Pin blocked inside a // slow downstream call must return promptly with context.Canceled