diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 5f9187fd..dfcae424 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -1339,14 +1339,23 @@ A waitable can belong to at most one "waitable set" (defined next) which is referred to by the `wset` field. A `Waitable`'s `pending_event` is delivered (via `get_pending_event`) when core wasm code waits on its waitable set (via `waitable-set.wait` or, when using `callback`, by returning to the event loop). + +Lastly, a waitable cannot be waited on *both* asynchronously (via +waitable set) and synchronously (via synchronous `subtask.cancel` or +`{stream,future}.{,cancel-}{read,write}`) since this raises the possibility that +the waitable set "steals" events from the synchronous waiter, leaving the +synchronous waiter forever waiting. This condition is asserted by the `Waitable` +methods here and guarded via traps by the relevant built-ins below. ```python class Waitable: pending_event: Optional[Callable[[], EventTuple]] wset: Optional[WaitableSet] + has_sync_waiter: bool def __init__(self): self.pending_event = None self.wset = None + self.has_sync_waiter = False def set_pending_event(self, pending_event): self.pending_event = pending_event @@ -1354,12 +1363,22 @@ class Waitable: def has_pending_event(self): return bool(self.pending_event) + def in_waitable_set(self): + return self.wset is not None + + def wait_for_pending_event(self): + assert(not self.in_waitable_set() and not self.has_sync_waiter) + self.has_sync_waiter = True + current_thread().wait_until(self.has_pending_event, cancellable = False) + self.has_sync_waiter = False + def get_pending_event(self) -> EventTuple: pending_event = self.pending_event self.pending_event = None return pending_event() def join(self, wset): + assert(not self.has_sync_waiter) if self.wset: self.wset.elems.remove(self) self.wset = wset @@ -1368,6 +1387,7 @@ class Waitable: def drop(self): assert(not self.has_pending_event()) + assert(not self.has_sync_waiter) self.join(None) ``` @@ -1875,10 +1895,9 @@ state which is referenced by the `shared` field and either points to a ```python class CopyState(Enum): IDLE = 1 - SYNC_COPYING = 2 - ASYNC_COPYING = 3 - CANCELLING_COPY = 4 - DONE = 5 + COPYING = 2 + CANCELLING_COPY = 3 + DONE = 4 class CopyEnd(Waitable): state: CopyState @@ -1893,7 +1912,7 @@ class CopyEnd(Waitable): match self.state: case CopyState.IDLE | CopyState.DONE: return False - case CopyState.SYNC_COPYING | CopyState.ASYNC_COPYING | CopyState.CANCELLING_COPY: + case CopyState.COPYING | CopyState.CANCELLING_COPY: return True assert(False) @@ -1913,9 +1932,7 @@ class WritableStreamEnd(CopyEnd): As shown in `drop`, attempting to drop a readable or writable end while a copy is in progress or in the process of being cancelled traps. This means that client code must take care to wait for these operations to finish (potentially -cancelling them via `stream.cancel-{read,write}`) before dropping. The -`SYNC_COPYING` vs. `ASYNC_COPYING` distinction is tracked in the state to -determine whether the copy operation can be cancelled. +cancelling them via `stream.cancel-{read,write}`) before dropping. The polymorphic `copy` method dispatches to either `ReadableStream.read` or `WritableStream.write` and allows the implementations of `stream.{read,write}` @@ -4281,6 +4298,7 @@ def canon_waitable_join(wi, si): trap_if(not inst.may_leave) w = inst.handles.get(wi) trap_if(not isinstance(w, Waitable)) + trap_if(w.has_sync_waiter) if si == 0: w.join(None) else: @@ -4289,6 +4307,10 @@ def canon_waitable_join(wi, si): w.join(wset) return [] ``` +As described with the definition of `Waitable` above, to prevent surprising +deadlocks, a waitable that is currently being synchronously waited on traps if +added to a waitable set. + Note that tables do not allow elements at index `0`, so `0` is a valid sentinel that tells `join` to remove the given waitable from any set that it is currently a part of. Waitables can be a member of at most one set, so if the @@ -4334,6 +4356,7 @@ def canon_subtask_cancel(async_, i): trap_if(not isinstance(subtask, Subtask)) trap_if(subtask.resolve_delivered()) trap_if(subtask.cancellation_requested) + trap_if(subtask.in_waitable_set() and not async_) if subtask.resolved(): assert(subtask.has_pending_event()) else: @@ -4341,7 +4364,7 @@ def canon_subtask_cancel(async_, i): subtask.on_cancel() if not subtask.resolved(): if not async_: - thread.wait_until(subtask.resolved) + subtask.wait_for_pending_event() else: return [BLOCKED] code,index,payload = subtask.get_pending_event() @@ -4354,7 +4377,8 @@ unconditionally traps if it transitively attempts to make a synchronous call to `subtask.cancel` (regardless of whether the cancellation would have succeeded without blocking). The other traps disallow calling `subtask.cancel` twice for the same subtask or after the supertask has already been notified that the -subtask has returned. +subtask has returned or if the subtask is already being asynchronously waited +on via waitable set. A race condition handled by the above code is that it's possible for a subtask to have already resolved (by calling `task.return` or `task.cancel`) and @@ -4470,11 +4494,14 @@ def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n): Next, `stream_copy` checks that the element at index `i` is of the right type and allowed to start a new copy. (In the future, the "trap if not `IDLE`" condition could be relaxed to allow multiple pipelined reads or writes.) +There is also a trap if attempting to synchronously read or write from a +stream that is already being asynchronously waited on via waitable set. ```python e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_t.t) trap_if(e.state != CopyState.IDLE) + trap_if(e.in_waitable_set() and not opts.async_) ``` Then a readable or writable buffer is created which (in `Buffer`'s constructor) @@ -4511,6 +4538,7 @@ independently of the `addrtype`. ```python def stream_event(result, reclaim_buffer): reclaim_buffer() + assert(e.copying()) if result == CopyResult.DROPPED: e.state = CopyState.DONE else: @@ -4526,6 +4554,7 @@ independently of the `addrtype`. def on_copy_done(result): e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda:())) + e.state = CopyState.COPYING e.copy(thread.task.inst, buffer, on_copy, on_copy_done) ``` @@ -4537,10 +4566,8 @@ synchronously and return `BLOCKED` if not: ```python if not e.has_pending_event(): if not opts.async_: - e.state = CopyState.SYNC_COPYING - thread.wait_until(e.has_pending_event) + e.wait_for_pending_event() else: - e.state = CopyState.ASYNC_COPYING return [BLOCKED] code,index,payload = e.get_pending_event() assert(code == event_code and index == i and payload != BLOCKED) @@ -4593,6 +4620,7 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr): trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != future_t.t) trap_if(e.state != CopyState.IDLE) + trap_if(e.in_waitable_set() and not opts.async_) assert(not contains_borrow(future_t)) cx = LiftLowerContext(opts, thread.task.inst, borrow_scope = None) @@ -4612,6 +4640,7 @@ of elements copied is not packed in the high 28 bits; they're always zero. ```python def future_event(result): assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED)) + assert(e.copying()) if result == CopyResult.DROPPED or result == CopyResult.COMPLETED: e.state = CopyState.DONE else: @@ -4622,6 +4651,7 @@ of elements copied is not packed in the high 28 bits; they're always zero. assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE) e.set_pending_event(partial(future_event, result)) + e.state = CopyState.COPYING e.copy(thread.task.inst, buffer, on_copy_done) ``` @@ -4630,10 +4660,8 @@ synchronously and returning either the progress made or `BLOCKED`. ```python if not e.has_pending_event(): if not opts.async_: - e.state = CopyState.SYNC_COPYING - thread.wait_until(e.has_pending_event) + e.wait_for_pending_event() else: - e.state = CopyState.ASYNC_COPYING return [BLOCKED] code,index,payload = e.get_pending_event() assert(code == event_code and index == i) @@ -4677,13 +4705,14 @@ def cancel_copy(EndT, event_code, stream_or_future_t, async_, i): e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_or_future_t.t) - trap_if(e.state != CopyState.ASYNC_COPYING) + trap_if(e.state != CopyState.COPYING or e.has_sync_waiter) + trap_if(e.in_waitable_set() and not async_) e.state = CopyState.CANCELLING_COPY if not e.has_pending_event(): e.shared.cancel() if not e.has_pending_event(): if not async_: - thread.wait_until(e.has_pending_event) + e.wait_for_pending_event() else: return [BLOCKED] code,index,payload = e.get_pending_event() @@ -4696,7 +4725,9 @@ unconditionally traps if it transitively attempts to make a synchronous call to have completed without blocking). There is also a trap if there is not currently an async copy in progress (sync copies do not expect or check for cancellation and thus cannot be cancelled, and repeatedly cancelling the same -async copy after the first call blocked is not allowed). +async copy after the first call blocked is not allowed). Lastly, there is a +trap if attempting to synchronously cancel a stream operation when the stream +end is already being asynchronously waited on by a waitable set. The *first* check for `e.has_pending_event()` catches the case where the copy has already racily finished, in which case we must *not* call `cancel()`. Calling diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index 49c063ee..dad8f946 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -710,10 +710,12 @@ class EventCode(IntEnum): class Waitable: pending_event: Optional[Callable[[], EventTuple]] wset: Optional[WaitableSet] + has_sync_waiter: bool def __init__(self): self.pending_event = None self.wset = None + self.has_sync_waiter = False def set_pending_event(self, pending_event): self.pending_event = pending_event @@ -721,12 +723,22 @@ def set_pending_event(self, pending_event): def has_pending_event(self): return bool(self.pending_event) + def in_waitable_set(self): + return self.wset is not None + + def wait_for_pending_event(self): + assert(not self.in_waitable_set() and not self.has_sync_waiter) + self.has_sync_waiter = True + current_thread().wait_until(self.has_pending_event, cancellable = False) + self.has_sync_waiter = False + def get_pending_event(self) -> EventTuple: pending_event = self.pending_event self.pending_event = None return pending_event() def join(self, wset): + assert(not self.has_sync_waiter) if self.wset: self.wset.elems.remove(self) self.wset = wset @@ -735,6 +747,7 @@ def join(self, wset): def drop(self): assert(not self.has_pending_event()) + assert(not self.has_sync_waiter) self.join(None) class WaitableSet: @@ -995,10 +1008,9 @@ def none_or_number_type(t): class CopyState(Enum): IDLE = 1 - SYNC_COPYING = 2 - ASYNC_COPYING = 3 - CANCELLING_COPY = 4 - DONE = 5 + COPYING = 2 + CANCELLING_COPY = 3 + DONE = 4 class CopyEnd(Waitable): state: CopyState @@ -1013,7 +1025,7 @@ def copying(self): match self.state: case CopyState.IDLE | CopyState.DONE: return False - case CopyState.SYNC_COPYING | CopyState.ASYNC_COPYING | CopyState.CANCELLING_COPY: + case CopyState.COPYING | CopyState.CANCELLING_COPY: return True assert(False) @@ -2385,6 +2397,7 @@ def canon_waitable_join(wi, si): trap_if(not inst.may_leave) w = inst.handles.get(wi) trap_if(not isinstance(w, Waitable)) + trap_if(w.has_sync_waiter) if si == 0: w.join(None) else: @@ -2405,6 +2418,7 @@ def canon_subtask_cancel(async_, i): trap_if(not isinstance(subtask, Subtask)) trap_if(subtask.resolve_delivered()) trap_if(subtask.cancellation_requested) + trap_if(subtask.in_waitable_set() and not async_) if subtask.resolved(): assert(subtask.has_pending_event()) else: @@ -2412,7 +2426,7 @@ def canon_subtask_cancel(async_, i): subtask.on_cancel() if not subtask.resolved(): if not async_: - thread.wait_until(subtask.resolved) + subtask.wait_for_pending_event() else: return [BLOCKED] code,index,payload = subtask.get_pending_event() @@ -2467,6 +2481,7 @@ def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n): trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_t.t) trap_if(e.state != CopyState.IDLE) + trap_if(e.in_waitable_set() and not opts.async_) assert(not isinstance(stream_t, CharType)) assert(not contains_borrow(stream_t)) @@ -2475,6 +2490,7 @@ def stream_copy(EndT, BufferT, event_code, stream_t, opts, i, ptr, n): def stream_event(result, reclaim_buffer): reclaim_buffer() + assert(e.copying()) if result == CopyResult.DROPPED: e.state = CopyState.DONE else: @@ -2490,14 +2506,13 @@ def on_copy(reclaim_buffer): def on_copy_done(result): e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda:())) + e.state = CopyState.COPYING e.copy(thread.task.inst, buffer, on_copy, on_copy_done) if not e.has_pending_event(): if not opts.async_: - e.state = CopyState.SYNC_COPYING - thread.wait_until(e.has_pending_event) + e.wait_for_pending_event() else: - e.state = CopyState.ASYNC_COPYING return [BLOCKED] code,index,payload = e.get_pending_event() assert(code == event_code and index == i and payload != BLOCKED) @@ -2522,6 +2537,7 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr): trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != future_t.t) trap_if(e.state != CopyState.IDLE) + trap_if(e.in_waitable_set() and not opts.async_) assert(not contains_borrow(future_t)) cx = LiftLowerContext(opts, thread.task.inst, borrow_scope = None) @@ -2529,6 +2545,7 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, i, ptr): def future_event(result): assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED)) + assert(e.copying()) if result == CopyResult.DROPPED or result == CopyResult.COMPLETED: e.state = CopyState.DONE else: @@ -2539,14 +2556,13 @@ def on_copy_done(result): assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE) e.set_pending_event(partial(future_event, result)) + e.state = CopyState.COPYING e.copy(thread.task.inst, buffer, on_copy_done) if not e.has_pending_event(): if not opts.async_: - e.state = CopyState.SYNC_COPYING - thread.wait_until(e.has_pending_event) + e.wait_for_pending_event() else: - e.state = CopyState.ASYNC_COPYING return [BLOCKED] code,index,payload = e.get_pending_event() assert(code == event_code and index == i) @@ -2573,13 +2589,14 @@ def cancel_copy(EndT, event_code, stream_or_future_t, async_, i): e = thread.task.inst.handles.get(i) trap_if(not isinstance(e, EndT)) trap_if(e.shared.t != stream_or_future_t.t) - trap_if(e.state != CopyState.ASYNC_COPYING) + trap_if(e.state != CopyState.COPYING or e.has_sync_waiter) + trap_if(e.in_waitable_set() and not async_) e.state = CopyState.CANCELLING_COPY if not e.has_pending_event(): e.shared.cancel() if not e.has_pending_event(): if not async_: - thread.wait_until(e.has_pending_event) + e.wait_for_pending_event() else: return [BLOCKED] code,index,payload = e.get_pending_event() diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index 5a7a9c4b..539238a0 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -1590,6 +1590,7 @@ def core_func(args): [event] = canon_waitable_set_wait(True, MemInst(mem, 'i32'), seti, retp) assert(event == EventCode.STREAM_READ) assert(mem[retp+0] == rsi4) + [] = canon_waitable_join(rsi4, 0) result,n = unpack_result(mem[retp+4]) assert(n == 4 and result == CopyResult.COMPLETED) [ret] = canon_stream_read(StreamType(U8Type()), sync_opts, rsi4, 0, 4)