Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The following emojis are used to highlight certain changes:

### Added

- 🛠 `pinning/pinner`: added `Pinner.Close() error`. Close cancels every in-flight operation's context, including streaming goroutines from `RecursiveKeys`, `DirectKeys`, and `InternalPins`, and waits for them to return. A scalar method that observes the cancellation may return `context.Canceled`; a stream interrupted by Close may surface `ErrClosed` on the channel before it closes. After Close returns, every other method returns the new `ErrClosed` sentinel; streaming methods deliver it as `StreamedPin.Err` on a single entry, then close the channel. Close is idempotent and goroutine-safe. **Action required:** downstream `Pinner` implementations must add `Close`. [#1150](https://github.com/ipfs/boxo/pull/1150)
- `pinning/pinner/dspinner`: implements `Close`. Close cancels the contexts of in-flight operations, so snapshot iteration in `RecursiveKeys`/`DirectKeys` and DAG fetches in `Pin` bail out promptly instead of draining to completion. Close returns as soon as those operations honor their ctx. Hosts owning the datastore should call `Close` on the pinner before closing the datastore to avoid the use-after-close panic path in stores such as pebble. [#1150](https://github.com/ipfs/boxo/pull/1150)

### Changed

### Removed
Expand Down
161 changes: 148 additions & 13 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ type pinner struct {

rootsProvider provider.MultihashProvider
pinnedProvider provider.MultihashProvider

// Lifecycle state. closedMu serializes the isClosed check with
// wg.Add so Close waits for every admitted operation. Close
// cancels stopCtx; context.AfterFunc fans that cancellation into
// every admitted op's derived ctx, so in-flight work returns
// promptly through paths that already honor ctx (datastore
// queries, DAG fetches, index iteration).
closedMu sync.Mutex
isClosed bool
stopCtx context.Context
stopCancel context.CancelFunc
wg sync.WaitGroup
}

var _ ipfspinner.Pinner = (*pinner)(nil)
Expand Down Expand Up @@ -167,6 +179,7 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts .
dserv: dserv,
dstore: dstore,
}
p.stopCtx, p.stopCancel = context.WithCancel(context.Background())

for _, o := range opts {
o.f(p)
Expand All @@ -191,9 +204,61 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService, opts .
return p, nil
}

// begin admits a caller and returns a context cancelled by either
// the parent ctx or [pinner.Close]. Successful callers MUST defer
// the returned cleanup. Returns [ipfspinner.ErrClosed] if the pinner
// is already closed.
func (p *pinner) begin(parent context.Context) (context.Context, func(), error) {
p.closedMu.Lock()
defer p.closedMu.Unlock()
if p.isClosed {
return parent, func() {}, ipfspinner.ErrClosed
}
p.wg.Add(1)
ctx, cancel := context.WithCancel(parent)
stop := context.AfterFunc(p.stopCtx, cancel)
cleanup := func() {
stop()
cancel()
p.wg.Done()
}
return ctx, cleanup, nil
}

// errClosedChan returns a closed channel carrying a single
// [ipfspinner.StreamedPin] with err. The channel is buffered so the
// synchronous send below does not block when no reader is present.
func errClosedChan(err error) <-chan ipfspinner.StreamedPin {
out := make(chan ipfspinner.StreamedPin, 1)
out <- ipfspinner.StreamedPin{Err: err}
close(out)
return out
}

// Close implements [ipfspinner.Pinner.Close]. It does not close the
// backing datastore; the caller owns that lifecycle.
func (p *pinner) Close() error {
p.closedMu.Lock()
if p.isClosed {
p.closedMu.Unlock()
return nil
}
p.isClosed = true
// Fires every AfterFunc registered by begin, cancelling each
// admitted op's derived ctx, and closes stopCtx.Done().
p.stopCancel()
p.closedMu.Unlock()

p.wg.Wait()
return nil
}

// SetAutosync allows auto-syncing to be enabled or disabled during runtime.
// This may be used to turn off autosync before doing many repeated pinning
// operations, and then turn it on after. Returns the previous value.
//
// SetAutosync is not part of the [ipfspinner.Pinner] interface. It
// mutates an in-memory flag only and is safe to call after Close.
func (p *pinner) SetAutosync(auto bool) bool {
p.lock.Lock()
defer p.lock.Unlock()
Expand All @@ -204,7 +269,13 @@ func (p *pinner) SetAutosync(auto bool) bool {

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, name string) error {
err := p.dserv.Add(ctx, node)
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return err
}
defer cleanup()

err = p.dserv.Add(ctx, node)
if err != nil {
return err
}
Expand Down Expand Up @@ -440,6 +511,12 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error {

// Unpin a given key
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return err
}
defer cleanup()

cidKey := c.KeyString()

p.lock.Lock()
Expand Down Expand Up @@ -485,6 +562,12 @@ func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
// IsPinned returns whether or not the given key is pinned
// and an explanation of why its pinned
func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error) {
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return "", false, err
}
defer cleanup()

p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, ipfspinner.Any)
Expand All @@ -493,6 +576,12 @@ func (p *pinner) IsPinned(ctx context.Context, c cid.Cid) (string, bool, error)
// IsPinnedWithType returns whether or not the given cid is pinned with the
// given pin type, as well as returning the type of pin its pinned with.
func (p *pinner) IsPinnedWithType(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) (string, bool, error) {
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return "", false, err
}
defer cleanup()

p.lock.RLock()
defer p.lock.RUnlock()
return p.isPinnedWithType(ctx, c, mode)
Expand Down Expand Up @@ -606,6 +695,12 @@ func (p *pinner) loadPinName(ctx context.Context, pin *ipfspinner.Pinned, pinID
// CheckIfPinnedWithType implements the Pinner interface, checking specific pin types.
// This method is optimized to only check the requested pin type(s).
func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode, includeNames bool, cids ...cid.Cid) ([]ipfspinner.Pinned, error) {
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return nil, err
}
defer cleanup()

p.lock.RLock()
defer p.lock.RUnlock()

Expand Down Expand Up @@ -943,12 +1038,22 @@ func (p *pinner) snapshotIndex(ctx context.Context, index dsindex.Indexer) ([]in
return entries, err
}

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin {
func (p *pinner) streamIndex(parent context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin {
ctx, cleanup, err := p.begin(parent)
if err != nil {
return errClosedChan(err)
}

out := make(chan ipfspinner.StreamedPin)

go func() {
defer cleanup()
defer close(out)

// send delivers sp and reports whether to keep going. The
// derived ctx is cancelled by the caller or by Close, so one
// ctx.Done() case covers both "consumer wandered off" and
// "shutting down" and the goroutine never parks.
send := func(sp ipfspinner.StreamedPin) (ok bool) {
select {
case <-ctx.Done():
Expand All @@ -958,15 +1063,11 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile
}
}

// If the backing datastore panics during enumeration,
// recover and surface the panic as an error on the output
// channel instead of crashing the process. This is
// datastore-implementation agnostic: any datastore may
// panic on use after Close (pebble being the prominent
// case), and the pinner does not own the datastore's
// lifecycle. The wording below gives the caller enough
// context to treat it as an expected shutdown-time
// interruption rather than a real failure.
// Defense in depth: surface a datastore panic as a stream
// error rather than crashing the process. Hosts that wire
// Close correctly will never trip this, but datastores like
// pebble panic on use-after-close and the pinner does not own
// the datastore lifecycle.
defer func() {
if r := recover(); r != nil {
send(ipfspinner.StreamedPin{Err: fmt.Errorf("pin stream interrupted by datastore panic (likely shutdown): %v", r)})
Expand All @@ -975,6 +1076,12 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile

entries, err := p.snapshotIndex(ctx, index)
if err != nil {
// Map a Close-driven cancel into ErrClosed so the
// consumer sees a meaningful error; a caller-driven
// cancel keeps its original ctx.Err().
if p.stopCtx.Err() != nil && parent.Err() == nil {
err = ipfspinner.ErrClosed
}
send(ipfspinner.StreamedPin{Err: err})
return
}
Expand Down Expand Up @@ -1020,8 +1127,18 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
// pinner. dspinner has no internal pins, so the returned channel is
// always empty (or carries a single [ipfspinner.ErrClosed] entry after
// Close).
func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin {
_, cleanup, err := p.begin(ctx)
if err != nil {
return errClosedChan(err)
}
// No background work: balance begin()'s wg.Add before returning the
// pre-closed channel.
defer cleanup()

c := make(chan ipfspinner.StreamedPin)
close(c)
return c
Expand All @@ -1032,6 +1149,12 @@ func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspin
//
// TODO: This will not work when multiple pins are supported
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return err
}
defer cleanup()

p.lock.Lock()
defer p.lock.Unlock()

Expand Down Expand Up @@ -1112,10 +1235,16 @@ func (p *pinner) flushPins(ctx context.Context, force bool) error {

// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush(ctx context.Context) error {
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return err
}
defer cleanup()

p.lock.Lock()
defer p.lock.Unlock()

err := p.flushDagService(ctx, true)
err = p.flushDagService(ctx, true)
if err != nil {
return err
}
Expand All @@ -1126,6 +1255,12 @@ func (p *pinner) Flush(ctx context.Context) error {
// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, name string) error {
ctx, cleanup, err := p.begin(ctx)
if err != nil {
return err
}
defer cleanup()

// TODO: remove his to support multiple pins per CID
switch mode {
case ipfspinner.Recursive:
Expand Down
Loading
Loading