The TimeseriesStreamer class lets you push scalar instrument data (temperature, pressure, growth rate, etc.) to Atomscale for live tracking and analysis. Like RHEEDStreamer, it uses a Rust/PyO3 backend for efficient uploads.

Prerequisites

  • Instrument readings as lists of timestamps and values
  • Timestamps as Unix epoch seconds (float)
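
If your instrument logs datetimes rather than epoch seconds, a conversion step is needed before pushing. The helper below is a minimal sketch using only the Python standard library; `to_epoch_seconds` is a hypothetical name, not part of the Atomscale SDK, and treating naive datetimes as UTC is an assumption you may need to change.

```python
from datetime import datetime, timezone


def to_epoch_seconds(dt: datetime) -> float:
    """Convert a datetime to Unix epoch seconds as a float.

    Assumption: naive datetimes (no tzinfo) are interpreted as UTC.
    Adjust this if your instrument logs local time.
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()


# Example: an ISO 8601 string taken from an instrument log
reading_time = datetime.fromisoformat("2024-01-01T00:00:00+00:00")
epoch = to_epoch_seconds(reading_time)
```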

Create a streamer

from atomscale.streaming import TimeseriesStreamer

streamer = TimeseriesStreamer(
    api_key="YOUR_API_KEY",
    points_per_chunk=100,
)
Set verbosity=4 for detailed progress output.

Initialize a stream

data_id = streamer.initialize(
    stream_name="MBE growth - temperature log",
    physical_sample="GaN-001",
    synth_source_id=42,       # from client.list_growth_instruments()
    project_id="project-uuid",
)
The synth_source_id links the stream to a registered growth instrument. Use client.list_growth_instruments() to find available instruments.

Push single-channel data

Use push() to send one channel at a time. Chunks are uploaded asynchronously.
import time

timestamps = [time.time() + i * 0.1 for i in range(100)]
values = [25.0 + i * 0.01 for i in range(100)]

streamer.push(
    data_id,
    chunk_index=0,
    channel_name="temperature",
    timestamps=timestamps,
    values=values,
    units="C",
)
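
For readings longer than one chunk, the push() call above can be wrapped in a loop. The sketch below is illustrative only: `push_in_chunks` is a hypothetical helper, and it assumes that chunk_index starts at 0 and increases by one per chunk, which this page does not state explicitly. The default chunk_size of 100 simply mirrors the points_per_chunk value used earlier.

```python
from typing import Any, Sequence


def push_in_chunks(
    streamer: Any,
    data_id: Any,
    channel_name: str,
    timestamps: Sequence[float],
    values: Sequence[float],
    units: str,
    chunk_size: int = 100,
) -> int:
    """Split one channel into fixed-size chunks and push each in order.

    Assumption: chunk_index starts at 0 and increments by 1 per chunk.
    Returns the number of chunks pushed.
    """
    if len(timestamps) != len(values):
        raise ValueError("timestamps and values must have the same length")

    chunk_index = 0
    for start in range(0, len(timestamps), chunk_size):
        streamer.push(
            data_id,
            chunk_index=chunk_index,
            channel_name=channel_name,
            timestamps=list(timestamps[start:start + chunk_size]),
            values=list(values[start:start + chunk_size]),
            units=units,
        )
        chunk_index += 1
    return chunk_index
```

The helper only relies on the push() keyword arguments shown above, so it can be exercised with a stand-in object before connecting a real streamer.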

Push multi-channel data

Use push_multi() to send multiple channels in a single call:
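# Illustrative readings; both channels reuse the timestamps list from the previous example
temp_values = [25.0 + i * 0.01 for i in range(100)]
pressure_values = [1.0e-6 for _ in range(100)]

streamer.push_multi(data_id, chunk_index=0, channels={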
    "temperature": {
        "timestamps": timestamps,
        "values": temp_values,
        "units": "C",
    },
    "pressure": {
        "timestamps": timestamps,
        "values": pressure_values,
        "units": "mbar",
    },
})
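
When several channels share one time base, it can help to build the channels dictionary programmatically and check lengths before uploading. The following is a sketch under that assumption; `build_channels` is a hypothetical helper, and the dictionary layout is copied from the push_multi() example above.

```python
from typing import Dict, Sequence, Tuple


def build_channels(
    timestamps: Sequence[float],
    series: Dict[str, Tuple[Sequence[float], str]],
) -> Dict[str, dict]:
    """Build a push_multi()-style channels dict on a shared time base.

    `series` maps channel name -> (values, units). Every channel must
    have exactly one value per timestamp.
    """
    channels = {}
    for name, (values, units) in series.items():
        if len(values) != len(timestamps):
            raise ValueError(
                f"channel {name!r}: {len(values)} values "
                f"for {len(timestamps)} timestamps"
            )
        channels[name] = {
            "timestamps": list(timestamps),
            "values": list(values),
            "units": units,
        }
    return channels


# Example with three samples per channel
channels = build_channels(
    [0.0, 0.1, 0.2],
    {
        "temperature": ([25.0, 25.1, 25.2], "C"),
        "pressure": ([1e-6, 1e-6, 1e-6], "mbar"),
    },
)
```

The result can then be passed as the channels argument of push_multi().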

Generator mode

Use run() when data is already buffered. It iterates over chunks and handles uploads:
def data_chunks(timestamps, values, chunk_size=100):
    for start in range(0, len(timestamps), chunk_size):
        yield (
            timestamps[start : start + chunk_size],
            values[start : start + chunk_size],
        )

# all_timestamps and all_values are your full buffered readings (lists of equal length)
data_id = streamer.initialize(stream_name="Pyrometer log")
streamer.run(
    data_id,
    channel_name="temperature",
    data_iter=data_chunks(all_timestamps, all_values),
    units="C",
)
streamer.finalize(data_id)
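
Buffered data often lives in a log file rather than in memory. As a rough sketch, a generator like the one below could feed run() from CSV rows. `csv_chunks` is a hypothetical helper, and the two-column layout (epoch seconds, value) with no header row is an assumption about your log format. It yields the same (timestamps, values) tuples as data_chunks above.

```python
import csv
import io
from typing import Iterator, List, TextIO, Tuple


def csv_chunks(
    handle: TextIO, chunk_size: int = 100
) -> Iterator[Tuple[List[float], List[float]]]:
    """Yield (timestamps, values) chunks from rows of `epoch_seconds,value`.

    Assumption: no header row, exactly two numeric columns per row.
    """
    timestamps: List[float] = []
    values: List[float] = []
    for row in csv.reader(handle):
        if not row:
            continue  # skip blank lines
        timestamps.append(float(row[0]))
        values.append(float(row[1]))
        if len(timestamps) == chunk_size:
            yield timestamps, values
            timestamps, values = [], []
    if timestamps:
        # final partial chunk
        yield timestamps, values


# Example with an in-memory log; a real file handle works the same way
log = io.StringIO("1700000000.0,25.00\n1700000000.1,25.01\n1700000000.2,25.02\n")
chunks = list(csv_chunks(log, chunk_size=2))
```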

Finalize

Always call finalize() when the stream is complete:
streamer.finalize(data_id)
Call finalize() even if the upload fails part-way: it signals the server to clean up resources and mark the stream appropriately.
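
One way to guarantee that finalize() runs on failure is a try/finally block. The wrapper below is a minimal sketch; `upload_then_finalize` is a hypothetical helper name, and it only assumes the finalize(data_id) call shown above.

```python
from typing import Any, Callable


def upload_then_finalize(
    streamer: Any, data_id: Any, upload: Callable[[], None]
) -> None:
    """Run `upload`, then finalize the stream whether or not it raised.

    Any exception from `upload` still propagates to the caller after
    finalize() has been called.
    """
    try:
        upload()
    finally:
        streamer.finalize(data_id)
```

Here `upload` would be any zero-argument function that performs your push(), push_multi(), or run() calls.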

Best practices

Recommendation | Rationale
Always call finalize() | Allows graceful cleanup even on failure
Use push_multi() for correlated channels | Keeps channels in sync within each chunk
Link to growth instruments | Enables cross-run analysis and instrument tracking
Use Unix epoch timestamps | The platform expects seconds since epoch

Next steps

Poll Time Series

Monitor streaming analysis in real time.

Streaming Reference

Full streaming API documentation.