When streaming RHEED data, use the polling utilities in atomscale.timeseries to receive analysis updates as they arrive. Four entry points cover different concurrency needs:
Function | Use case
iter_poll | Synchronous loop that blocks between polls
start_polling_thread | Background thread for GUI or acquisition loops
aiter_poll | Async iterator for asyncio applications
start_polling_task | Fire-and-forget asyncio task

Setup

from atomscale import Client
from atomscale.timeseries import (
    iter_poll,
    aiter_poll,
    start_polling_thread,
    start_polling_task,
)

client = Client(api_key="YOUR_API_KEY")
data_id = "YOUR_STREAMING_DATA_ID"


def latest_timestamp(df):
    """Extract the last timestamp for deduplication."""
    if df.empty or "timestamp" not in df.columns:
        return None
    return df.iloc[-1]["timestamp"]

Synchronous polling

Loop over iter_poll to fetch fresh rows on a fixed cadence. The helper waits interval seconds between polls.
for idx, result in enumerate(
    iter_poll(
        client,
        data_id=data_id,
        interval=5.0,
        last_n=10,
        distinct_by=latest_timestamp,
        max_polls=3,
    ),
    start=1,
):
    print(f"Poll {idx}: latest timestamp -> {latest_timestamp(result)}")
    print(result.tail())

Parameters

Parameter | Description
interval | Seconds between polls (default: 1.0)
last_n | Only fetch the last N rows (None for all)
distinct_by | Function to extract a key for deduplication
max_polls | Stop after this many polls
fire_immediately | Poll immediately or wait one interval first (default: True)
jitter | Random delay (0 to jitter) added to each sleep
until | Predicate function; stop when it returns True
on_error | Handler called when a poll fails
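
The remaining options compose with the same synchronous loop. Below is a minimal sketch: the keyword names come from the table above, but the callback signatures are assumptions (until is shown receiving the latest result, on_error the raised exception).

def growth_complete(df):
    # Assumed signature: the predicate receives the latest result.
    return not df.empty and df.iloc[-1]["timestamp"] >= 600


def log_poll_error(exc):
    # Assumed signature: the handler receives the exception from the failed poll.
    print(f"Poll failed, will retry: {exc}")


for result in iter_poll(
    client,
    data_id=data_id,
    interval=5.0,
    jitter=0.5,              # add up to 0.5 s of random delay to each sleep
    fire_immediately=False,  # wait one interval before the first poll
    last_n=10,
    distinct_by=latest_timestamp,
    until=growth_complete,
    on_error=log_poll_error,
):
    print(result.tail())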

Background thread polling

Use start_polling_thread when you can’t block the main thread (GUI, acquisition loop):
collected = []


def on_result(result):
    print(f"Thread received {len(result)} rows")
    collected.append(result)


stop_event = start_polling_thread(
    client,
    data_id=data_id,
    interval=10.0,
    last_n=10,
    max_polls=5,
    distinct_by=latest_timestamp,
    on_result=on_result,
)

# Later, to stop polling:
# stop_event.set()
The function returns a threading.Event that you can set to stop polling gracefully.
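
For example, you might let the thread poll for a fixed window and then signal it to stop (a sketch; the 60-second window is arbitrary):

import time

time.sleep(60)    # let the background thread poll for a minute
stop_event.set()  # signal the thread to stop after its current poll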

Async polling

Two helpers integrate with asyncio:

Async iterator

aiter_poll yields results without blocking the event loop:
import asyncio


async def stream_updates():
    async for result in aiter_poll(
        client,
        data_id=data_id,
        interval=5.0,
        last_n=10,
        distinct_by=latest_timestamp,
        max_polls=3,
    ):
        print(f"Async poll received {len(result)} rows")
        print(result.tail())


asyncio.run(stream_updates())

Background task

start_polling_task creates a background task that invokes an optional async handler:
async def handle_async(result):
    print(f"Task handler received {len(result)} rows")


async def main():
    task = start_polling_task(
        client,
        data_id=data_id,
        interval=5.0,
        last_n=5,
        max_polls=3,
        distinct_by=latest_timestamp,
        on_result=handle_async,
    )
    await task


asyncio.run(main())

Deduplication

The distinct_by parameter accepts a function that extracts a hashable key from each result. Only results with a new key are yielded, so you won’t process the same data twice when polling faster than updates arrive.
def latest_timestamp(df):
    if df.empty or "timestamp" not in df.columns:
        return None
    return df.iloc[-1]["timestamp"]
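
Any hashable key works. For example, a hypothetical key built from the first and last timestamps of the returned window would also skip unchanged windows:

def window_key(df):
    # Hypothetical alternative key: identical windows produce the same tuple
    # and are dropped by distinct_by.
    if df.empty or "timestamp" not in df.columns:
        return None
    return (df.iloc[0]["timestamp"], df.iloc[-1]["timestamp"])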

Drift correction

All polling utilities use drift-corrected scheduling to maintain the requested cadence even if individual polls are slow. Missed ticks are skipped rather than accumulated.
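
Conceptually, drift-corrected scheduling anchors every tick to the start time rather than sleeping a fixed amount after each poll returns. The sketch below illustrates the idea; it is not the library's implementation.

import time


def drift_corrected_ticks(interval):
    """Yield at multiples of `interval` from the start time, skipping any
    ticks that were missed while the previous poll ran long."""
    start = time.monotonic()
    tick = 0
    while True:
        tick += 1
        target = start + tick * interval
        now = time.monotonic()
        if now >= target:
            # The previous poll overran: jump to the most recent tick
            # instead of firing all the missed ones back to back.
            tick = int((now - start) // interval)
            continue
        time.sleep(target - now)
        yield tick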

Next steps