"""Stream ADC samples from an FX4 device over a msgpack websocket.

The device speaks a simple event protocol ("subscribe" / "get" / "update" /
"error") with msgpack-encoded payloads.  Samples arrive per path as
``[value, timestamp]`` pairs; timestamps are in nanoseconds (see
``NANOSECONDS_PER_SECOND``).  This module can buffer samples in memory,
stream them straight to CSV, and plot timestamp-gap diagnostics afterwards.
"""

import csv
import time
from collections import defaultdict, deque

import msgpack
import websocket

# Device timestamps are nanoseconds; used to convert a sample rate in Hz
# into an expected inter-sample dt in timestamp units.
NANOSECONDS_PER_SECOND = 1_000_000_000.0

# Master switch for gap *counting* during acquisition and gap *highlighting*
# when plotting.  Max-gap and inferred-rate statistics are tracked regardless.
GAP_DETECT = False

# EMA smoothing factor for the inferred expected dt / jitter estimates.
INFER_ALPHA = 0.05
# A dt larger than GAP_MULTIPLIER * baseline (+ jitter margin) counts as a gap.
GAP_MULTIPLIER = 2.0
# dt values beyond this multiple of the baseline are treated as gap outliers
# and excluded from the EMA so gaps do not drag the baseline upward.
OUTLIER_MULTIPLIER = 5.0
# Jitter contribution to the gap margin.
JITTER_MULTIPLIER = 3.0
# Minimum gap margin as a fraction of the baseline dt.
BASELINE_MARGIN_RATIO = 0.02


class FX4Streamer:
    """Acquire ADC samples from an FX4 over its msgpack websocket protocol.

    Tracks per-path statistics (sample counts, backwards timestamps, gaps,
    inferred sample rate) while either accumulating samples in
    ``self.database`` or handing them to a caller-supplied consumer.
    """

    def __init__(
        self,
        ip_address,
        paths=None,
        socket_timeout=10.0,
        status_interval=1.0,
    ):
        """Configure the streamer.

        Args:
            ip_address: Host (and optional port) of the FX4 websocket server.
            paths: Device paths to subscribe to; defaults to ADC channels 1-4.
            socket_timeout: Websocket receive timeout in seconds.
            status_interval: Seconds between progress printouts while streaming.
        """
        self.ip_address = ip_address
        self.socket_timeout = socket_timeout
        self.status_interval = status_interval
        self.ws = None
        self.paths = paths or [
            "/fx4/adc/channel_1/value",
            "/fx4/adc/channel_2/value",
            "/fx4/adc/channel_3/value",
            "/fx4/adc/channel_4/value",
        ]
        # In-memory sample store: path -> list of (value, timestamp) tuples.
        self.database = {path: [] for path in self.paths}
        self.stats = {
            "messages": 0,
            "samples_by_path": defaultdict(int),
            "timestamp_backwards": 0,
            "timestamp_gaps": 0,
            "last_timestamp_by_path": {},
            "max_gap_by_path": defaultdict(float),
            "inferred_expected_dt_by_path": {},
            "inferred_jitter_by_path": {},
        }

    def _update_inferred_expected_dt(self, path, dt):
        """Fold an observed inter-sample dt into the per-path EMA estimates.

        Maintains ``inferred_expected_dt_by_path`` (EMA of dt) and
        ``inferred_jitter_by_path`` (EMA of |dt - baseline|).  Non-positive
        dt values and obvious gap outliers are ignored so the baseline
        stays representative of the nominal sample period.
        """
        if dt <= 0:
            return
        inferred = self.stats["inferred_expected_dt_by_path"].get(path)
        if inferred is None:
            # First observation seeds the baseline directly.
            self.stats["inferred_expected_dt_by_path"][path] = dt
            self.stats["inferred_jitter_by_path"][path] = 0.0
            return
        # Keep the baseline stable by ignoring obvious gap outliers.
        if dt > OUTLIER_MULTIPLIER * inferred:
            return
        self.stats["inferred_expected_dt_by_path"][path] = (
            (1.0 - INFER_ALPHA) * inferred + INFER_ALPHA * dt
        )
        jitter_before = self.stats["inferred_jitter_by_path"].get(path, 0.0)
        # Jitter is measured against the pre-update baseline.
        self.stats["inferred_jitter_by_path"][path] = (
            (1.0 - INFER_ALPHA) * jitter_before + INFER_ALPHA * abs(dt - inferred)
        )

    def connect(self):
        """Open the websocket connection using the 'mpack' subprotocol."""
        self.ws = websocket.create_connection(
            f"ws://{self.ip_address}",
            subprotocols=["mpack"],
            timeout=self.socket_timeout,
        )

    def close(self):
        """Close the websocket if open; errors during close are ignored."""
        if self.ws is not None:
            try:
                self.ws.close()
            except Exception:
                # Best-effort teardown: the connection may already be dead.
                pass
            self.ws = None

    def send_event(self, event, data=None):
        """Send one msgpack-encoded ``{"event": ..., "data": ...}`` frame."""
        payload = {"event": event, "data": data}
        packed = msgpack.packb(payload, use_bin_type=True)
        self.ws.send_binary(packed)

    def subscribe(self, buffered=True):
        """Subscribe to every configured path (buffered or live mode)."""
        sub = {path: buffered for path in self.paths}
        self.send_event("subscribe", sub)

    def get(self):
        """Request the next batch of buffered samples from the device."""
        self.send_event("get")

    def recv_message(self):
        """Receive and decode one msgpack frame.

        Returns:
            The decoded object (typically a dict).

        Raises:
            RuntimeError: If the payload is malformed, invalid, or too
                deeply nested for msgpack to decode.
        """
        raw = self.ws.recv()
        if isinstance(raw, str):
            # Some servers deliver text frames; msgpack needs bytes.
            raw = raw.encode("utf-8")
        try:
            return msgpack.unpackb(raw, raw=False)
        except msgpack.ExtraData as exc:
            raise RuntimeError(f"Received malformed msgpack payload: {exc}") from exc
        except msgpack.FormatError as exc:
            raise RuntimeError(f"Received invalid msgpack format: {exc}") from exc
        except msgpack.StackError as exc:
            raise RuntimeError(
                f"Received msgpack payload too deeply nested: {exc}"
            ) from exc

    def process_update(
        self,
        data,
        expected_dt_by_path=None,
        store_samples=True,
        return_valid=False,
    ):
        """Validate and account for one "update" payload.

        Args:
            data: Mapping of path -> list of ``[value, timestamp]`` samples.
            expected_dt_by_path: Optional mapping of path -> expected dt in
                timestamp units; when absent, a per-path dt is inferred.
            store_samples: Append valid samples to ``self.database``.
            return_valid: Also return the valid samples per path.

        Returns:
            ``{path: [(value, timestamp), ...]}`` when ``return_valid`` is
            true, otherwise ``None``.
        """
        stats = self.stats
        database = self.database
        samples_by_path = stats["samples_by_path"]
        last_timestamp_by_path = stats["last_timestamp_by_path"]
        max_gap_by_path = stats["max_gap_by_path"]
        valid_by_path = {} if return_valid else None
        for path, values in data.items():
            # Skip unknown paths and malformed/empty value lists.
            if path not in database:
                continue
            if not isinstance(values, list):
                continue
            if not values:
                continue
            last_ts = last_timestamp_by_path.get(path)
            expected_dt = None
            if expected_dt_by_path is not None:
                expected_dt = expected_dt_by_path.get(path)
            collect_valid_samples = store_samples or return_valid
            valid_samples = [] if collect_valid_samples else None
            valid_count = 0
            for sample in values:
                # A valid sample is a 2+-element sequence with a numeric
                # timestamp in position 1.
                if not isinstance(sample, (list, tuple)) or len(sample) < 2:
                    continue
                value = sample[0]
                timestamp = sample[1]
                if not isinstance(timestamp, (int, float)):
                    continue
                if collect_valid_samples:
                    valid_samples.append((value, timestamp))
                valid_count += 1
                if last_ts is not None:
                    dt = timestamp - last_ts
                    inferred_expected_dt = stats[
                        "inferred_expected_dt_by_path"
                    ].get(path)
                    # Prefer the caller-supplied dt; fall back to the
                    # inferred one when none was provided.
                    baseline_expected_dt = expected_dt
                    if baseline_expected_dt is None:
                        baseline_expected_dt = inferred_expected_dt
                    if dt < 0:
                        stats["timestamp_backwards"] += 1
                    if GAP_DETECT:
                        jitter = stats["inferred_jitter_by_path"].get(path, 0.0)
                        jitter_margin = 0.0
                        if baseline_expected_dt is not None:
                            jitter_margin = max(
                                BASELINE_MARGIN_RATIO * baseline_expected_dt,
                                JITTER_MULTIPLIER * jitter,
                            )
                        gap_threshold = None
                        if baseline_expected_dt is not None:
                            gap_threshold = (
                                GAP_MULTIPLIER * baseline_expected_dt
                                + jitter_margin
                            )
                        is_gap = gap_threshold is not None and dt > gap_threshold
                        if is_gap:
                            stats["timestamp_gaps"] += 1
                    # Max gap and rate inference are tracked regardless of
                    # GAP_DETECT so print_summary() always has data.
                    if dt > max_gap_by_path[path]:
                        max_gap_by_path[path] = dt
                    if expected_dt is None:
                        self._update_inferred_expected_dt(path, dt)
                last_ts = timestamp
            if valid_count:
                if store_samples:
                    database[path].extend(valid_samples)
                samples_by_path[path] += valid_count
                if return_valid:
                    valid_by_path[path] = valid_samples
            last_timestamp_by_path[path] = last_ts
        return valid_by_path

    def _run_stream_loop(
        self, collect_time, buffered=True, expected_sample_rate=None, on_update=None
    ):
        """Drive the connect/subscribe/get/receive loop for ``collect_time`` seconds.

        Args:
            collect_time: Acquisition duration in seconds (wall clock).
            buffered: Subscribe in buffered mode.
            expected_sample_rate: Optional nominal rate in Hz, converted to
                an expected per-path dt in nanoseconds.
            on_update: Callback ``(data, expected_dt_by_path)`` invoked for
                each "update" event.

        Raises:
            RuntimeError: On a device-reported error or a dropped connection.
        """
        expected_dt_by_path = None
        if expected_sample_rate:
            # Device timestamps are nanoseconds, so expected dt must also be
            # nanoseconds.
            expected_dt = NANOSECONDS_PER_SECOND / expected_sample_rate
            expected_dt_by_path = {path: expected_dt for path in self.paths}
        self.connect()
        self.subscribe(buffered=buffered)
        self.get()
        start = time.monotonic()
        last_status = start
        try:
            while True:
                now = time.monotonic()
                if now - start >= collect_time:
                    break
                message = self.recv_message()
                if not isinstance(message, dict):
                    continue
                self.stats["messages"] += 1
                event = message.get("event")
                data = message.get("data")
                if event == "update" and isinstance(data, dict):
                    if on_update is not None:
                        on_update(data, expected_dt_by_path)
                    # Immediately request the next batch to keep data flowing.
                    self.get()
                elif event == "error":
                    raise RuntimeError(f"FX4 returned error: {data}")
                now = time.monotonic()
                if now - last_status >= self.status_interval:
                    elapsed = now - start
                    ch1_count = self.stats["samples_by_path"].get(self.paths[0], 0)
                    print(
                        f"[{elapsed:8.2f}s] messages={self.stats['messages']} "
                        f"samples_ch1={ch1_count}"
                    )
                    last_status = now
        except (
            ConnectionResetError,
            websocket.WebSocketConnectionClosedException,
            websocket.WebSocketTimeoutException,
        ) as exc:
            raise RuntimeError(f"Connection lost during acquisition: {exc}") from exc
        finally:
            self.close()

    def stream(self, collect_time, buffered=True, expected_sample_rate=None):
        """Acquire for ``collect_time`` seconds, storing samples in memory."""

        def on_update(data, expected_dt_by_path):
            self.process_update(data, expected_dt_by_path=expected_dt_by_path)

        self._run_stream_loop(
            collect_time=collect_time,
            buffered=buffered,
            expected_sample_rate=expected_sample_rate,
            on_update=on_update,
        )

    def stream_to_csv(
        self,
        output_file,
        collect_time,
        buffered=True,
        expected_sample_rate=None,
        flush_every_rows=5000,
        max_pending_per_path=200000,
    ):
        """Acquire for ``collect_time`` seconds, writing aligned rows to CSV.

        Rows are aligned by index across channels (one row per sample
        position; timestamp taken from the first channel).  When channels
        arrive unevenly, samples queue per path until every channel has one.

        Args:
            output_file: Destination CSV path.
            collect_time: Acquisition duration in seconds.
            buffered: Subscribe in buffered mode.
            expected_sample_rate: Optional nominal sample rate in Hz.
            flush_every_rows: Write/flush the CSV every N buffered rows.
            max_pending_per_path: Abort if any per-path queue exceeds this.

        Raises:
            RuntimeError: If no paths are configured, or a pending queue
                overflows (channels drifting apart).
            ValueError: If ``flush_every_rows`` or ``max_pending_per_path``
                is not positive.
        """
        if not self.paths:
            raise RuntimeError("No paths configured")
        if flush_every_rows <= 0:
            raise ValueError("flush_every_rows must be > 0")
        if max_pending_per_path <= 0:
            raise ValueError("max_pending_per_path must be > 0")
        pending_by_path = {path: deque() for path in self.paths}
        rows_buffer = []
        rows_written = 0
        with open(output_file, "w", newline="", buffering=1024 * 1024) as file:
            writer = csv.writer(file)
            headers = ["Timestamp"] + [
                f"Chan {index}" for index in range(1, len(self.paths) + 1)
            ]
            writer.writerow(headers)

            def flush_rows():
                # Write buffered rows to disk and reset the buffer.
                nonlocal rows_written
                if rows_buffer:
                    writer.writerows(rows_buffer)
                    rows_written += len(rows_buffer)
                    rows_buffer.clear()
                    file.flush()

            def emit_from_pending():
                # Emit as many aligned rows as every channel can supply.
                if not self.paths:
                    return
                emit_count = min(len(pending_by_path[path]) for path in self.paths)
                if emit_count <= 0:
                    return
                first_path = self.paths[0]
                first_queue = pending_by_path[first_path]
                other_paths = self.paths[1:]
                pending_get = pending_by_path.get
                for _ in range(emit_count):
                    first_sample = first_queue.popleft()
                    # Row layout: timestamp (from channel 1), then values.
                    row = [first_sample[1], first_sample[0]]
                    for path in other_paths:
                        row.append(pending_get(path).popleft()[0])
                    rows_buffer.append(row)
                    if len(rows_buffer) >= flush_every_rows:
                        flush_rows()

            def on_update(data, expected_dt_by_path):
                valid_by_path = self.process_update(
                    data,
                    expected_dt_by_path=expected_dt_by_path,
                    store_samples=False,
                    return_valid=True,
                )
                if not valid_by_path:
                    return
                # Fast path: nothing queued and this update carries the same
                # number of samples for every channel -> emit rows directly
                # without touching the deques.
                fast_path_count = 0
                if all(not pending_by_path[path] for path in self.paths):
                    if all(path in valid_by_path for path in self.paths):
                        fast_path_count = len(valid_by_path[self.paths[0]])
                        if any(
                            len(valid_by_path[path]) != fast_path_count
                            for path in self.paths[1:]
                        ):
                            fast_path_count = 0
                if fast_path_count > 0:
                    for idx in range(fast_path_count):
                        first_sample = valid_by_path[self.paths[0]][idx]
                        row = [first_sample[1], first_sample[0]]
                        for path in self.paths[1:]:
                            row.append(valid_by_path[path][idx][0])
                        rows_buffer.append(row)
                        if len(rows_buffer) >= flush_every_rows:
                            flush_rows()
                    return
                # Slow path: queue per channel, then emit whatever aligns.
                for path, valid_samples in valid_by_path.items():
                    if path in pending_by_path and valid_samples:
                        pending_by_path[path].extend(valid_samples)
                emit_from_pending()
                for path in self.paths:
                    queued = len(pending_by_path[path])
                    if queued > max_pending_per_path:
                        raise RuntimeError(
                            f"Pending buffer exceeded limit for {path}. "
                            f"Queued={queued}, "
                            f"limit={max_pending_per_path}. "
                            "Try disabling buffered mode or reducing collect_time."
                        )

            self._run_stream_loop(
                collect_time=collect_time,
                buffered=buffered,
                expected_sample_rate=expected_sample_rate,
                on_update=on_update,
            )
            # Drain anything still queued, then flush the tail of the buffer.
            emit_from_pending()
            flush_rows()
        lengths = {path: self.stats["samples_by_path"][path] for path in self.paths}
        print(f"Wrote {rows_written} aligned rows to {output_file}")
        print(f"Raw sample counts by channel: {lengths}")

    def write_csv(self, output_file):
        """Write in-memory samples to CSV, aligned by index across channels.

        The row timestamp comes from the first channel; output is truncated
        to the shortest channel's length.

        Raises:
            RuntimeError: If no paths are configured or nothing was collected.
        """
        if not self.paths:
            raise RuntimeError("No paths configured")
        channel_data = [self.database[path] for path in self.paths]
        min_len = min(len(samples) for samples in channel_data)
        if min_len == 0:
            raise RuntimeError("No samples collected")
        with open(output_file, "w", newline="") as file:
            writer = csv.writer(file)
            headers = ["Timestamp"] + [
                f"Chan {index}" for index in range(1, len(self.paths) + 1)
            ]
            writer.writerow(headers)
            for i in range(min_len):
                row = [channel_data[0][i][1]]
                for ch in range(len(channel_data)):
                    row.append(channel_data[ch][i][0])
                writer.writerow(row)
        lengths = {path: len(self.database[path]) for path in self.paths}
        print(f"Wrote {min_len} aligned rows to {output_file}")
        print(f"Raw sample counts by channel: {lengths}")

    def print_summary(self):
        """Print acquisition statistics: counts, gaps, and inferred rates."""
        print("\nSummary")
        print("-------")
        print(f"Messages received: {self.stats['messages']}")
        print(f"Timestamp backwards events: {self.stats['timestamp_backwards']}")
        print(f"Timestamp gap events: {self.stats['timestamp_gaps']}")
        for path in self.paths:
            inferred_dt = self.stats["inferred_expected_dt_by_path"].get(path)
            inferred_rate = "n/a"
            if inferred_dt and inferred_dt > 0:
                inferred_rate = f"{NANOSECONDS_PER_SECOND / inferred_dt:.3f} Hz"
            max_gap_ns = self.stats["max_gap_by_path"][path]
            print(
                f"{path}: samples={self.stats['samples_by_path'][path]}, "
                f"max_gap_ns={max_gap_ns:.1f} ({max_gap_ns / 1_000_000.0:.3f} ms), "
                f"inferred_rate={inferred_rate}"
            )


def test_fx4_mpack(
    ip_address,
    output_file,
    collect_time,
    sample_rate_hz=None,
    buffered=True,
    stream_to_file=True,
    flush_every_rows=5000,
    max_pending_per_path=200000,
    plot_at_end=True,
    max_plot_points=20000,
):
    """End-to-end acquisition: stream, write CSV, summarize, optionally plot.

    Args:
        ip_address: FX4 host (and optional port).
        output_file: CSV destination path.
        collect_time: Acquisition duration in seconds.
        sample_rate_hz: Optional nominal rate; None means infer from data.
        buffered: Subscribe in buffered mode.
        stream_to_file: Stream directly to CSV (low memory) instead of
            collecting in memory and writing at the end.
        flush_every_rows / max_pending_per_path: Passed to stream_to_csv.
        plot_at_end: Plot gap diagnostics from the written CSV.
        max_plot_points: Down-sampling budget for the plot.
    """
    streamer = FX4Streamer(ip_address=ip_address)
    if stream_to_file:
        streamer.stream_to_csv(
            output_file=output_file,
            collect_time=collect_time,
            buffered=buffered,
            expected_sample_rate=sample_rate_hz,
            flush_every_rows=flush_every_rows,
            max_pending_per_path=max_pending_per_path,
        )
    else:
        streamer.stream(
            collect_time=collect_time,
            buffered=buffered,
            expected_sample_rate=sample_rate_hz,
        )
        streamer.write_csv(output_file)
    streamer.print_summary()
    if plot_at_end:
        plot_csv_gaps(
            csv_file=output_file,
            expected_sample_rate=sample_rate_hz,
            max_points=max_plot_points,
        )


def plot_csv_gaps(csv_file, expected_sample_rate=None, max_points=20000):
    """Plot channel data and inter-row timestamp deltas from a capture CSV.

    The CSV is read twice: once to count rows (to pick a subsampling
    stride), once to collect plot data.  When no expected rate is given,
    the expected dt and jitter are inferred with the same EMA scheme used
    during acquisition.  Requires matplotlib; skips gracefully without it.

    Args:
        csv_file: CSV produced by stream_to_csv()/write_csv().
        expected_sample_rate: Optional nominal rate in Hz.
        max_points: Target maximum number of plotted points per series.

    Raises:
        ValueError: If ``max_points`` is not positive.
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError:
        print(
            "Plot skipped: matplotlib is not installed. "
            "Run 'pip install matplotlib' to enable plotting."
        )
        return
    if max_points <= 0:
        raise ValueError("max_points must be > 0")
    # Pass 1: count data rows so we can choose a subsampling stride.
    total_rows = 0
    with open(csv_file, "r", newline="") as file:
        reader = csv.reader(file)
        header = next(reader, None)
        if not header:
            print("Plot skipped: CSV is empty.")
            return
        for _ in reader:
            total_rows += 1
    if total_rows == 0:
        print("Plot skipped: CSV has no data rows.")
        return
    stride = max(1, total_rows // max_points)
    times = []
    channel_series = [[] for _ in header[1:]]
    dt_times = []
    dt_values = []
    gap_points_t = []
    gap_points_dt = []
    expected_dt = (
        NANOSECONDS_PER_SECOND / expected_sample_rate
        if expected_sample_rate
        else None
    )
    inferred_expected_dt = None
    inferred_jitter_dt = 0.0
    # Pass 2: collect subsampled series and (every row) gap statistics.
    with open(csv_file, "r", newline="") as file:
        reader = csv.reader(file)
        next(reader, None)
        prev_t = None
        for idx, row in enumerate(reader):
            if len(row) < 2:
                continue
            try:
                t = float(row[0])
            except ValueError:
                continue
            if prev_t is not None:
                dt = t - prev_t
                if expected_dt is None and dt > 0:
                    if inferred_expected_dt is None:
                        inferred_expected_dt = dt
                    elif dt <= OUTLIER_MULTIPLIER * inferred_expected_dt:
                        # Update jitter against the pre-update baseline,
                        # mirroring FX4Streamer._update_inferred_expected_dt.
                        inferred_jitter_dt = (
                            (1.0 - INFER_ALPHA) * inferred_jitter_dt
                            + INFER_ALPHA * abs(dt - inferred_expected_dt)
                        )
                        inferred_expected_dt = (
                            (1.0 - INFER_ALPHA) * inferred_expected_dt
                            + INFER_ALPHA * dt
                        )
                baseline_expected_dt = expected_dt or inferred_expected_dt
                if idx % stride == 0:
                    dt_times.append(t)
                    dt_values.append(dt)
                gap_threshold = None
                if GAP_DETECT and baseline_expected_dt is not None:
                    jitter_margin = max(
                        BASELINE_MARGIN_RATIO * baseline_expected_dt,
                        JITTER_MULTIPLIER * inferred_jitter_dt,
                    )
                    gap_threshold = (
                        GAP_MULTIPLIER * baseline_expected_dt + jitter_margin
                    )
                if gap_threshold is not None and dt > gap_threshold:
                    gap_points_t.append(t)
                    gap_points_dt.append(dt)
            prev_t = t
            if idx % stride != 0:
                continue
            times.append(t)
            for ch_idx, series in enumerate(channel_series, start=1):
                try:
                    series.append(float(row[ch_idx]))
                except (ValueError, IndexError):
                    # Keep series lengths aligned even for bad cells.
                    series.append(float("nan"))
    if not times:
        print("Plot skipped: no plottable rows found.")
        return
    fig, (ax_signal, ax_dt) = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
    fig.suptitle("FX4 Capture - Data and Timestamp Gaps")
    for ch_idx, series in enumerate(channel_series, start=1):
        ax_signal.plot(times, series, linewidth=0.7, label=f"Chan {ch_idx}", alpha=0.9)
    ax_signal.set_ylabel("ADC Value")
    ax_signal.legend(loc="upper right", ncol=min(4, len(channel_series)))
    ax_signal.grid(True, alpha=0.3)
    ax_dt.plot(
        dt_times,
        dt_values,
        linewidth=0.8,
        color="tab:blue",
        label="dt (timestamp units)",
    )
    baseline_for_plot = expected_dt or inferred_expected_dt
    if baseline_for_plot is not None:
        baseline_label = "Expected dt"
        if expected_dt is None:
            baseline_label = "Inferred dt"
        ax_dt.axhline(
            baseline_for_plot,
            color="tab:green",
            linestyle="--",
            linewidth=1.0,
            label=f"{baseline_label} ({baseline_for_plot:.6g} ns)",
        )
        ax_dt.axhline(
            GAP_MULTIPLIER * baseline_for_plot,
            color="tab:red",
            linestyle="--",
            linewidth=1.0,
            label=f"Gap threshold ({GAP_MULTIPLIER * baseline_for_plot:.6g} ns)",
        )
    if GAP_DETECT and gap_points_t:
        ax_dt.scatter(
            gap_points_t,
            gap_points_dt,
            s=8,
            color="tab:red",
            alpha=0.7,
            label=f"Detected gaps ({len(gap_points_t)})",
        )
    ax_dt.set_ylabel("Delta t (timestamp units)")
    ax_dt.set_xlabel("Timestamp")
    ax_dt.grid(True, alpha=0.3)
    ax_dt.legend(loc="upper right")
    fig.tight_layout()
    plt.show()


if __name__ == "__main__":
    try:
        test_fx4_mpack(
            ip_address="192.168.100.117",
            output_file="fx4_capture.csv",
            collect_time=200.0,
            sample_rate_hz=None,
            buffered=True,
            stream_to_file=True,
            flush_every_rows=5000,
            max_pending_per_path=200000,
            plot_at_end=True,
            max_plot_points=100000,
        )
    except RuntimeError as exc:
        print(f"Acquisition failed: {exc}")