Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
bea4f49
Add event handling for TCPIP instruments, allowing SRQ interrupts, in…
SimonMerrett Apr 28, 2026
c76b9ae
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 28, 2026
ddba517
revert _vicp guards
SimonMerrett May 6, 2026
524aebf
freeze @dataclass and add slots
SimonMerrett May 6, 2026
e0b54a8
replace integer values with flags
SimonMerrett May 6, 2026
7f18f96
use defaultdict in HandlerRegistry
SimonMerrett May 6, 2026
aa22d01
add docstring argument descriptions to HandlerCallback
SimonMerrett May 6, 2026
0a9eae5
remove annotations import from __future__ and add readability lines b…
SimonMerrett May 6, 2026
40834d7
rename _start and _stop _srq_monitor to _event_monitor to be more gen…
SimonMerrett May 6, 2026
92b915b
replace error logging with warning
SimonMerrett May 6, 2026
9b2c1c2
update tests with warning instead of error logging
SimonMerrett May 6, 2026
649f1fd
Replace getattr(self, '_session_handle', self) with a direct attribut…
SimonMerrett May 7, 2026
556c1ff
fix mypy pyvisa_py/sessions.py:387: error: Session' has no attribute …
SimonMerrett May 7, 2026
269f3a2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 7, 2026
f4d4485
Update pyvisa_py/highlevel.py
SimonMerrett May 24, 2026
b69ad8f
Update pyvisa_py/highlevel.py
SimonMerrett May 24, 2026
6f43fdc
Update pyvisa_py/highlevel.py
SimonMerrett May 24, 2026
a6238b4
Update pyvisa_py/events.py
SimonMerrett May 24, 2026
870c4d3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 24, 2026
032225b
Update pyvisa_py/events.py
SimonMerrett May 24, 2026
88ea6ef
Update pyvisa_py/events.py
SimonMerrett May 24, 2026
fcc5928
Merge branch 'pyvisa:main' into events
SimonMerrett May 24, 2026
665278d
update EventMechanism class to EventMechanismFlag and improve handlin…
SimonMerrett May 24, 2026
f4c0bc9
remove udp srq server
SimonMerrett May 24, 2026
3017cb5
strongly type HandlerCallback session param
SimonMerrett May 24, 2026
659fffa
fix ruff linting errors: import sorting, unused LOGGER import removal…
SimonMerrett May 24, 2026
878c033
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] May 24, 2026
fbcb8e4
organise imports to satisfy ruff
SimonMerrett May 24, 2026
3262906
Merge branch 'events' of github.com:SimonMerrett/pyvisa-py into events
SimonMerrett May 24, 2026
00e1731
Merge branch 'pyvisa:main' into events
SimonMerrett May 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ PyVISA-py Changelog
A second addressed issue is that timeout values never decrement to 0. A timeout value
of 0 is undefined in VXI-11 standard. It can mean "timeout immediately if no data
is in buffer" or "block permanently until transfer is finished".
- Implement the VISA event subsystem for VXI-11 (TCPIP::INSTR) resources:
`viEnableEvent`, `viDisableEvent`, `viDiscardEvents`, `viWaitOnEvent`,
`viInstallHandler`, and `viUninstallHandler`. SRQ (service request) events
are now supported via both queue-based (`wait_on_event`) and handler-based
(`install_handler`) delivery. A daemon thread runs an ONC RPC UDP interrupt
server to receive VXI-11 `DEVICE_INTR_SRQ` callbacks. This also fixes the
`create_intr_chan` XDR packer in `protocols/vxi11.py`.
Other transports (GPIB, USBTMC, HiSLIP, Serial) remain unsupported for now.

0.8.1 (04-09-2025)
------------------
Expand Down
3 changes: 3 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ No. We have implemented those attributes and methods that are most commonly
needed. We would like to reach feature parity. If there is something that you
need, let us know.

Event handling (``wait_on_event``, ``install_handler``, etc.) is currently
supported for **TCPIP INSTR** (VXI-11) resources only.


Why are you developing this?
----------------------------
Expand Down
312 changes: 312 additions & 0 deletions pyvisa_py/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
# -*- coding: utf-8 -*-
"""Event handling primitives for pyvisa-py.

This module provides the thread-safe building blocks used by the VISA event
subsystem: event contexts, queues, handler registries, and per-session state.

"""

import collections
import enum
import random
import threading
import time
import warnings
from dataclasses import dataclass, field
from typing import Any, Callable

from pyvisa import constants
from pyvisa.typing import VISASession


class EventMechanismFlag(enum.Flag):
"""Internal Flag enum mirroring VISA event-delivery mechanisms.

``ALL`` is a convenience alias for ``QUEUE | HANDLER | SUSPEND``.
The `from_int` classmethod canonicalises the VISA sentinel
``0xFFFF`` (``constants.EventMechanism.all``) to this composite
value so that bitwise ``~`` works correctly.
"""

NONE = 0
QUEUE = 1 # VI_QUEUE (1)
HANDLER = 2 # VI_HNDLR (2)
SUSPEND = 4 # VI_SUSPEND_HNDLR (4)
ALL = QUEUE | HANDLER | SUSPEND # = 7, not VI_ALL_MECH (0xFFFF)

@classmethod
def from_int(cls, value: int) -> "EventMechanismFlag":
if value == int(constants.EventMechanism.all): # 0xFFFF
return cls.ALL
return cls(value & (cls.QUEUE | cls.HANDLER | cls.SUSPEND).value)


@dataclass(frozen=True, slots=True)
class EventContext:
"""Immutable description of a single VISA event occurrence."""

event_type: constants.EventType
status_byte: int = 0
timestamp: float = field(default_factory=time.time)
context_id: int = field(default_factory=lambda: random.getrandbits(32))


class EventQueue:
"""Thread-safe FIFO queue for :class:`EventContext` objects."""

def __init__(self) -> None:
self._deque: collections.deque[EventContext] = collections.deque()
self._cond = threading.Condition()

def put(self, ctx: EventContext) -> None:
"""Add an event context to the queue (non-blocking)."""
with self._cond:
self._deque.append(ctx)
self._cond.notify_all()

def get(self, timeout_ms: int | None) -> EventContext | None:
"""Retrieve an event context.

Parameters
----------
timeout_ms :
``None`` blocks forever, ``0`` returns immediately if empty,
and a positive value blocks up to that many milliseconds.

Returns
-------
EventContext or None
The retrieved context, or ``None`` if the queue was empty.

"""
if timeout_ms is None:
with self._cond:
while not self._deque:
self._cond.wait()
return self._deque.popleft()
if timeout_ms == 0:
with self._cond:
if self._deque:
return self._deque.popleft()
return None
deadline = time.time() + timeout_ms / 1000.0
with self._cond:
while not self._deque:
remaining = deadline - time.time()
if remaining <= 0:
return None
self._cond.wait(remaining)
return self._deque.popleft()

def get_matching(
self,
event_type: constants.EventType | None,
timeout_ms: int | None,
) -> EventContext | None:
"""Retrieve the first event matching *event_type*.

If *event_type* is ``None``, matches any event.
``timeout_ms`` semantics are the same as :meth:`get`.
"""
if timeout_ms is None:
with self._cond:
while True:
for idx, ctx in enumerate(self._deque):
if event_type is None or ctx.event_type == event_type:
del self._deque[idx]
return ctx
self._cond.wait()
if timeout_ms == 0:
with self._cond:
for idx, ctx in enumerate(self._deque):
if event_type is None or ctx.event_type == event_type:
del self._deque[idx]
return ctx
return None
deadline = time.time() + timeout_ms / 1000.0
with self._cond:
while True:
for idx, ctx in enumerate(self._deque):
if event_type is None or ctx.event_type == event_type:
del self._deque[idx]
return ctx
remaining = deadline - time.time()
if remaining <= 0:
return None
self._cond.wait(remaining)

def discard_all(self, event_type: constants.EventType | None = None) -> None:
"""Remove items from the queue.

If *event_type* is ``None``, the entire queue is cleared.
Otherwise only contexts whose ``event_type`` matches are removed.

"""
with self._cond:
if event_type is None:
self._deque.clear()
else:
kept = [ctx for ctx in self._deque if ctx.event_type != event_type]
self._deque.clear()
self._deque.extend(kept)


HandlerCallback = Callable[[VISASession, constants.EventType, int, Any], None]
"""Callable invoked when a VISA event fires.

Parameters
----------
session : VISASession
Session handle (vi).
event_type : constants.EventType
The event type that fired.
context_id : int
Event context id.
user_handle : Any
User-supplied handle passed at install_handler time.
"""


class HandlerRegistry:
"""Thread-safe registry of user-installed event handlers."""

def __init__(self) -> None:
self._lock = threading.RLock()
# event_type -> list of (handler, user_handle)
self._handlers: collections.defaultdict[
constants.EventType, list[tuple[HandlerCallback, Any]]
] = collections.defaultdict(list)

def install(
self,
event_type: constants.EventType,
handler: HandlerCallback,
user_handle: Any,
) -> None:
"""Register a handler for the given event type."""
with self._lock:
self._handlers[event_type].append((handler, user_handle))

def uninstall(
self,
event_type: constants.EventType,
handler: HandlerCallback,
user_handle: Any = None,
) -> bool:
"""Remove a previously installed handler.

If *user_handle* is ``None``, the first entry matching *handler*
identity is removed regardless of its user handle.

Returns ``True`` if a handler was removed, ``False`` otherwise.

"""
with self._lock:
entries = self._handlers.get(event_type, [])
for idx, (h, uh) in enumerate(entries):
if h is handler and (user_handle is None or uh == user_handle):
entries.pop(idx)
return True
return False

def fire(
self,
event_type: constants.EventType,
session: VISASession,
context_id: int,
) -> None:
"""Invoke all handlers registered for *event_type*.

Each handler is called as ``handler(session, event_type, context_id,
user_handle)`` where *user_handle* is the value supplied at
installation. Exceptions raised by a handler are warned via
``warnings.warn`` and do not prevent subsequent handlers from running.

"""
with self._lock:
handlers = list(self._handlers.get(event_type, []))

for handler, user_handle in handlers:
try:
handler(session, event_type, context_id, user_handle)
except Exception as exc:
warnings.warn(
f"Event handler {handler!r} raised an exception: {exc!r}",
stacklevel=2,
)


class EventState:
"""Per-session container for event enablement, queuing, and handlers."""

def __init__(self) -> None:
# {event_type: EventMechanismFlag}
self._lock = threading.RLock()
self.enabled: dict[constants.EventType, EventMechanismFlag] = {}
self.queue = EventQueue()
self.registry = HandlerRegistry()
self.monitor_thread: threading.Thread | None = None
self.stop_flag: threading.Event = threading.Event()

def enable(
self,
event_type: constants.EventType,
mechanism: constants.EventMechanism,
) -> None:
"""Enable delivery of *event_type* via *mechanism_flag*."""
m = EventMechanismFlag.from_int(int(mechanism))
with self._lock:
self.enabled[event_type] = (
self.enabled.get(event_type, EventMechanismFlag.NONE) | m
)

def disable(
self,
event_type: constants.EventType,
mechanism: constants.EventMechanism,
) -> None:
"""Disable delivery of *event_type* via *mechanism_flag*."""
m = EventMechanismFlag.from_int(int(mechanism))
with self._lock:
if event_type not in self.enabled:
return
new = self.enabled[event_type] & ~m
if new is EventMechanismFlag.NONE:
del self.enabled[event_type]
else:
self.enabled[event_type] = new

def is_queue_enabled(self, event_type: constants.EventType) -> bool:
"""Return whether queue delivery is enabled for *event_type*."""
with self._lock:
return bool(
self.enabled.get(event_type, EventMechanismFlag.NONE)
& EventMechanismFlag.QUEUE
)

def is_handler_enabled(self, event_type: constants.EventType) -> bool:
"""Return whether handler (callback) delivery is enabled for *event_type*."""
with self._lock:
return bool(
self.enabled.get(event_type, EventMechanismFlag.NONE)
& EventMechanismFlag.HANDLER
)

def get_delivery_mechanisms(
self, event_type: constants.EventType
) -> tuple[bool, bool]:
"""Return (queue_enabled, handler_enabled) for *event_type*.

The check is performed atomically under the state lock.
"""
with self._lock:
mech = self.enabled.get(event_type, EventMechanismFlag.NONE)
return (
bool(mech & EventMechanismFlag.QUEUE),
bool(mech & EventMechanismFlag.HANDLER),
)
Comment thread
SimonMerrett marked this conversation as resolved.

def any_enabled(self) -> bool:
"""Return ``True`` if any event type has any mechanism enabled."""
with self._lock:
return any(m is not EventMechanismFlag.NONE for m in self.enabled.values())
Loading