diff --git a/CHANGELOG.md b/CHANGELOG.md index 91e497b83..c5f8fe229 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes: ### Added +- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close cancels every in-flight operation's context, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, and waits for them to return. A scalar method that observes the cancellation may return `context.Canceled`; a stream interrupted by Close may surface `ErrClosed` on the channel before it closes. After Close returns, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150) +- `pinning/pinner/dspinner`: implements `Close`. Close cancels the contexts of in-flight operations, so snapshot iteration in `RecursiveKeys`/`DirectKeys` and DAG fetches in `Pin` bail out promptly instead of draining to completion. Close returns as soon as those operations honor their ctx. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150) + ### Changed ### Removed diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 3dc44863d..fbede8491 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -100,6 +100,18 @@ type pinner struct { rootsProvider provider.MultihashProvider pinnedProvider provider.MultihashProvider + + // Lifecycle state. closedMu serializes the isClosed check with + // wg.Add so Close waits for every admitted operation. Close + // cancels stopCtx; context.AfterFunc fans that cancellation into + // every admitted op's derived ctx, so in-flight work returns + // promptly through paths that already honor ctx (datastore + // queries, DAG fetches, index iteration). + closedMu sync.Mutex + isClosed bool + stopCtx context.Context + stopCancel context.CancelFunc + wg sync.WaitGroup } var _ ipfspinner.Pinner = (*pinner)(nil) @@ -167,6 +179,7 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . dserv: dserv, dstore: dstore, } + p.stopCtx, p.stopCancel = context.WithCancel(context.Background()) for _, o := range opts { o.f(p) @@ -191,9 +204,61 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts . return p, nil } +// begin admits a caller and returns a context cancelled by either +// the parent ctx or [pinner.Close]. Successful callers MUST defer +// the returned cleanup. Returns [ipfspinner.ErrClosed] if the pinner +// is already closed. +func (p *pinner) begin(parent context.Context) (context.Context, func(), error) { + p.closedMu.Lock() + defer p.closedMu.Unlock() + if p.isClosed { + return parent, func() {}, ipfspinner.ErrClosed + } + p.wg.Add(1) + ctx, cancel := context.WithCancel(parent) + stop := context.AfterFunc(p.stopCtx, cancel) + cleanup := func() { + stop() + cancel() + p.wg.Done() + } + return ctx, cleanup, nil +} + +// errClosedChan returns a closed channel carrying a single +// [ipfspinner.StreamedPin] with err. The channel is buffered so the +// synchronous send below does not block when no reader is present. +func errClosedChan(err error) <-chan ipfspinner.StreamedPin { + out := make(chan ipfspinner.StreamedPin, 1) + out <- ipfspinner.StreamedPin{Err: err} + close(out) + return out +} + +// Close implements [ipfspinner.Pinner.Close]. It does not close the +// backing datastore; the caller owns that lifecycle. +func (p *pinner) Close() error { + p.closedMu.Lock() + if p.isClosed { + p.closedMu.Unlock() + return nil + } + p.isClosed = true + // Fires every AfterFunc registered by begin, cancelling each + // admitted op's derived ctx, and closes stopCtx.Done(). + p.stopCancel() + p.closedMu.Unlock() + + p.wg.Wait() + return nil +} + // SetAutosync allows auto-syncing to be enabled or disabled during runtime. // This may be used to turn off autosync before doing many repeated pinning // operations, and then turn it on after. Returns the previous value. +// +// SetAutosync is not part of the [ipfspinner.Pinner] interface. It +// mutates an in-memory flag only and is safe to call after Close. func (p *pinner) SetAutosync(auto bool) bool { p.lock.Lock() defer p.lock.Unlock() @@ -204,7 +269,13 @@ func (p *pinner) SetAutosync(auto bool) bool { // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, name string) error { - err := p.dserv.Add(ctx, node) + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return err + } + defer cleanup() + + err = p.dserv.Add(ctx, node) if err != nil { return err } @@ -440,6 +511,12 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error { // Unpin a given key func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return err + } + defer cleanup() + cidKey := c.KeyString() p.lock.Lock() @@ -485,6 +562,12 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error { // IsPinned returns whether or not the given key is pinned // and an explanation of why its pinned func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) { + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return "", false, err + } + defer cleanup() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, ipfspinner.Any) @@ -493,6 +576,12 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) // IsPinnedWithType returns whether or not the given cid is pinned with the // given pin type, as well as returning the type of pin its pinned with. func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) { + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return "", false, err + } + defer cleanup() + p.lock.RLock() defer p.lock.RUnlock() return p.isPinnedWithType(ctx, c, mode) @@ -606,6 +695,12 @@ func (p *pinner) loadPinName(ctx context.Context, pin *ipfspinner.Pinned, pinID // CheckIfPinnedWithType implements the Pinner interface, checking specific pin types. // This method is optimized to only check the requested pin type(s). func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode, includeNames bool, cids ...cid.Cid) ([]ipfspinner.Pinned, error) { + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return nil, err + } + defer cleanup() + p.lock.RLock() defer p.lock.RUnlock() @@ -943,12 +1038,22 @@ func (p *pinner) snapshotIndex(ctx context.Context, index dsindex.Indexer) ([]in return entries, err } -func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin { +func (p *pinner) streamIndex(parent context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin { + ctx, cleanup, err := p.begin(parent) + if err != nil { + return errClosedChan(err) + } + out := make(chan ipfspinner.StreamedPin) go func() { + defer cleanup() defer close(out) + // send delivers sp and reports whether to keep going. The + // derived ctx is cancelled by the caller or by Close, so one + // ctx.Done() case covers both "consumer wandered off" and + // "shutting down" and the goroutine never parks. send := func(sp ipfspinner.StreamedPin) (ok bool) { select { case <-ctx.Done(): @@ -958,15 +1063,11 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } } - // If the backing datastore panics during enumeration, - // recover and surface the panic as an error on the output - // channel instead of crashing the process. This is - // datastore-implementation agnostic: any datastore may - // panic on use after Close (pebble being the prominent - // case), and the pinner does not own the datastore's - // lifecycle. The wording below gives the caller enough - // context to treat it as an expected shutdown-time - // interruption rather than a real failure. + // Defense in depth: surface a datastore panic as a stream + // error rather than crashing the process. Hosts that wire + // Close correctly will never trip this, but datastores like + // pebble panic on use-after-close and the pinner does not own + // the datastore lifecycle. defer func() { if r := recover(); r != nil { send(ipfspinner.StreamedPin{Err: fmt.Errorf("pin stream interrupted by datastore panic (likely shutdown): %v", r)}) @@ -975,6 +1076,12 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile entries, err := p.snapshotIndex(ctx, index) if err != nil { + // Map a Close-driven cancel into ErrClosed so the + // consumer sees a meaningful error; a caller-driven + // cancel keeps its original ctx.Err(). + if p.stopCtx.Err() != nil && parent.Err() == nil { + err = ipfspinner.ErrClosed + } send(ipfspinner.StreamedPin{Err: err}) return } @@ -1020,8 +1127,18 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } // InternalPins returns all cids kept pinned for the internal state of the -// pinner +// pinner. dspinner has no internal pins, so the returned channel is +// always empty (or carries a single [ipfspinner.ErrClosed] entry after +// Close). func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin { + _, cleanup, err := p.begin(ctx) + if err != nil { + return errClosedChan(err) + } + // No background work: balance begin()'s wg.Add before returning the + // pre-closed channel. + defer cleanup() + c := make(chan ipfspinner.StreamedPin) close(c) return c @@ -1032,6 +1149,12 @@ func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspin // // TODO: This will not work when multiple pins are supported func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error { + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return err + } + defer cleanup() + p.lock.Lock() defer p.lock.Unlock() @@ -1112,10 +1235,16 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error { // Flush encodes and writes pinner keysets to the datastore func (p *pinner) Flush(ctx context.Context) error { + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return err + } + defer cleanup() + p.lock.Lock() defer p.lock.Unlock() - err := p.flushDagService(ctx, true) + err = p.flushDagService(ctx, true) if err != nil { return err } @@ -1126,6 +1255,12 @@ func (p *pinner) Flush(ctx context.Context) error { // PinWithMode allows the user to have fine grained control over pin // counts func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) error { + ctx, cleanup, err := p.begin(ctx) + if err != nil { + return err + } + defer cleanup() + // TODO: remove his to support multiple pins per CID switch mode { case ipfspinner.Recursive: diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index b7a254657..2deff46c3 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "path" + "sync" "sync/atomic" "testing" "testing/synctest" @@ -1590,3 +1591,305 @@ func TestStreamIndexDoesNotBlockWriters(t *testing.T) { }) } } + +// TestCloseIdempotent asserts Close can be called repeatedly and from +// multiple goroutines without error or panic. +func TestCloseIdempotent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + require.NoError(t, p.Close()) + require.NoError(t, p.Close()) + + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func() { + defer wg.Done() + require.NoError(t, p.Close()) + }() + } + wg.Wait() +} + +// TestCloseErrClosedAllMethods asserts every public Pinner method +// returns ErrClosed after Close has returned. Streaming methods +// surface ErrClosed as StreamedPin.Err on the first (and only) +// channel entry. +func TestCloseErrClosedAllMethods(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed one pin so the index is not empty; Close still succeeds + // but we want post-close streaming methods to emit ErrClosed + // rather than an empty stream, which would be indistinguishable + // from "no pins". + n, k := randNode() + require.NoError(t, dserv.Add(ctx, n)) + require.NoError(t, p.Pin(ctx, n, true, "")) + + require.NoError(t, p.Close()) + + // Scalar methods. + require.ErrorIs(t, p.Pin(ctx, n, true, ""), ipfspin.ErrClosed) + require.ErrorIs(t, p.Unpin(ctx, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Update(ctx, k, k, true), ipfspin.ErrClosed) + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) + require.ErrorIs(t, p.PinWithMode(ctx, k, ipfspin.Recursive, ""), ipfspin.ErrClosed) + + _, _, err = p.IsPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, _, err = p.IsPinnedWithType(ctx, k, ipfspin.Recursive) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + _, err = p.CheckIfPinned(ctx, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + _, err = p.CheckIfPinnedWithType(ctx, ipfspin.Recursive, false, k) + require.ErrorIs(t, err, ipfspin.ErrClosed) + + // Streaming methods. + assertStreamedErrClosed := func(t *testing.T, name string, ch <-chan ipfspin.StreamedPin) { + t.Helper() + got, ok := <-ch + require.True(t, ok, "%s: expected one entry before close, channel already closed", name) + require.ErrorIs(t, got.Err, ipfspin.ErrClosed, "%s: want ErrClosed on first entry", name) + _, ok = <-ch + require.False(t, ok, "%s: channel must be closed after ErrClosed entry", name) + } + assertStreamedErrClosed(t, "RecursiveKeys", p.RecursiveKeys(ctx, false)) + assertStreamedErrClosed(t, "DirectKeys", p.DirectKeys(ctx, false)) + assertStreamedErrClosed(t, "InternalPins", p.InternalPins(ctx, false)) +} + +// TestCloseWaitsForInFlightOperation asserts Close cannot return +// while an admitted operation is still running, even when the +// operation is parked in a non-ctx-aware blocker that the pinner's +// stopCtx fan-out cannot interrupt. Pin sits inside a ctx-ignoring +// DAG service; Close must wait on wg until Pin actually exits. +// +// Runs inside a synctest bubble so the "Pin is parked", "Close is +// parked on wg.Wait", and "Pin exits after release" handoffs are +// deterministic instead of timed. +func TestCloseWaitsForInFlightOperation(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + dstore, baseDserv := makeStore() + dserv := &ctxIgnoringDAGService{ + DAGService: baseDserv, + entered: make(chan struct{}, 1), + release: make(chan struct{}), + } + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + n, _ := randNode() + pinReturned := make(chan error, 1) + go func() { + pinReturned <- p.Pin(ctx, n, true, "") + }() + // Wait for Pin to park in dserv.Add. + <-dserv.entered + synctest.Wait() + + closeReturned := make(chan error, 1) + go func() { + closeReturned <- p.Close() + }() + // Close fires stopCancel (Pin is in a non-ctx-aware blocker + // and ignores it) and parks on wg.Wait. + synctest.Wait() + + select { + case <-closeReturned: + t.Fatal("Close returned while Pin was still in flight") + default: + } + + // Let Pin out of dserv.Add. Its ctx is already cancelled, so + // the next ctx-aware step in Pin returns context.Canceled. + close(dserv.release) + + require.ErrorIs(t, <-pinReturned, context.Canceled) + require.NoError(t, <-closeReturned) + }) +} + +// TestCloseUnblocksParkedStream asserts Close unblocks a streamIndex +// goroutine that is parked on a send because no consumer is reading. +// Without the p.done signal in send(), Close would stall on wg.Wait +// for the lifetime of the caller's context. +func TestCloseUnblocksParkedStream(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + // Seed a few pins so streamIndex has entries to emit (and + // will therefore park on the first send against an absent + // consumer). + pinNodes(makeNodes(4, dserv), p, true) + + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + kch := p.RecursiveKeys(streamCtx, false) + + // Wait for the streamIndex goroutine to park on send. + synctest.Wait() + + require.NoError(t, p.Close()) + + // Channel must close after the goroutine exits. + for range kch { + } + }) +} + +// TestCloseConcurrent hammers Pin, RecursiveKeys, and Close from many +// goroutines. Run with -race to catch lifecycle races. Every admitted +// operation completes and every post-close caller sees ErrClosed. +func TestCloseConcurrent(t *testing.T) { + ctx := t.Context() + + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + nodes := makeNodes(4, dserv) + + var wg sync.WaitGroup + for i := 0; i < 32; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + _ = p.Pin(ctx, nodes[i%len(nodes)], true, "") + }(i) + } + for i := 0; i < 32; i++ { + wg.Add(1) + go func() { + defer wg.Done() + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + for range p.RecursiveKeys(streamCtx, false) { + } + }() + } + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _ = p.Close() + }() + } + wg.Wait() + + require.ErrorIs(t, p.Flush(ctx), ipfspin.ErrClosed) +} + +// blockingDAGService blocks Add until its context is cancelled. The +// first Add call signals via `entered` so a test goroutine can +// synchronize with it. +type blockingDAGService struct { + ipld.DAGService + entered chan struct{} +} + +func (b *blockingDAGService) Add(ctx context.Context, n ipld.Node) error { + select { + case b.entered <- struct{}{}: + default: + } + <-ctx.Done() + return ctx.Err() +} + +// ctxIgnoringDAGService blocks Add on release and never observes +// ctx, modeling an in-flight downstream call that the pinner's +// stopCtx fan-out cannot interrupt. +type ctxIgnoringDAGService struct { + ipld.DAGService + entered chan struct{} + release chan struct{} +} + +func (s *ctxIgnoringDAGService) Add(_ context.Context, n ipld.Node) error { + select { + case s.entered <- struct{}{}: + default: + } + <-s.release + return s.DAGService.Add(context.Background(), n) +} + +// TestCloseCancelsInFlightPin asserts that Close cancels the +// contexts of admitted in-flight operations. A Pin blocked inside a +// slow downstream call must return promptly with context.Canceled +// once Close fires, rather than stalling Close until the call +// finishes on its own. +func TestCloseCancelsInFlightPin(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + dstore, baseDserv := makeStore() + dserv := &blockingDAGService{DAGService: baseDserv, entered: make(chan struct{}, 1)} + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + n, _ := randNode() + pinErr := make(chan error, 1) + go func() { + pinErr <- p.Pin(ctx, n, true, "") + }() + + // Pin is blocked inside dserv.Add; the cancel signal it + // observes must come from the pinner's stopCtx fan-out. + <-dserv.entered + synctest.Wait() + + require.NoError(t, p.Close()) + + require.ErrorIs(t, <-pinErr, context.Canceled) + }) +} + +// TestCloseDuringStreamYieldsErrClosed asserts that when Close +// interrupts a stream goroutine, any error delivered on the channel +// surfaces as ErrClosed rather than the raw context.Canceled from the +// derived ctx. Delivery is best-effort (the send select races between +// ctx.Done and the consumer); the assertion is on the *type* of error +// when one is delivered. +func TestCloseDuringStreamYieldsErrClosed(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + dstore, dserv := makeStore() + p, err := New(ctx, dstore, dserv) + require.NoError(t, err) + + pinNodes(makeNodes(4, dserv), p, true) + + // Caller ctx outlives Close, so any error observed is from + // the pinner's stopCtx, not parent cancellation. + streamCtx, cancel := context.WithCancel(ctx) + defer cancel() + kch := p.RecursiveKeys(streamCtx, false) + + synctest.Wait() + + require.NoError(t, p.Close()) + + for sc := range kch { + if sc.Err != nil { + require.ErrorIs(t, sc.Err, ipfspin.ErrClosed, + "want ErrClosed (not %v) when Close interrupts a stream", sc.Err) + } + } + }) +} diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index efcb1c771..6dd586ef9 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -79,6 +79,11 @@ func StringToMode(s string) (Mode, bool) { // ErrNotPinned is returned when trying to unpin items that are not pinned. var ErrNotPinned = errors.New("not pinned or pinned indirectly") +// ErrClosed is returned by [Pinner] methods after [Pinner.Close]. Streaming +// methods ([Pinner.DirectKeys], [Pinner.RecursiveKeys], [Pinner.InternalPins]) +// deliver it as [StreamedPin.Err] on a single entry, then close the channel. +var ErrClosed = errors.New("pinner closed") + // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. In practice, a Pinner is in // charge of keeping the list of items from the local storage that should @@ -148,6 +153,24 @@ type Pinner interface { // InternalPins returns all cids kept pinned for the internal state of the // pinner InternalPins(ctx context.Context, detailed bool) <-chan StreamedPin + + // Close shuts the pinner down. It cancels every in-flight + // operation's context, including streaming goroutines from + // DirectKeys, RecursiveKeys, and InternalPins, and waits for them + // to return. A scalar method that observes the cancellation may + // return [context.Canceled]; a stream interrupted by Close may + // surface [ErrClosed] on the channel before it closes. After Close + // returns, every other Pinner method returns [ErrClosed]. + // + // Close does not close the backing datastore; the caller owns that + // lifecycle and must keep the datastore open until Close returns. + // Close itself returns promptly as long as in-flight operations + // honor their context. An operation that ignores ctx (a downstream + // bug) can still block Close, so hosts that need a hard shutdown + // deadline should bound Close at the call site. + // + // Close is idempotent and goroutine-safe. + Close() error } // Pinned represents CID which has been pinned with a pinning strategy.