When streaming RHEED data, use the polling utilities in atomscale.timeseries to receive analysis updates as they arrive. Four entry points cover different concurrency needs:
| Function | Use case |
| --- | --- |
| iter_poll | Synchronous loop that blocks between polls |
| start_polling_thread | Background thread for GUI or acquisition loops |
| aiter_poll | Async iterator for asyncio applications |
| start_polling_task | Fire-and-forget asyncio task |

Setup

from atomscale import Client
from atomscale.timeseries import (
    iter_poll,
    aiter_poll,
    start_polling_thread,
    start_polling_task,
)

client = Client(api_key="YOUR_API_KEY")
data_id = "YOUR_STREAMING_DATA_ID"


def latest_timestamp(df):
    """Extract the last timestamp for deduplication."""
    if df.empty or "timestamp" not in df.columns:
        return None
    return df.iloc[-1]["timestamp"]

Synchronous polling

Loop over iter_poll to fetch fresh rows on a fixed cadence. The helper waits interval seconds between polls.
for idx, result in enumerate(
    iter_poll(
        client,
        data_id=data_id,
        interval=5.0,
        last_n=10,
        distinct_by=latest_timestamp,
        max_polls=3,
    ),
    start=1,
):
    print(f"Poll {idx}: latest timestamp -> {latest_timestamp(result)}")
    print(result.tail())

Parameters

| Parameter | Description |
| --- | --- |
| interval | Seconds between polls (default: 1.0) |
| last_n | Only fetch the last N rows (None for all) |
| distinct_by | Function to extract a key for deduplication |
| max_polls | Stop after this many polls |
| fire_immediately | Poll immediately or wait one interval first (default: True) |
| jitter | Random delay (0 to jitter) added to each sleep |
| until | Predicate function; stop when it returns True |
| on_error | Handler called when a poll fails |
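
To make the jitter description concrete, here is a minimal sketch of how a random delay between 0 and jitter could be added to each sleep. It does not call the library: sleep_durations and its seed argument are hypothetical names used only for illustration, and the library's own implementation may differ.

```python
import random


def sleep_durations(interval, jitter, polls, seed=None):
    """Hypothetical helper: one sleep duration per poll, with jitter added."""
    rng = random.Random(seed)
    # Each sleep is the base interval plus a uniform delay in [0, jitter].
    return [interval + rng.uniform(0.0, jitter) for _ in range(polls)]


delays = sleep_durations(interval=5.0, jitter=0.5, polls=4, seed=0)
for number, delay in enumerate(delays, start=1):
    print(f"sleep {number}: {delay:.3f} s")
```

A small jitter is typically useful when several clients poll the same stream, because it keeps their requests from lining up on the same instant.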

Background thread polling

Use start_polling_thread when you can’t block the main thread (GUI, acquisition loop):
collected = []


def on_result(result):
    print(f"Thread received {len(result)} rows")
    collected.append(result)


stop_event = start_polling_thread(
    client,
    data_id=data_id,
    interval=10.0,
    last_n=10,
    max_polls=5,
    distinct_by=latest_timestamp,
    on_result=on_result,
)

# Later, to stop polling:
# stop_event.set()
The function returns a threading.Event that you can set to stop polling gracefully.
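
The stop pattern can be sketched with the standard library alone. The example below uses a hypothetical stand-in loop, run_poller, rather than the library's own worker, to show why setting a threading.Event ends polling promptly: the loop waits on the event instead of sleeping, so it wakes as soon as the event is set.

```python
import threading
import time


def run_poller(stop_event, interval, on_result):
    """Hypothetical stand-in for a polling loop controlled by an Event."""
    poll_count = 0
    # Event.wait returns True once the event is set; otherwise it returns
    # False after the timeout, and the loop performs another poll.
    while not stop_event.wait(timeout=interval):
        poll_count += 1
        on_result(poll_count)


stop_event = threading.Event()
received = []
worker = threading.Thread(
    target=run_poller,
    args=(stop_event, 0.01, received.append),
    daemon=True,
)
worker.start()

# Let the worker deliver a few results, then request a graceful stop.
while len(received) < 3:
    time.sleep(0.005)
stop_event.set()
worker.join(timeout=1.0)

print(f"worker alive: {worker.is_alive()}, results: {len(received)}")
```

In this sketch the caller holds the thread handle and can join it. With start_polling_thread only the event is returned, so setting it is the way to signal a stop.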

Async polling

Two helpers integrate with asyncio:

Async iterator

aiter_poll yields results without blocking the event loop:
import asyncio


async def stream_updates():
    async for result in aiter_poll(
        client,
        data_id=data_id,
        interval=5.0,
        last_n=10,
        distinct_by=latest_timestamp,
        max_polls=3,
    ):
        print(f"Async poll received {len(result)} rows")
        print(result.tail())


asyncio.run(stream_updates())

Background task

start_polling_task creates a background task that invokes an optional async handler:
async def handle_async(result):
    print(f"Task handler received {len(result)} rows")


async def main():
    task = start_polling_task(
        client,
        data_id=data_id,
        interval=5.0,
        last_n=5,
        max_polls=3,
        distinct_by=latest_timestamp,
        on_result=handle_async,
    )
    await task


asyncio.run(main())
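
Awaiting the task, as above, waits for polling to finish. If the returned object behaves like a standard asyncio.Task (an assumption; the source only says it is a background task), it can also be left running alongside other work and cancelled later. The sketch below demonstrates that pattern with a hypothetical stand-in coroutine, fake_polling, in place of the library call.

```python
import asyncio


async def fake_polling(on_result, interval):
    """Hypothetical stand-in for the coroutine behind a polling task."""
    poll_count = 0
    while True:
        poll_count += 1
        await on_result(poll_count)
        await asyncio.sleep(interval)


async def run_demo():
    received = []

    async def handler(result):
        received.append(result)

    task = asyncio.create_task(fake_polling(handler, interval=0.01))
    await asyncio.sleep(0.05)  # other application work would happen here
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task was cancelled on purpose
    return received


received = asyncio.run(run_demo())
print(f"handled {len(received)} results before cancelling")
```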

Deduplication

The distinct_by parameter accepts a function that extracts a hashable key from each result. Only results with a new key are yielded, so you won’t process the same data twice when polling faster than updates arrive.
def latest_timestamp(df):
    if df.empty or "timestamp" not in df.columns:
        return None
    return df.iloc[-1]["timestamp"]
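
The filtering itself can be pictured with a short, library-independent sketch. It assumes each new key is compared against the most recently seen key, which the source does not spell out; yield_distinct is a hypothetical name, and plain lists stand in for DataFrames.

```python
def yield_distinct(results, distinct_by):
    """Hypothetical sketch: yield a result only when its key changes."""
    unset = object()  # marks "no key seen yet"
    last_key = unset
    for result in results:
        key = distinct_by(result)
        if last_key is not unset and key == last_key:
            continue  # same key as the previous poll: nothing new arrived
        last_key = key
        yield result


# Four simulated polls; the key is the last timestamp in each result.
polls = [[1.0, 2.0], [1.0, 2.0], [1.0, 2.0, 3.0], [1.0, 2.0, 3.0]]
fresh = list(yield_distinct(polls, distinct_by=lambda rows: rows[-1]))
print(fresh)  # [[1.0, 2.0], [1.0, 2.0, 3.0]]
```

The second and fourth polls are dropped because their last timestamp matches the poll before them.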

Drift correction

All polling utilities use drift-corrected scheduling to maintain the requested cadence even if individual polls are slow. Missed ticks are skipped rather than accumulated.
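
One common way to get this behavior is to place ticks on a fixed grid measured from the start time, rather than sleeping a full interval after each poll. The sketch below illustrates that idea under this assumption; next_tick is a hypothetical helper, not the library's scheduler.

```python
import math


def next_tick(start, interval, now):
    """Return the first grid tick strictly after `now`.

    Ticks sit at start + interval, start + 2 * interval, and so on.
    """
    ticks_elapsed = math.floor((now - start) / interval)
    return start + (ticks_elapsed + 1) * interval


# A fast poll finishes at t=0.3 s: the next poll lands on the 5 s tick.
print(next_tick(start=0.0, interval=5.0, now=0.3))   # 5.0

# A slow poll finishes at t=12 s: the 5 s and 10 s ticks are skipped,
# and polling resumes on the grid at 15 s instead of running late.
print(next_tick(start=0.0, interval=5.0, now=12.0))  # 15.0
```

By contrast, sleeping for interval seconds after each poll adds the poll's own duration to every cycle, so the schedule slips a little further each time.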

Next steps

Stream RHEED: Set up RHEED streaming from your instrument.
Client Reference: Full Client class documentation.