> ## Documentation Index
> Fetch the complete documentation index at: https://docs.atomscale.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Stream Metrology Data

> Push instrument readings for real-time analysis

The `TimeseriesStreamer` class lets you push scalar instrument data (temperature, pressure, growth rate, etc.) to Atomscale for live tracking and analysis. Like `RHEEDStreamer`, it uses a Rust/PyO3 backend for efficient uploads.

## Prerequisites

* Instrument readings as lists of timestamps and values
* Timestamps as Unix epoch seconds (float)

## Create a streamer

```python theme={null}
from atomscale.streaming import TimeseriesStreamer

streamer = TimeseriesStreamer(
    api_key="YOUR_API_KEY",
    points_per_chunk=100,
)
```

Set `verbosity=4` for detailed progress output.

## Initialize a stream

```python theme={null}
data_id = streamer.initialize(
    stream_name="MBE growth - temperature log",
    physical_sample="GaN-001",
    synth_source_id=42,       # from client.list_growth_instruments()
    project_id="project-uuid",
)
```

The `synth_source_id` links the stream to a registered growth instrument. Use `client.list_growth_instruments()` to find available instruments.

## Push single-channel data

Use `push()` to send one channel at a time. Chunks are uploaded asynchronously.

```python theme={null}
import time

timestamps = [time.time() + i * 0.1 for i in range(100)]
values = [25.0 + i * 0.01 for i in range(100)]

streamer.push(
    data_id,
    chunk_index=0,
    channel_name="temperature",
    timestamps=timestamps,
    values=values,
    units="C",
)
```

## Push multi-channel data

Use `push_multi()` to send multiple channels in a single call:

```python theme={null}
streamer.push_multi(data_id, chunk_index=0, channels={
    "temperature": {
        "timestamps": timestamps,
        "values": temp_values,
        "units": "C",
    },
    "pressure": {
        "timestamps": timestamps,
        "values": pressure_values,
        "units": "mbar",
    },
})
```

## Generator mode

Use `run()` when data is already buffered. It iterates over chunks and handles uploads:

```python theme={null}
def data_chunks(timestamps, values, chunk_size=100):
    for start in range(0, len(timestamps), chunk_size):
        yield (
            timestamps[start : start + chunk_size],
            values[start : start + chunk_size],
        )

data_id = streamer.initialize(stream_name="Pyrometer log")
streamer.run(
    data_id,
    channel_name="temperature",
    data_iter=data_chunks(all_timestamps, all_values),
    units="C",
)
streamer.finalize(data_id)
```

## Finalize

Always call `finalize()` when the stream is complete:

```python theme={null}
streamer.finalize(data_id)
```

<Warning>
  Always call `finalize()` even if the upload fails part-way. This signals the server to clean up
  resources and mark the stream appropriately.
</Warning>

## Best practices

| Recommendation                             | Rationale                                          |
| ------------------------------------------ | -------------------------------------------------- |
| Always call `finalize()`                   | Allows graceful cleanup even on failure            |
| Use `push_multi()` for correlated channels | Keeps channels in sync within each chunk           |
| Link to growth instruments                 | Enables cross-run analysis and instrument tracking |
| Use Unix epoch timestamps                  | The platform expects seconds since epoch           |

## Next steps

<CardGroup cols={2}>
  <Card title="Poll Time Series" icon="clock" href="/sdk/poll-timeseries">
    Monitor streaming analysis in real time.
  </Card>

  <Card title="Streaming Reference" icon="code" href="/sdk/reference/streaming">
    Full streaming API documentation.
  </Card>
</CardGroup>
