The RHEEDStreamer class lets you push RHEED frames directly from your instrument to Atomscale for live analysis. Choose between two streaming modes:
- Callback / push mode - The camera or SDK hands you fresh frames and you upload each chunk immediately
- Generator / pull mode - You already have frames buffered and want the helper to pace the upload for you
Prerequisites
numpy installed
- RHEED frames as
uint8 arrays shaped (N, H, W) or (H, W)
- A stable clock so you can honor the capture cadence
Create a streamer
from atomscale.streaming import RHEEDStreamer
streamer = RHEEDStreamer(api_key="YOUR_API_KEY")
Optional keyword arguments tune chunking and logging. Set verbosity=4 for detailed progress output.
Callback / push mode
Use this variant when frames arrive live from the instrument. The outer loop is your acquisition callback: once a chunk is ready, send it to the API and wait just long enough to match the capture cadence.
import numpy as np
import time
fps = 120.0
chunk_size = 240 # 2 seconds of frames at 120 fps
seconds_per_chunk = chunk_size / fps
data_id = streamer.initialize(
fps=fps,
rotations_per_min=15.0, # set to 0.0 for stationary
chunk_size=chunk_size,
stream_name="Demo (callback mode)",
physical_sample="Demo wafer",
)
for chunk_idx in range(5):
# Replace with actual frame capture
frames = np.random.randint(0, 256, size=(chunk_size, 300, 500), dtype=np.uint8)
streamer.push(data_id, chunk_idx, frames)
time.sleep(seconds_per_chunk)
time.sleep(1.0) # let in-flight uploads finish
streamer.finalize(data_id)
Generator / pull mode
Use this form when frames are already buffered (from disk, memory, or a simulated source). Provide an iterator that yields chunks and the helper handles pacing and retry logic.
import numpy as np
import time
def frame_chunks(frames, *, chunk_size=240, fps=120.0):
seconds_per_chunk = chunk_size / fps
for start in range(0, len(frames), chunk_size):
yield frames[start : start + chunk_size]
time.sleep(seconds_per_chunk)
# Simulated buffered frames
frames = np.random.randint(0, 256, size=(1200, 300, 500), dtype=np.uint8)
data_id = streamer.initialize(
fps=10.0,
rotations_per_min=0.0,
chunk_size=20,
stream_name="Demo (generator mode)",
physical_sample="Demo wafer",
)
streamer.run(data_id, frame_chunks(frames, chunk_size=20, fps=10.0))
streamer.finalize(data_id)
Best practices
| Recommendation | Why |
|---|
| Maintain original capture cadence | The server expects frames at the declared FPS |
| Use chunk sizes of at least 2 seconds | Reduces overhead and improves throughput |
Always call finalize() | Lets the pipeline clean up gracefully, even on failure |
| Use distinct stream names | Makes it easier to find runs later |
Always call finalize() even if the upload fails part-way. This signals the server to clean up
resources and mark the stream appropriately.
Link to samples and projects
Pass physical_sample to initialize() to link the stream to an existing sample. Names are matched case-insensitively, or a new sample is created if no match is found. You can also pass project_id to associate the stream with a project.
data_id = streamer.initialize(
fps=120.0,
rotations_per_min=15.0,
chunk_size=240,
stream_name="Growth run 42",
physical_sample="GaN-001",
project_id="your-project-uuid",
)
Next steps