diff --git a/qick_lib/qick/VERSION b/qick_lib/qick/VERSION index 028982f7..a09b995f 100644 --- a/qick_lib/qick/VERSION +++ b/qick_lib/qick/VERSION @@ -1 +1 @@ -0.2.401 +0.2.403 diff --git a/qick_lib/qick/qick_asm.py b/qick_lib/qick/qick_asm.py index 55333945..1e9c64b6 100644 --- a/qick_lib/qick/qick_asm.py +++ b/qick_lib/qick/qick_asm.py @@ -12,6 +12,7 @@ from numbers import Number from abc import ABC, abstractmethod from tqdm.auto import tqdm +from typing import Generator, Dict, Any, List from qick import obtain, get_version from .helpers import to_int, cosine, gauss, triang, DRAG, decode_array, nqz, nyquist_image @@ -2352,3 +2353,212 @@ def print_sg_mem(self, sg_idx=0, gen_file=False): else: print(s) + + + def stream_acquire( + self, + soc, + rounds: int = 1, + load_envelopes: bool = True, + start_src: str = "internal", + progress: bool = True, + remove_offset: bool = True, + len_normalize: bool = True, + include_full: bool = False, + return_end_of_exp_raw: bool = False, + ) -> Generator[Dict[str, Any], None, None]: + """Stream raw accumulated readout data. + + Provides a generator interface for streaming raw accumulated readout data incrementally. + Yields dict events describing acquisition progress until completion. The final return + (StopIteration value) is None; the final aggregated raw round buffers are provided in + the last yield with event='complete'. + + Parameters + ---------- + soc : QickSoc + Qick object that executes the program + rounds : int, optional + Number of times to rerun the program, averaging results in software (aka "soft averages"). + Default is 1. + load_envelopes : bool, optional + If True, load pulse envelopes before executing. Default is True. + start_src : str, optional + Start source for the tProc. Can be "internal" (tProc starts immediately) or "external" + (each round waits for an external trigger). Default is "internal". + progress : bool, optional + If True, display progress bars for rounds and shots per round. Default is True. + remove_offset : bool, optional + If True, subtract the readout's IQ offset from the partial channel data. Default is True. + len_normalize : bool, optional + If True, normalize partial channel data by dividing by the readout window length. + Default is True. + include_full : bool, optional + If True, include the current state of full buffers in each 'data' event. Default is False. + return_end_of_exp_raw : bool, optional + If True, yield a final event with all rounds' raw buffer data. Default is False. + + Yields + ------ + dict + Event dictionaries describing acquisition progress. Possible event forms: + + - 'data' event: {'event': 'data', 'round': r, 'rep_slice': (start, stop), + 'partial': {ch: ndarray(new_points, nreads, 2)}, + 'buffers': {ch: ndarray(*loop_dims, nreads, 2)} (optional)} + + - 'round-complete' event: {'event': 'round-complete', 'round': r, + 'round_raw': [buffer_per_ch]} + + - 'complete' event: {'event': 'complete', 'rounds_raw': list_of_round_buffers} + + Returns + ------- + None + StopIteration value is None when all rounds complete. + """ + # Validate required acquisition parameters were set via setup_acquire. + if any([x is None for x in [self.counter_addr, self.loop_dims, self.avg_level]]): + raise RuntimeError( + "data dimensions need to be defined with setup_acquire() before calling stream_acquire()" + ) + + # Configure acquisition params similar to AcquireMixin.acquire but mark a distinct type. + self.acquire_params = { + 'type': 'accumulated_stream', + 'soc': soc, + 'start_src': start_src, + 'rounds_remaining': rounds, + 'remove_offset': remove_offset, + 'hidereps': True, # we manage optional progress display separately + } + + total_count = int(np.prod(self.loop_dims)) # total flattened shots + reads_per_shot = [ro['trigs'] for ro in self.ro_chs.values()] + + # Preallocate NaN-filled full buffers per channel (same shape as acc_buf in AcquireMixin) + #TODO: consider keeping buffers for all rounds instead of just one? + self.acc_buf = [ + np.full((*self.loop_dims, nreads, 2), np.nan, dtype=float) + for nreads in reads_per_shot + ] + + # For completeness across rounds + all_rounds_raw: List[List[np.ndarray]] = [] + + # Load program & configure (do not load data memory yet; mirror acquire()) + self.config_all(soc, load_envelopes=load_envelopes, load_mem=False) + + # Progress bar for rounds only (optional) + from tqdm.auto import tqdm # local import; already used elsewhere + self.round_track = tqdm(total=rounds, disable=not progress, desc="Rounds") + + for round_index in range(rounds): + self.rounds_pbar = tqdm(total=total_count, disable=not progress, desc="Shots per Round", leave=False if rounds > 1 else True) + # Prepare firmware for this round (subset of prepare_round logic for accumulated type) + # Raw buffers already allocated; zero tProc counter & configure capture. + self.config_bufs(soc, enable_avg=True, enable_buf=False) + soc.reload_mem() + soc.clear_tproc_counter(addr=self.counter_addr) + soc.start_src(start_src) + + # Tracking variables + count = 0 # number of shots (flattened) fully received + + # Start combined readout capture (similar to AcquireMixin.finish_round accumulated section) + soc.start_readout( + total_count, + counter_addr=self.counter_addr, + ch_list=list(self.ro_chs), + reads_per_shot=reads_per_shot, + ) + + # Poll until all shots acquired + while count < total_count: + new_data = obtain(soc.poll_data()) # expect iterable of (new_points, (d, stats)) + new_data = list(new_data) # materialize if generator + # logger.info(f"new_data raw polled : {new_data} \n ================ \n ==============\n") + if not new_data: + continue # nothing arrived yet + for new_points, (d_list, stats) in new_data: + if new_points <= 0: + continue + # Package incremental partial arrays per channel + partial: Dict[int, np.ndarray] = {} + for ii, nreads in enumerate(reads_per_shot): + flat = d_list[ii] + # logger.info(f"Channel {ii} polled flat data: {flat}") + # Expect flat shape (new_points * nreads, 2) + if flat.shape[0] != new_points * nreads: + logger.error( + "data size mismatch: new_points=%d, nreads=%d, data shape %s", + new_points, + nreads, + flat.shape, + ) + if count + new_points > total_count: + logger.error( + "got too much data: count=%d, new_points=%d, total_count=%d", + count, + new_points, + total_count, + ) + start_flat = count * nreads + stop_flat = (count + new_points) * nreads + # Assign into preallocated buffer (reshape view for contiguous assignment) + self.acc_buf[ii].reshape((-1, 2))[start_flat:stop_flat] = flat + # Reshape partial to (new_points, nreads, 2) + partial_channel = flat.reshape(new_points, nreads, 2).astype(float) + # logger.info(f"Channel {ii} partial reshaped data to shape {partial_channel.shape}: {partial_channel}") + if len_normalize: + # length normalization: divide by readout length + ch_num = list(self.ro_chs.keys())[ii] + length = self.ro_chs[ch_num]['length'] + # logger.info(f"Length normalization: readout length for ch {ch_num} is {length}") + partial_channel /= length + if remove_offset and not np.isnan(partial_channel).all(): + # subtract offset scaled to raw accumulated counts + rocfg = self.ro_chs[ch_num].get('ro_config') + # _ro_offset returns the DC offset + offset = self._ro_offset(ch_num, rocfg) + partial_channel -= offset + partial[list(self.ro_chs.keys())[ii]] = partial_channel + rep_slice = (count, count + new_points) + count += new_points + + yield_event = { + 'event': 'data', + 'round': round_index, + 'rep_slice': rep_slice, + 'partial': partial, + } + if include_full: + # Provide current state of full buffers (copy to avoid external mutation) + yield_event['buffers'] = { + ch: buf.copy() for ch, buf in zip(self.ro_chs.keys(), self.acc_buf) + } + self.rounds_pbar.update(new_points) + yield yield_event + + # Round complete; snapshot raw buffers for this round (copy to freeze state) + round_raw = [buf.copy() for buf in self.acc_buf] + all_rounds_raw.append(round_raw) + yield { + 'event': 'round-complete', + 'round': round_index, + 'round_raw': round_raw, + } + # Prepare for next round (reset buffers to NaN if more rounds remaining) + if round_index < rounds - 1: + for ii, nreads in enumerate(reads_per_shot): + self.acc_buf[ii].fill(np.nan) + + self.rounds_pbar.close() + self.round_track.update(1) + self.round_track.close() + # Final event summarizing all rounds' raw data + if return_end_of_exp_raw: + yield { + 'event': 'complete', + 'rounds_raw': all_rounds_raw, + }