Skip to main content
The TimeseriesStreamer class lets you push scalar instrument data (temperature, pressure, growth rate, etc.) to Atomscale for live tracking and analysis. Like RHEEDStreamer, it uses a Rust/PyO3 backend for efficient uploads.

Prerequisites

  • Instrument readings as lists of timestamps and values
  • Timestamps as Unix epoch seconds (float)

Create a streamer

from atomscale.streaming import TimeseriesStreamer

streamer = TimeseriesStreamer(
    api_key="YOUR_API_KEY",
    points_per_chunk=100,
)
Set verbosity=4 for detailed progress output.

Initialize a stream

data_id = streamer.initialize(
    stream_name="MBE growth - temperature log",
    physical_sample="GaN-001",
    synth_source_id=42,       # from client.list_growth_instruments()
    project_id="project-uuid",
)
The synth_source_id links the stream to a registered growth instrument. Use client.list_growth_instruments() to find available instruments.

Push single-channel data

Use push() to send one channel at a time. Chunks are uploaded asynchronously.
import time

timestamps = [time.time() + i * 0.1 for i in range(100)]
values = [25.0 + i * 0.01 for i in range(100)]

streamer.push(
    data_id,
    chunk_index=0,
    channel_name="temperature",
    timestamps=timestamps,
    values=values,
    units="C",
)

Push multi-channel data

Use push_multi() to send multiple channels in a single call:
streamer.push_multi(data_id, chunk_index=0, channels={
    "temperature": {
        "timestamps": timestamps,
        "values": temp_values,
        "units": "C",
    },
    "pressure": {
        "timestamps": timestamps,
        "values": pressure_values,
        "units": "mbar",
    },
})

Generator mode

Use run() when data is already buffered. It iterates over chunks and handles uploads:
def data_chunks(timestamps, values, chunk_size=100):
    for start in range(0, len(timestamps), chunk_size):
        yield (
            timestamps[start : start + chunk_size],
            values[start : start + chunk_size],
        )

data_id = streamer.initialize(stream_name="Pyrometer log")
streamer.run(
    data_id,
    channel_name="temperature",
    data_iter=data_chunks(all_timestamps, all_values),
    units="C",
)
streamer.finalize(data_id)

Finalize

Always call finalize() when the stream is complete:
streamer.finalize(data_id)
Always call finalize() even if the upload fails part-way. This signals the server to clean up resources and mark the stream appropriately.

Best practices

RecommendationRationale
Always call finalize()Allows graceful cleanup even on failure
Use push_multi() for correlated channelsKeeps channels in sync within each chunk
Link to growth instrumentsEnables cross-run analysis and instrument tracking
Use Unix epoch timestampsThe platform expects seconds since epoch

Next steps