The RHEEDStreamer class provides high-performance streaming of RHEED frames to the Atomscale platform. It bridges Python to a Rust/PyO3 backend for efficient, concurrent packaging and upload of grayscale frames.

Constructor

from atomscale.streaming import RHEEDStreamer

streamer = RHEEDStreamer(
    api_key="YOUR_API_KEY",
    endpoint="https://api.atomscale.ai/",
    verbosity=2,
)
api_key (str, required): API key for authentication.
endpoint (str | None, default None): Root API endpoint. If not provided, uses the default production endpoint or reads the AS_API_ENDPOINT environment variable.
verbosity (int | None, default None): Logging verbosity level. Set to 4 for detailed progress output.
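If you prefer to configure the endpoint through the environment rather than the constructor, the resolution described above can be sketched as follows. The `resolve_endpoint` helper is hypothetical (not part of atomscale) and only illustrates the explicit-argument-then-environment-variable fallback:

```python
import os


def resolve_endpoint(explicit=None):
    """Hypothetical sketch: explicit argument wins, else AS_API_ENDPOINT."""
    return explicit or os.environ.get("AS_API_ENDPOINT")


os.environ["AS_API_ENDPOINT"] = "https://api.atomscale.ai/"
endpoint = resolve_endpoint()  # falls back to the environment variable
```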

initialize()

Start a new streaming session and return a data ID for subsequent operations.
data_id = streamer.initialize(
    fps=120.0,
    rotations_per_min=15.0,
    chunk_size=240,
    stream_name="Growth run 42",
    physical_sample="GaN-001",
)

Parameters

fps (float, required): Frame rate of the capture in frames per second.
rotations_per_min (float, required): Sample rotation speed in rotations per minute. Set to 0.0 for stationary samples.
chunk_size (int, required): Number of frames per upload chunk. Should cover at least 2 seconds of video for optimal throughput.
stream_name (str | None, default None): Human-readable name for this stream. Helps identify runs in the UI.
physical_sample (str | None, default None): Name of the physical sample to link this stream to. Names are matched case-insensitively. If no match is found, a new sample record is created.
project_id (str | None, default None): UUID of the project to associate with this stream. When provided along with physical_sample, the project's tracking is automatically updated.
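Since chunk_size should cover at least 2 seconds of video, a simple way to derive it from the frame rate is the following convenience sketch (not part of the API):

```python
import math


def suggest_chunk_size(fps, seconds=2.0):
    """Smallest chunk size covering the requested duration at the given fps."""
    return math.ceil(fps * seconds)


suggest_chunk_size(120.0)  # -> 240, i.e. 2 s of video at 120 fps
suggest_chunk_size(10.0)   # -> 20
```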

Returns

data_id (str): Unique identifier for this streaming session. Use this ID with push(), run(), and finalize().

Example

data_id = streamer.initialize(
    fps=120.0,
    rotations_per_min=15.0,
    chunk_size=240,
    stream_name="MBE growth - GaN on sapphire",
    physical_sample="GaN-001",
)
print(f"Started stream: {data_id}")

run()

Stream frames using generator/pull mode. The streamer consumes chunks from an iterator and handles pacing and retry logic.
streamer.run(data_id, frames_iter)

Parameters

data_id (str, required): The data ID returned by initialize().
frames_iter (Iterable[NDArray[np.uint8]], required): Iterator yielding frame chunks. Each chunk should be a numpy array of shape (N, H, W) with dtype uint8, where N is the chunk size.

Returns

None. Frames are uploaded as they are yielded by the iterator.

Example

import numpy as np
import time


def frame_chunks(frames, *, chunk_size=240, fps=120.0):
    """Yield frame chunks at the capture cadence."""
    seconds_per_chunk = chunk_size / fps
    for start in range(0, len(frames), chunk_size):
        yield frames[start : start + chunk_size]
        time.sleep(seconds_per_chunk)


# Buffered frames (e.g., loaded from disk)
frames = np.random.randint(0, 256, size=(1200, 300, 500), dtype=np.uint8)

data_id = streamer.initialize(fps=10.0, rotations_per_min=0.0, chunk_size=20)
streamer.run(data_id, frame_chunks(frames, chunk_size=20, fps=10.0))
streamer.finalize(data_id)

push()

Upload a single chunk of frames in callback/push mode. Use this when frames arrive live from the instrument.
streamer.push(data_id, chunk_idx, frames)

Parameters

data_id (str, required): The data ID returned by initialize().
chunk_idx (int, required): Zero-based index of this chunk. Chunks must be pushed in order.
frames (NDArray[np.uint8], required): Numpy array of frames with shape (N, H, W), or (H, W) for a single frame. Must have dtype uint8.

Returns

None. The chunk is queued for upload.

Example

import numpy as np
import time

fps = 120.0
chunk_size = 240
seconds_per_chunk = chunk_size / fps

data_id = streamer.initialize(
    fps=fps,
    rotations_per_min=15.0,
    chunk_size=chunk_size,
)

for chunk_idx in range(10):
    # Capture frames from instrument
    frames = np.random.randint(0, 256, size=(chunk_size, 300, 500), dtype=np.uint8)
    streamer.push(data_id, chunk_idx, frames)
    time.sleep(seconds_per_chunk)

time.sleep(1.0)  # Let in-flight uploads finish
streamer.finalize(data_id)

finalize()

Complete the streaming session and signal the server to begin final processing.
streamer.finalize(data_id)

Parameters

data_id (str, required): The data ID returned by initialize().

Returns

None. The stream is marked as complete and queued for final analysis.
Always call finalize() even if the upload fails part-way. This signals the server to clean up resources and mark the stream appropriately.

Example

data_id = streamer.initialize(fps=120.0, rotations_per_min=15.0, chunk_size=240)
try:
    # ... push or run frames ...
finally:
    streamer.finalize(data_id)

Frame Format Requirements

RHEED frames must meet these requirements:
| Property  | Requirement                                      |
|-----------|--------------------------------------------------|
| Data type | numpy.uint8 (8-bit unsigned integer)             |
| Shape     | (N, H, W) for chunks or (H, W) for single frames |
| Color     | Grayscale only                                   |
| Values    | 0-255 intensity range                            |

Example frame preparation

import numpy as np

# From a color image
color_frame = capture_from_camera()  # Shape: (H, W, 3)
gray_frame = np.mean(color_frame, axis=2).astype(np.uint8)

# From 16-bit data
raw_frame = capture_raw()  # Shape: (H, W), dtype: uint16
normalized = ((raw_frame / raw_frame.max()) * 255).astype(np.uint8)

# Batch into chunks
chunk_size = 240
frames = np.stack([capture_frame() for _ in range(chunk_size)])  # (240, H, W)
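Before pushing, a chunk can be checked against the requirements above. This `check_chunk` helper is a hypothetical pre-flight sketch, not part of atomscale:

```python
import numpy as np


def check_chunk(frames):
    """Raise ValueError if `frames` violates the RHEED frame requirements."""
    if frames.dtype != np.uint8:
        raise ValueError(f"expected dtype uint8, got {frames.dtype}")
    if frames.ndim not in (2, 3):
        raise ValueError(f"expected shape (H, W) or (N, H, W), got {frames.shape}")


check_chunk(np.zeros((240, 300, 500), dtype=np.uint8))  # passes silently
```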

Best Practices

| Recommendation              | Rationale                                        |
|-----------------------------|--------------------------------------------------|
| Match the capture cadence   | The server expects frames at the declared FPS    |
| Use chunk sizes ≥ 2 seconds | Reduces network overhead and improves throughput |
| Always call finalize()      | Allows graceful cleanup even on failure          |
| Use distinct stream names   | Makes runs easier to find later                  |
| Link to physical samples    | Enables cross-run analysis and tracking          |

Error Handling

The streamer raises exceptions for connection failures and invalid parameters. Wrap streaming code in try/finally to ensure finalize() is called:
data_id = None
try:
    data_id = streamer.initialize(
        fps=120.0,
        rotations_per_min=15.0,
        chunk_size=240,
    )

    for chunk_idx, frames in enumerate(capture_frames()):
        streamer.push(data_id, chunk_idx, frames)

except Exception as e:
    print(f"Streaming failed: {e}")
finally:
    if data_id:
        streamer.finalize(data_id)

TimeseriesStreamer

The TimeseriesStreamer class streams scalar instrument data (temperature, pressure, growth rate, etc.) to the Atomscale platform. Like RHEEDStreamer, it uses a Rust/PyO3 backend.

Constructor

from atomscale.streaming import TimeseriesStreamer

streamer = TimeseriesStreamer(
    api_key="YOUR_API_KEY",
    points_per_chunk=100,
    verbosity=2,
)
api_key (str, required): API key for authentication.
endpoint (str | None, default None): Root API endpoint. If not provided, uses the default production endpoint or reads the AS_API_ENDPOINT environment variable.
points_per_chunk (int, default 100): Expected number of data points per chunk. Used for array positioning.
verbosity (int | None, default None): Logging verbosity level. Set to 4 for detailed progress output.

initialize()

Start a new timeseries streaming session.
data_id = streamer.initialize(
    stream_name="MBE growth - temperature log",
    synth_source_id=42,
    physical_sample="GaN-001",
    project_id="project-uuid",
)

Parameters

stream_name (str | None, default None): Human-readable name for this stream.
synth_source_id (int | None, default None): Growth instrument ID to link. Must belong to your organization.
physical_sample (str | None, default None): Name or UUID of a physical sample to associate with the stream. Names are matched case-insensitively, or a new sample is created if no match is found.
project_id (str | None, default None): UUID of the project to associate with this stream.

Returns

data_id (str): Unique identifier for this streaming session.

push()

Upload a single chunk of data for one channel. Spawns an async upload task and returns immediately.
streamer.push(
    data_id,
    chunk_index=0,
    channel_name="temperature",
    timestamps=[1700000000.0, 1700000000.1],
    values=[25.0, 25.1],
    units="C",
)

Parameters

data_id (str, required): The data ID returned by initialize().
chunk_index (int, required): Zero-based index of this chunk. Chunks must be pushed in order.
channel_name (str, required): Name of the data channel (e.g., "temperature", "pressure").
timestamps (list[float], required): Unix epoch timestamps in seconds.
values (list[float], required): Measured values corresponding to each timestamp.
units (str | None, default None): Optional units for the values (e.g., "C", "mbar").

Returns

None. The chunk is queued for upload.

push_multi()

Upload data for multiple channels in a single call.
streamer.push_multi(data_id, chunk_index=0, channels={
    "temperature": {
        "timestamps": [1700000000.0, 1700000000.1],
        "values": [25.0, 25.1],
        "units": "C",
    },
    "pressure": {
        "timestamps": [1700000000.0, 1700000000.1],
        "values": [1.0, 1.1],
        "units": "mbar",
    },
})

Parameters

data_id (str, required): The data ID returned by initialize().
chunk_index (int, required): Zero-based index of this chunk.
channels (dict[str, dict], required): Mapping of channel names to channel data. Each value dict should contain timestamps (list of floats), values (list of floats), and optionally units (string).

Returns

None.

run()

Stream data from an iterator. Blocks until all uploads complete.
import time

def data_chunks():
    for _ in range(10):
        timestamps = [time.time() + j * 0.1 for j in range(100)]
        values = [25.0 + j * 0.01 for j in range(100)]
        yield (timestamps, values)

streamer.run(
    data_id,
    channel_name="temperature",
    data_iter=data_chunks(),
    units="C",
)

Parameters

data_id (str, required): The data ID returned by initialize().
channel_name (str, required): Name of the data channel.
data_iter (Iterable[tuple[list[float], list[float]]], required): Iterator yielding (timestamps, values) tuples for each chunk.
units (str | None, default None): Optional units for the values.

Returns

None.

finalize()

Complete the streaming session and signal the server to begin processing.
streamer.finalize(data_id)

Parameters

data_id (str, required): The data ID returned by initialize().

Returns

None.
Always call finalize() even if the upload fails part-way. This signals the server to clean up resources and mark the stream appropriately.