The `TimeseriesStreamer` class lets you push scalar instrument data (temperature, pressure, growth rate, etc.) to Atomscale for live tracking and analysis. Like `RHEEDStreamer`, it uses a Rust/PyO3 backend for efficient uploads.
## Prerequisites
- Instrument readings as lists of timestamps and values
- Timestamps as Unix epoch seconds (float); see the conversion sketch below
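If your instrument reports wall-clock datetimes, the standard library can produce epoch seconds; a minimal sketch:

```python
from datetime import datetime, timezone

# Convert a timezone-aware datetime to Unix epoch seconds (float),
# which is the timestamp format the platform expects.
reading_time = datetime(2024, 5, 1, 12, 0, 0, tzinfo=timezone.utc)
epoch_seconds = reading_time.timestamp()  # 1714564800.0
```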
## Create a streamer
```python
from atomscale.streaming import TimeseriesStreamer

streamer = TimeseriesStreamer(
    api_key="YOUR_API_KEY",
    points_per_chunk=100,
)
```
Set `verbosity=4` for detailed progress output.
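Where `verbosity` is set is not shown above; assuming it is accepted as a constructor keyword argument, a more talkative streamer might look like this:

```python
# Assumption: verbosity is a TimeseriesStreamer constructor argument,
# with 4 being the most detailed level.
streamer = TimeseriesStreamer(
    api_key="YOUR_API_KEY",
    points_per_chunk=100,
    verbosity=4,
)
```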
## Initialize a stream
```python
data_id = streamer.initialize(
    stream_name="MBE growth - temperature log",
    physical_sample="GaN-001",
    synth_source_id=42,  # from client.list_growth_instruments()
    project_id="project-uuid",
)
```
The `synth_source_id` links the stream to a registered growth instrument. Use `client.list_growth_instruments()` to find available instruments.
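As a sketch, assuming `client.list_growth_instruments()` returns records exposing `id` and `name` fields (the exact return shape may differ), the lookup could be wired in like this:

```python
# Hypothetical lookup: the record fields and the instrument name are illustrative.
instruments = client.list_growth_instruments()
mbe_chamber = next(inst for inst in instruments if inst["name"] == "MBE Chamber 1")

data_id = streamer.initialize(
    stream_name="MBE growth - temperature log",
    physical_sample="GaN-001",
    synth_source_id=mbe_chamber["id"],
    project_id="project-uuid",
)
```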
## Push single-channel data
Use `push()` to send one channel at a time. Chunks are uploaded asynchronously.
```python
import time

timestamps = [time.time() + i * 0.1 for i in range(100)]
values = [25.0 + i * 0.01 for i in range(100)]

streamer.push(
    data_id,
    chunk_index=0,
    channel_name="temperature",
    timestamps=timestamps,
    values=values,
    units="C",
)
```
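For longer acquisitions, each successive call presumably gets an incremented `chunk_index`; a minimal sketch of that pattern (the chunking scheme itself is illustrative, not prescribed by the API):

```python
# Illustrative: split a long series into 100-point chunks and push each
# chunk with an incrementing chunk_index.
chunk_size = 100
for chunk_index, start in enumerate(range(0, len(timestamps), chunk_size)):
    streamer.push(
        data_id,
        chunk_index=chunk_index,
        channel_name="temperature",
        timestamps=timestamps[start : start + chunk_size],
        values=values[start : start + chunk_size],
        units="C",
    )
```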
## Push multi-channel data
Use `push_multi()` to send multiple channels in a single call:
```python
# temp_values and pressure_values are lists of readings aligned with `timestamps`.
streamer.push_multi(data_id, chunk_index=0, channels={
    "temperature": {
        "timestamps": timestamps,
        "values": temp_values,
        "units": "C",
    },
    "pressure": {
        "timestamps": timestamps,
        "values": pressure_values,
        "units": "mbar",
    },
})
```
## Generator mode
Use `run()` when data is already buffered. It iterates over chunks and handles uploads:
```python
def data_chunks(timestamps, values, chunk_size=100):
    for start in range(0, len(timestamps), chunk_size):
        yield (
            timestamps[start : start + chunk_size],
            values[start : start + chunk_size],
        )

# all_timestamps and all_values hold the full, pre-buffered series for the run.
data_id = streamer.initialize(stream_name="Pyrometer log")
streamer.run(
    data_id,
    channel_name="temperature",
    data_iter=data_chunks(all_timestamps, all_values),
    units="C",
)
streamer.finalize(data_id)
```
## Finalize
Always call `finalize()` when the stream is complete:
```python
streamer.finalize(data_id)
```
Call `finalize()` even if the upload fails part-way; this signals the server to clean up resources and mark the stream appropriately.
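A `try`/`finally` block is one straightforward way to guarantee this; a minimal sketch:

```python
# Ensure finalize() runs even if a push() call raises partway through the upload.
try:
    streamer.push(
        data_id,
        chunk_index=0,
        channel_name="temperature",
        timestamps=timestamps,
        values=values,
        units="C",
    )
finally:
    streamer.finalize(data_id)
```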
## Best practices

| Recommendation | Rationale |
|---|---|
| Always call `finalize()` | Allows graceful cleanup even on failure |
| Use `push_multi()` for correlated channels | Keeps channels in sync within each chunk |
| Link to growth instruments | Enables cross-run analysis and instrument tracking |
| Use Unix epoch timestamps | The platform expects seconds since epoch |
## Next steps