# Streaming

> Complete API reference for RHEEDStreamer and TimeseriesStreamer

The `RHEEDStreamer` class provides high-performance streaming of RHEED frames to the Atomscale platform. It bridges Python to a Rust/PyO3 backend for efficient, concurrent packaging and upload of grayscale frames.

<Info>
  [View source on
  GitHub](https://github.com/atomscale-ai/sdk/blob/main/src/atomscale/streaming/rheed_stream.pyi)
</Info>

## Constructor

```python theme={null}
from atomscale.streaming import RHEEDStreamer

streamer = RHEEDStreamer(
    api_key="YOUR_API_KEY",
    endpoint="https://api.atomscale.ai/",
    verbosity=2,
)
```

<ParamField path="api_key" type="str" required>
  API key for authentication.
</ParamField>

<ParamField path="endpoint" type="str | None" default="None">
  Root API endpoint. If not provided, uses the default production endpoint or reads it from the
  `AS_API_ENDPOINT` environment variable.
</ParamField>

<ParamField path="verbosity" type="int | None" default="None">
  Logging verbosity level. Set to `4` for detailed progress output.
</ParamField>

***

## initialize()

Start a new streaming session and return a data ID for subsequent operations.

```python theme={null}
data_id = streamer.initialize(
    fps=120.0,
    rotations_per_min=15.0,
    chunk_size=240,
    stream_name="Growth run 42",
    physical_sample="GaN-001",
)
```

### Parameters

<ParamField path="fps" type="float" required>
  Frame rate of the capture in frames per second.
</ParamField>

<ParamField path="rotations_per_min" type="float" required>
  Sample rotation speed in rotations per minute. Set to `0.0` for stationary samples.
</ParamField>

<ParamField path="chunk_size" type="int" required>
  Number of frames per upload chunk. Should cover at least 2 seconds of video for optimal
  throughput; at 120 fps, for example, that means at least 240 frames.
</ParamField>

<ParamField path="stream_name" type="str | None" default="None">
  Human-readable name for this stream. Helps identify runs in the UI.
</ParamField>

<ParamField path="physical_sample" type="str | None" default="None">
  Name of the physical sample to link this stream to. Names are matched case-insensitively. If no
  match is found, a new sample record is created.
</ParamField>

<ParamField path="project_id" type="str | None" default="None">
  UUID of the project to associate with this stream. When provided along with `physical_sample`,
  the project's tracking is automatically updated.
</ParamField>

### Returns

<ResponseField name="data_id" type="str">
  Unique identifier for this streaming session. Use this ID with `push()`, `run()`, and
  `finalize()`.
</ResponseField>

### Example

```python theme={null}
data_id = streamer.initialize(
    fps=120.0,
    rotations_per_min=15.0,
    chunk_size=240,
    stream_name="MBE growth - GaN on sapphire",
    physical_sample="GaN-001",
)
print(f"Started stream: {data_id}")
```
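
Because the recommended chunk covers at least 2 seconds of video, `chunk_size` can be derived from the frame rate. The helper below is a hypothetical convenience, not part of the SDK; its 2-second default simply mirrors the `chunk_size` guidance above.

```python theme={null}
import math


def chunk_size_for(fps: float, min_seconds: float = 2.0) -> int:
    """Smallest whole number of frames covering at least `min_seconds` of video."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    # Round up so a fractional frame count still meets the minimum duration
    return math.ceil(fps * min_seconds)
```

For example, `chunk_size_for(120.0)` gives 240, matching the `initialize()` examples on this page.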

***

## run()

Stream frames using generator/pull mode. The streamer consumes chunks from an iterator and handles pacing and retry logic.

```python theme={null}
streamer.run(data_id, frames_iter)
```

### Parameters

<ParamField path="data_id" type="str" required>
  The data ID returned by `initialize()`.
</ParamField>

<ParamField path="frames_iter" type="Iterable[NDArray[np.uint8]]" required>
  Iterator yielding frame chunks. Each chunk should be a numpy array of shape `(N, H, W)` with dtype
  `uint8`, where N is the chunk size.
</ParamField>

### Returns

None. Frames are uploaded as they are yielded by the iterator.

### Example

```python theme={null}
import numpy as np
import time


def frame_chunks(frames, *, chunk_size=240, fps=120.0):
    """Yield frame chunks at the capture cadence."""
    seconds_per_chunk = chunk_size / fps
    for start in range(0, len(frames), chunk_size):
        yield frames[start : start + chunk_size]
        time.sleep(seconds_per_chunk)


# Buffered frames (e.g., loaded from disk)
frames = np.random.randint(0, 256, size=(1200, 300, 500), dtype=np.uint8)

data_id = streamer.initialize(fps=10.0, rotations_per_min=0.0, chunk_size=20)
streamer.run(data_id, frame_chunks(frames, chunk_size=20, fps=10.0))
streamer.finalize(data_id)
```
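
The example above sleeps a fixed interval after each `yield`, so any time spent outside the generator between chunks is added on top, and a long stream can drift behind the capture cadence. A minimal sketch of a drift-free variant, assuming you pace the iterator yourself as in the example, schedules each chunk against `time.monotonic()`. `paced_chunks` is a hypothetical name, not an SDK function.

```python theme={null}
import time
from typing import Iterator

import numpy as np
from numpy.typing import NDArray


def paced_chunks(
    frames: NDArray[np.uint8], *, chunk_size: int, fps: float
) -> Iterator[NDArray[np.uint8]]:
    """Yield (N, H, W) chunks on a fixed schedule anchored to a monotonic clock."""
    period = chunk_size / fps  # seconds of video per chunk
    start = time.monotonic()
    for k, first in enumerate(range(0, len(frames), chunk_size)):
        # Wait until chunk k's release time (start + k * period); time spent
        # between yields is absorbed instead of accumulating as drift.
        delay = start + k * period - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        yield frames[first : first + chunk_size]
```

It drops in where `frame_chunks` was used, for example `streamer.run(data_id, paced_chunks(frames, chunk_size=20, fps=10.0))`.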

***

## push()

Upload a single chunk of frames in callback/push mode. Use this when frames arrive live from the instrument.

```python theme={null}
streamer.push(data_id, chunk_idx, frames)
```

### Parameters

<ParamField path="data_id" type="str" required>
  The data ID returned by `initialize()`.
</ParamField>

<ParamField path="chunk_idx" type="int" required>
  Zero-based index of this chunk. Chunks must be pushed in order.
</ParamField>

<ParamField path="frames" type="NDArray[np.uint8]" required>
  Numpy array of frames with shape `(N, H, W)` or `(H, W)` for single frames. Must have dtype
  `uint8`.
</ParamField>

### Returns

None. The chunk is queued for upload.

### Example

```python theme={null}
import numpy as np
import time

fps = 120.0
chunk_size = 240
seconds_per_chunk = chunk_size / fps

data_id = streamer.initialize(
    fps=fps,
    rotations_per_min=15.0,
    chunk_size=chunk_size,
)

for chunk_idx in range(10):
    # Capture frames from instrument
    frames = np.random.randint(0, 256, size=(chunk_size, 300, 500), dtype=np.uint8)
    streamer.push(data_id, chunk_idx, frames)
    time.sleep(seconds_per_chunk)

time.sleep(1.0)  # Let in-flight uploads finish
streamer.finalize(data_id)
```
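
Instrument callbacks often deliver one frame at a time, whereas the example above pushes whole chunks with sequential `chunk_idx` values. One way to bridge the two is a small buffer that stacks frames into `(N, H, W)` chunks and numbers them in order. `FrameBuffer` is a hypothetical helper sketched under that assumption; whether the server accepts a shorter final chunk from `flush()` is also an assumption worth checking.

```python theme={null}
from __future__ import annotations

import numpy as np
from numpy.typing import NDArray


class FrameBuffer:
    """Collect single (H, W) frames and emit numbered (N, H, W) chunks."""

    def __init__(self, chunk_size: int) -> None:
        self.chunk_size = chunk_size
        self.next_idx = 0  # zero-based chunk index, incremented in push order
        self._frames: list[NDArray[np.uint8]] = []

    def add(self, frame: NDArray[np.uint8]) -> tuple[int, NDArray[np.uint8]] | None:
        """Buffer one frame; return (chunk_idx, chunk) once chunk_size frames are held."""
        self._frames.append(frame)
        if len(self._frames) < self.chunk_size:
            return None
        return self._emit()

    def flush(self) -> tuple[int, NDArray[np.uint8]] | None:
        """Return any leftover frames as a final, possibly short, chunk."""
        return self._emit() if self._frames else None

    def _emit(self) -> tuple[int, NDArray[np.uint8]]:
        chunk = np.stack(self._frames).astype(np.uint8, copy=False)
        self._frames = []
        idx = self.next_idx
        self.next_idx += 1
        return idx, chunk
```

Inside a frame callback, each non-`None` result can be forwarded as `streamer.push(data_id, *result)`, and the output of `flush()` pushed the same way before `finalize()`.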

***

## finalize()

Complete the streaming session and signal the server to begin final processing.

```python theme={null}
streamer.finalize(data_id)
```

### Parameters

<ParamField path="data_id" type="str" required>
  The data ID returned by `initialize()`.
</ParamField>

### Returns

None. The stream is marked as complete and queued for final analysis.

<Warning>
  Always call `finalize()` even if the upload fails part-way. This signals the server to clean up
  resources and mark the stream appropriately.
</Warning>

### Example

```python theme={null}
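data_id = None
try:
    data_id = streamer.initialize(fps=120.0, rotations_per_min=15.0, chunk_size=240)
    # ... push or run frames ...
finally:
    # Skip finalize() if initialize() itself failed and no session exists
    if data_id is not None:
        streamer.finalize(data_id)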
```
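
The `try`/`finally` pattern can also be packaged as a context manager so every session is finalized without repeating the boilerplate. This is a sketch, not an SDK feature: `stream_session` is a hypothetical name, and it only assumes the streamer exposes `initialize(**kwargs)` and `finalize(data_id)` as documented on this page, so the same wrapper should also fit `TimeseriesStreamer`.

```python theme={null}
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def stream_session(streamer: Any, **init_kwargs: Any) -> Iterator[str]:
    """Yield a data_id from streamer.initialize() and always call finalize()."""
    # If initialize() raises, there is no session and nothing to finalize
    data_id = streamer.initialize(**init_kwargs)
    try:
        yield data_id
    finally:
        # Runs on normal exit and when the with-block raises
        streamer.finalize(data_id)
```

Usage then reads `with stream_session(streamer, fps=120.0, rotations_per_min=15.0, chunk_size=240) as data_id:`, followed by the `push()` or `run()` calls in the indented body.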

***

## Frame Format Requirements

RHEED frames must meet these requirements:

| Property  | Requirement                                          |
| --------- | ---------------------------------------------------- |
| Data type | `numpy.uint8` (8-bit unsigned integer)               |
| Shape     | `(N, H, W)` for chunks or `(H, W)` for single frames |
| Color     | Grayscale only                                       |
| Values    | 0-255 intensity range                                |

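Before pushing or yielding frames, a quick check against the table above can catch dtype or shape mistakes early. The sketch below is an assumption-level helper (`as_rheed_chunk` is a hypothetical name, not an SDK function); it also promotes a single `(H, W)` frame to a one-frame `(N, H, W)` chunk, the shape `run()` expects.

```python theme={null}
import numpy as np


def as_rheed_chunk(frames: np.ndarray) -> np.ndarray:
    """Validate frames against the format table and return an (N, H, W) uint8 array."""
    if frames.dtype != np.uint8:
        # uint8 also guarantees the 0-255 intensity range
        raise TypeError(f"expected dtype uint8, got {frames.dtype}")
    if frames.ndim == 2:
        # Promote a single (H, W) frame to a one-frame chunk
        return frames[np.newaxis, ...]
    if frames.ndim == 3:
        # An (H, W, 3) color frame also has ndim 3 and cannot be told apart
        # from a chunk by shape alone; convert color to grayscale beforehand.
        return frames
    raise ValueError(f"expected shape (H, W) or (N, H, W), got {frames.shape}")
```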
### Example frame preparation

```python theme={null}
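import numpy as np

# capture_from_camera(), capture_raw(), and capture_frame() are placeholders
# for your instrument's acquisition API.

# From a color image: average the RGB channels to grayscale
color_frame = capture_from_camera()  # Shape: (H, W, 3)
gray_frame = np.mean(color_frame, axis=2).astype(np.uint8)

# From 16-bit data: scale by the frame maximum (guard against an all-zero frame)
raw_frame = capture_raw()  # Shape: (H, W), dtype: uint16
peak = max(int(raw_frame.max()), 1)
normalized = (raw_frame / peak * 255).astype(np.uint8)

# Batch into chunks
chunk_size = 240
frames = np.stack([capture_frame() for _ in range(chunk_size)])  # (240, H, W)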
```
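
The 16-bit example above scales each frame by its own maximum, so the same physical brightness can map to different gray levels in different frames. When intensities need to stay comparable across a run (for example, when following RHEED intensity oscillations), a fixed scale based on the sensor's bit depth is one alternative. This sketch swaps in that fixed-scale technique; `to_uint8` is a hypothetical helper, and the `bit_depth` default of 16 is an assumption to replace with your camera's actual depth.

```python theme={null}
import numpy as np


def to_uint8(raw: np.ndarray, bit_depth: int = 16) -> np.ndarray:
    """Map unsigned integer pixels from a fixed bit depth onto 0-255."""
    full_scale = (1 << bit_depth) - 1
    # Widen to uint32 so the multiply below cannot overflow, and clip so
    # stray values above full scale saturate instead of wrapping around
    clipped = np.clip(raw.astype(np.uint32), 0, full_scale)
    # Integer arithmetic: 0 maps to 0 and full_scale maps to 255
    return (clipped * 255 // full_scale).astype(np.uint8)
```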

***

## Best Practices

| Recommendation              | Rationale                                        |
| --------------------------- | ------------------------------------------------ |
| Match the capture cadence   | The server expects frames at the declared FPS    |
| Chunk at least 2 s of video | Reduces network overhead and improves throughput |
| Always call `finalize()`    | Allows graceful cleanup even on failure          |
| Use distinct stream names   | Makes runs easier to find later                  |
| Link to physical samples    | Enables cross-run analysis and tracking          |

***

## Error Handling

The streamer raises exceptions for connection failures and invalid parameters. Wrap streaming code in try/finally to ensure `finalize()` is called:

```python theme={null}
data_id = None
try:
    data_id = streamer.initialize(
        fps=120.0,
        rotations_per_min=15.0,
        chunk_size=240,
    )

    for chunk_idx, frames in enumerate(capture_frames()):
        streamer.push(data_id, chunk_idx, frames)

except Exception as e:
    print(f"Streaming failed: {e}")
finally:
    if data_id:
        streamer.finalize(data_id)
```

***

# TimeseriesStreamer

The `TimeseriesStreamer` class streams scalar instrument data (temperature, pressure, growth rate, etc.) to the Atomscale platform. Like `RHEEDStreamer`, it uses a Rust/PyO3 backend.

<Info>
  [View source on
  GitHub](https://github.com/atomscale-ai/sdk/blob/main/src/atomscale/streaming/rheed_stream.pyi)
</Info>

## Constructor

```python theme={null}
from atomscale.streaming import TimeseriesStreamer

streamer = TimeseriesStreamer(
    api_key="YOUR_API_KEY",
    points_per_chunk=100,
    verbosity=2,
)
```

<ParamField path="api_key" type="str" required>
  API key for authentication.
</ParamField>

<ParamField path="endpoint" type="str | None" default="None">
  Root API endpoint. If not provided, uses the default production endpoint or reads it from the
  `AS_API_ENDPOINT` environment variable.
</ParamField>

<ParamField path="points_per_chunk" type="int" default="100">
  Expected number of data points per chunk. Used for array positioning.
</ParamField>

<ParamField path="verbosity" type="int | None" default="None">
  Logging verbosity level. Set to `4` for detailed progress output.
</ParamField>

***

## initialize()

Start a new timeseries streaming session.

```python theme={null}
data_id = streamer.initialize(
    stream_name="MBE growth - temperature log",
    synth_source_id=42,
    physical_sample="GaN-001",
    project_id="project-uuid",
)
```

### Parameters

<ParamField path="stream_name" type="str | None" default="None">
  Human-readable name for this stream.
</ParamField>

<ParamField path="synth_source_id" type="int | None" default="None">
  Growth instrument ID to link. Must belong to your organization.
</ParamField>

<ParamField path="physical_sample" type="str | None" default="None">
  Name or UUID of a physical sample to associate with the stream. Names are matched
  case-insensitively; if no match is found, a new sample is created.
</ParamField>

<ParamField path="project_id" type="str | None" default="None">
  UUID of the project to associate with this stream.
</ParamField>

### Returns

<ResponseField name="data_id" type="str">
  Unique identifier for this streaming session.
</ResponseField>

***

## push()

Upload a single chunk of data for one channel. Spawns an async upload task and returns immediately.

```python theme={null}
streamer.push(
    data_id,
    chunk_index=0,
    channel_name="temperature",
    timestamps=[1700000000.0, 1700000000.1],
    values=[25.0, 25.1],
    units="C",
)
```

### Parameters

<ParamField path="data_id" type="str" required>
  The data ID returned by `initialize()`.
</ParamField>

<ParamField path="chunk_index" type="int" required>
  Zero-based index of this chunk. Chunks must be pushed in order.
</ParamField>

<ParamField path="channel_name" type="str" required>
  Name of the data channel (e.g., "temperature", "pressure").
</ParamField>

<ParamField path="timestamps" type="list[float]" required>
  Unix epoch timestamps in seconds.
</ParamField>

<ParamField path="values" type="list[float]" required>
  Measured values corresponding to each timestamp.
</ParamField>

<ParamField path="units" type="str | None" default="None">
  Optional units for the values (e.g., "C", "mbar").
</ParamField>

### Returns

None. The chunk is queued for upload.

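`timestamps` are Unix epoch seconds. If your logger records `datetime` objects, converting them with an explicit time zone avoids a subtle pitfall: `datetime.timestamp()` interprets a naive datetime as local time. `to_epoch_seconds` is a hypothetical helper that rejects naive values rather than guessing.

```python theme={null}
from datetime import datetime, timezone


def to_epoch_seconds(ts: datetime) -> float:
    """Convert a timezone-aware datetime to Unix epoch seconds."""
    if ts.tzinfo is None:
        # datetime.timestamp() would silently treat a naive value as local time
        raise ValueError("timestamp must be timezone-aware")
    return ts.timestamp()


# 2023-11-14 22:13:20 UTC is the 1700000000.0 used in the push() example
example = to_epoch_seconds(datetime(2023, 11, 14, 22, 13, 20, tzinfo=timezone.utc))
```

The converted list can then be passed as `timestamps=[to_epoch_seconds(t) for t in sample_times]`, where `sample_times` stands for your own aware datetimes.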
***

## push\_multi()

Upload data for multiple channels in a single call.

```python theme={null}
streamer.push_multi(data_id, chunk_index=0, channels={
    "temperature": {
        "timestamps": [1700000000.0, 1700000000.1],
        "values": [25.0, 25.1],
        "units": "C",
    },
    "pressure": {
        "timestamps": [1700000000.0, 1700000000.1],
        "values": [1.0, 1.1],
        "units": "mbar",
    },
})
```

### Parameters

<ParamField path="data_id" type="str" required>
  The data ID returned by `initialize()`.
</ParamField>

<ParamField path="chunk_index" type="int" required>
  Zero-based index of this chunk.
</ParamField>

<ParamField path="channels" type="dict[str, dict]" required>
  Mapping of channel names to channel data. Each value dict should contain `timestamps` (list of floats), `values` (list of floats), and optionally `units` (string).
</ParamField>

### Returns

None.

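Instrument logs are often recorded row by row (one timestamp with several readings), while `push_multi()` takes one entry per channel. A minimal sketch that pivots such rows into the `channels` mapping described above, assuming each channel keeps its own timestamp list; `build_channels` and its `(timestamp, readings)` input layout are hypothetical.

```python theme={null}
from __future__ import annotations


def build_channels(
    samples: list[tuple[float, dict[str, float]]],
    units: dict[str, str] | None = None,
) -> dict[str, dict]:
    """Pivot (timestamp, readings) rows into the push_multi() channels mapping."""
    units = units or {}
    channels: dict[str, dict] = {}
    for ts, readings in samples:
        for name, value in readings.items():
            # Each channel gets its own timestamps, so sparse channels are fine
            entry = channels.setdefault(name, {"timestamps": [], "values": []})
            entry["timestamps"].append(float(ts))
            entry["values"].append(float(value))
    for name, entry in channels.items():
        if name in units:
            entry["units"] = units[name]  # units stay optional per channel
    return channels
```

The result can be passed as `streamer.push_multi(data_id, chunk_index=0, channels=build_channels(rows, units=channel_units))`, where `rows` and `channel_units` are your own data.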
***

## run()

Stream data from an iterator. Blocks until all uploads complete.

```python theme={null}
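import time


def data_chunks(n_chunks=10, points_per_chunk=100, interval_s=0.1):
    """Yield (timestamps, values) tuples with contiguous, non-overlapping timestamps."""
    start = time.time()
    for i in range(n_chunks):
        offset = i * points_per_chunk  # index of the first point in this chunk
        timestamps = [start + (offset + j) * interval_s for j in range(points_per_chunk)]
        values = [25.0 + (offset + j) * 0.01 for j in range(points_per_chunk)]
        yield (timestamps, values)


streamer.run(
    data_id,
    channel_name="temperature",
    data_iter=data_chunks(),
    units="C",
)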
```

### Parameters

<ParamField path="data_id" type="str" required>
  The data ID returned by `initialize()`.
</ParamField>

<ParamField path="channel_name" type="str" required>
  Name of the data channel.
</ParamField>

<ParamField path="data_iter" type="Iterable[tuple[list[float], list[float]]]" required>
  Iterator yielding `(timestamps, values)` tuples for each chunk.
</ParamField>

<ParamField path="units" type="str | None" default="None">
  Optional units for the values.
</ParamField>

### Returns

None.

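When a channel's history is already buffered (for example, replayed from a log file), it can be split into the `(timestamps, values)` tuples that `data_iter` expects. This is a sketch, assuming it is appropriate to match the chunk length to the constructor's `points_per_chunk`; `chunk_series` is a hypothetical helper, not an SDK function.

```python theme={null}
from __future__ import annotations

from typing import Iterator, Sequence


def chunk_series(
    timestamps: Sequence[float],
    values: Sequence[float],
    points_per_chunk: int = 100,
) -> Iterator[tuple[list[float], list[float]]]:
    """Split one buffered channel into (timestamps, values) tuples for run()."""
    if len(timestamps) != len(values):
        raise ValueError("timestamps and values must have the same length")
    for start in range(0, len(timestamps), points_per_chunk):
        stop = start + points_per_chunk
        # The last chunk may be shorter than points_per_chunk
        yield list(timestamps[start:stop]), list(values[start:stop])
```

It plugs in as `streamer.run(data_id, channel_name="temperature", data_iter=chunk_series(ts, vs), units="C")`, with `ts` and `vs` standing for your buffered lists.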
***

## finalize()

Complete the streaming session and signal the server to begin processing.

```python theme={null}
streamer.finalize(data_id)
```

### Parameters

<ParamField path="data_id" type="str" required>
  The data ID returned by `initialize()`.
</ParamField>

### Returns

None.

<Warning>
  Always call `finalize()` even if the upload fails part-way. This signals the server to clean up
  resources and mark the stream appropriately.
</Warning>
