Quick Start

Get IonStream running locally in under a minute. You'll need the .NET 10 SDK installed.

ℹ️

Early Access: IonStream is in active development. Core messaging, Raft consensus, and the .NET client are production-ready. Some advanced features (mTLS, SASL, admin CLI) are planned but not yet available.

Prerequisites

  • .NET 10 SDK — for building from source
  • Docker Desktop — required only for Linux AOT builds
  • No other external dependencies — no ZooKeeper, no Kafka, nothing else

Build & Run

1. Clone the repository

bash
git clone https://github.com/ElectricHavoc/IonStream.git
cd IonStream

2. Build the solution

bash
dotnet build

3. Start a development server (HTTP, no TLS)

Set IONSTREAM_USE_HTTP=true for local development to skip TLS certificate setup.

bash
IONSTREAM_USE_HTTP=true \
NODE_ID=node1 \
dotnet run --project src/server/IonStream.Server

On Windows:

powershell
$env:IONSTREAM_USE_HTTP = "true"
$env:NODE_ID = "node1"
dotnet run --project src/server/IonStream.Server
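
Before connecting a client from a script or CI job, it can help to wait until the server is actually accepting connections. The following is a minimal sketch in Python, not part of IonStream: `wait_for_port` is a hypothetical helper, and port 9000 is an assumption taken from the client examples in this guide (the development command above does not set a port explicitly).

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.25) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False


# Example call (assumes the development server listens on localhost:9000):
#   if not wait_for_port("localhost", 9000):
#       raise SystemExit("server did not come up in time")
```

This only checks that the TCP port is open; it does not verify that the broker has finished its own startup work.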

4. Connect a .NET client

C#
using IonStream.Abstractions;
using IonStream.Client;
using System.Text;

await using var client = new IonStreamClient("localhost", 9000);

// Produce a message
await client.SendAsync(new ProducerMessage(
    "my-topic",
    Encoding.UTF8.GetBytes("Hello IonStream!"),
    Encoding.UTF8.GetBytes("key-1")
));

// Consume with IAsyncEnumerable
await using var consumer = new Consumer(client);
await consumer.SubscribeAsync("my-topic", "my-group");

await foreach (var msg in consumer.ConsumeAsync())
{
    Console.WriteLine($"offset {msg.Offset}: {Encoding.UTF8.GetString(msg.Value.Span)}");
}

5. Run the test suite

powershell
.\run-unit-tests.ps1
# Coverage summary written to tests/TestResults/summary.txt

Native AOT Binaries

IonStream.Server compiles to a native AOT binary — no .NET runtime required on the target machine. Use build-aot.ps1 to produce win-x64 and linux-x64 artifacts.

powershell
cd src/server
.\build-aot.ps1

# Skip targets as needed:
.\build-aot.ps1 -SkipLinux    # Windows only
.\build-aot.ps1 -SkipWindows  # Linux only (requires Docker Desktop)

Output lands in src/server/dist/{rid}/. The Linux binary builds inside a mcr.microsoft.com/dotnet/sdk:10.0 Docker container — Docker Desktop must be running for the Linux build.

💡

AOT binaries start quickly and use little memory, which suits container deployments where warm-up time and image size matter.

Cluster Setup

Single Node (Development)

The simplest deployment — one broker, no replication, no TLS.

bash
NODE_ID=node1
IONSTREAM_USE_HTTP=true
ASPNETCORE_URLS=http://0.0.0.0:9000

Three-Node Cluster (Static)

For a production-grade cluster, use SERVER_PEERS to define the full peer list. All nodes must be started with the same peer configuration.

bash
# Node 1 (start on host-1)
NODE_ID=node1
ASPNETCORE_URLS=https://0.0.0.0:9000
SERVER_PEERS=node1=https://host-1:9000,node2=https://host-2:9000,node3=https://host-3:9000
IONSTREAM_CLUSTER_SECRET=my-production-secret

# Node 2 (start on host-2)
NODE_ID=node2
ASPNETCORE_URLS=https://0.0.0.0:9000
SERVER_PEERS=node1=https://host-1:9000,node2=https://host-2:9000,node3=https://host-3:9000
IONSTREAM_CLUSTER_SECRET=my-production-secret

# Node 3 (start on host-3)
NODE_ID=node3
ASPNETCORE_URLS=https://0.0.0.0:9000
SERVER_PEERS=node1=https://host-1:9000,node2=https://host-2:9000,node3=https://host-3:9000
IONSTREAM_CLUSTER_SECRET=my-production-secret
ℹ️

All nodes in the same cluster must share the same IONSTREAM_CLUSTER_SECRET. This is used to deterministically derive the shared cluster CA for mutual TLS.
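
Because every node needs an identical SERVER_PEERS value and the same secret, generating the per-node settings from one source can prevent copy-paste drift. The sketch below is a hypothetical Python helper (`static_cluster_env` is not part of IonStream) that reproduces the three-node configuration shown above; the host names and port come from that example.

```python
def static_cluster_env(hosts: dict, secret: str, port: int = 9000) -> dict:
    """Build per-node environment variables for a static cluster.

    hosts maps node ID -> hostname. Every node receives the same SERVER_PEERS
    and IONSTREAM_CLUSTER_SECRET; only NODE_ID differs.
    """
    peers = ",".join(f"{node_id}=https://{host}:{port}" for node_id, host in hosts.items())
    return {
        node_id: {
            "NODE_ID": node_id,
            "ASPNETCORE_URLS": f"https://0.0.0.0:{port}",
            "SERVER_PEERS": peers,
            "IONSTREAM_CLUSTER_SECRET": secret,
        }
        for node_id in hosts
    }


envs = static_cluster_env(
    {"node1": "host-1", "node2": "host-2", "node3": "host-3"},
    secret="my-production-secret",
)
for node_id, env in envs.items():
    print(f"# {node_id}")
    for name, value in env.items():
        print(f"{name}={value}")
```

The printed lines can be pasted into each host's environment or fed to whatever deployment tooling is in use.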

Dynamic Expansion (No Restart)

Add a new node to a running cluster without any downtime using PROXY_MANAGERS. Point the new node at any existing broker URI.

bash
# New node 4 — joins the running cluster at node1/node2
NODE_ID=node4
ASPNETCORE_URLS=https://0.0.0.0:9000
PROXY_MANAGERS=https://host-1:9000,https://host-2:9000
IONSTREAM_CLUSTER_SECRET=my-production-secret
  • Odd total node count — new node is automatically promoted into the Raft quorum
  • Even total node count — new node joins as a read-only mirror until another node is added
  • PROXY_MANAGERS and SERVER_PEERS are mutually exclusive — use one or the other
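
The odd/even rule above fits how Raft majorities work: a fourth voter raises the majority from 2 to 3 without tolerating any more failures than three voters do. The following Python sketch models the rules as listed. It is not IonStream code, the function names are hypothetical, and it assumes that "total node count" includes the node being added.

```python
def raft_majority(voters: int) -> int:
    """Smallest number of voters that forms a strict majority."""
    return voters // 2 + 1


def role_after_join(total_nodes: int) -> str:
    """Role of the newly added node, following the odd/even rule listed above.

    total_nodes is assumed to count the new node as well.
    """
    if total_nodes < 1:
        raise ValueError("total_nodes must be at least 1")
    return "voter" if total_nodes % 2 == 1 else "mirror"


def check_join_config(env: dict) -> None:
    """Reject configurations that set both PROXY_MANAGERS and SERVER_PEERS."""
    if env.get("PROXY_MANAGERS") and env.get("SERVER_PEERS"):
        raise ValueError("PROXY_MANAGERS and SERVER_PEERS are mutually exclusive")


for total in (3, 4, 5):
    print(total, role_after_join(total), "majority:", raft_majority(total))
```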

Cross-Cluster Replication

Mirror topics from a remote cluster using numbered REPLICA_N_* environment variables. Multiple remote targets are supported by incrementing N.

bash
# Required: name this cluster to prevent replication loops
CLUSTER_NAME=us-east

# Replicate from a remote EU cluster
REPLICA_0_SERVERS=https://eu-1:9000,https://eu-2:9000
REPLICA_0_TOPICS=events,transactions,audit
REPLICA_0_CLUSTER_SECRET=eu-cluster-secret

# Replicate a second remote cluster
REPLICA_1_SERVERS=https://ap-1:9000
REPLICA_1_TOPICS=events
REPLICA_1_CLUSTER_SECRET=ap-cluster-secret
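
To make the numbered-variable convention concrete, here is a rough Python sketch of how REPLICA_N_* variables could be grouped by N. It illustrates the naming scheme only and is not how the server itself parses them; `ReplicaTarget` and `parse_replica_targets` are hypothetical names, and only the three variables used in the example above are handled.

```python
import re
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ReplicaTarget:
    index: int
    servers: list = field(default_factory=list)
    topics: list = field(default_factory=list)
    cluster_secret: Optional[str] = None


_REPLICA_VAR = re.compile(r"^REPLICA_(\d+)_(SERVERS|TOPICS|CLUSTER_SECRET)$")


def parse_replica_targets(env: dict) -> list:
    """Group REPLICA_N_* variables by N and return the targets sorted by N."""
    targets = {}
    for name, value in env.items():
        match = _REPLICA_VAR.match(name)
        if not match:
            continue  # unrelated variable such as CLUSTER_NAME
        index, suffix = int(match.group(1)), match.group(2)
        target = targets.setdefault(index, ReplicaTarget(index))
        if suffix == "CLUSTER_SECRET":
            target.cluster_secret = value
        else:
            # SERVERS and TOPICS are comma-separated lists
            items = [item.strip() for item in value.split(",") if item.strip()]
            setattr(target, suffix.lower(), items)
    for target in targets.values():
        # SERVERS and TOPICS are marked "Required" in the variable reference
        if not target.servers or not target.topics:
            raise ValueError(f"REPLICA_{target.index}_SERVERS and _TOPICS are both required")
    return [targets[i] for i in sorted(targets)]
```

Passing `dict(os.environ)` to `parse_replica_targets` would give one `ReplicaTarget` per remote cluster, which is handy for validating a deployment's environment before starting the server.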

TLS & Security

TLS is enabled by default. IonStream auto-generates a cluster CA from IONSTREAM_CLUSTER_SECRET and issues per-node certificates on first run.

Default (Auto-Generated Certs)

Set the same IONSTREAM_CLUSTER_SECRET on every node. Certificates are generated automatically and persisted to <base>/certs/server.pfx.

bash
IONSTREAM_CLUSTER_SECRET=your-shared-cluster-secret-here

Custom Certificates (PKCS#12)

Supply your own CA and node certificates as PKCS#12 (.pfx) files. The CA cert is used to sign and validate inter-node connections.

bash
IONSTREAM_TLS_CA_CERT=/etc/ionstream/ca.pfx
IONSTREAM_TLS_CA_CERT_PASSWORD=ca-cert-password

IONSTREAM_TLS_CERT=/etc/ionstream/node.pfx
IONSTREAM_TLS_CERT_PASSWORD=node-cert-password

Development Mode (No TLS)

⚠️

Development only. IONSTREAM_USE_HTTP=true disables TLS entirely. Never use in production.

bash
IONSTREAM_USE_HTTP=true
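
A deployment script can guard against shipping development settings by accident. The sketch below is a hypothetical pre-flight check in Python, not an IonStream feature. It assumes the flag is compared case-insensitively against "true", and it treats the default secret listed in the variable reference as unsafe on the reasoning that a CA derived from a published default offers little protection.

```python
DEFAULT_SECRET = "IonStream-Default-Cluster-v1"  # default listed in the variable reference


def preflight_warnings(env: dict) -> list:
    """Return human-readable warnings for settings that are unsafe outside development."""
    warnings = []
    use_http = env.get("IONSTREAM_USE_HTTP", "false").strip().lower() == "true"
    urls = env.get("ASPNETCORE_URLS", "")
    if use_http:
        warnings.append("IONSTREAM_USE_HTTP=true disables TLS; development only")
        if urls.startswith("https://"):
            warnings.append("ASPNETCORE_URLS uses https:// but TLS is disabled")
    else:
        secret = env.get("IONSTREAM_CLUSTER_SECRET", DEFAULT_SECRET)
        custom_ca = env.get("IONSTREAM_TLS_CA_CERT")
        # A custom CA overrides the secret-derived CA, so the secret matters less then
        if secret == DEFAULT_SECRET and not custom_ca:
            warnings.append("IONSTREAM_CLUSTER_SECRET is unset or left at its default value")
    return warnings


for warning in preflight_warnings({"IONSTREAM_USE_HTTP": "true"}):
    print("WARNING:", warning)
```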

Environment Variables

IonStream is configured entirely through environment variables — no config files required.

Server Variables

| Variable | Default | Description |
| --- | --- | --- |
| NODE_ID | node1 | Node identity. Can be a plain ID or a full URI (https://user@host:8080) to set advertised address simultaneously. |
| ASPNETCORE_URLS | | Standard ASP.NET Core URL binding, e.g. https://0.0.0.0:9000. |
| SERVER_PEERS | none | Comma/semicolon-separated peer list for static cluster formation. Format: id=uri or bare URI. Mutually exclusive with PROXY_MANAGERS. |
| PROXY_MANAGERS | none | Comma-separated URIs of existing cluster nodes for dynamic join. Node starts in mirror mode and self-registers without cluster restart. |
| LOG_DIR | <base>/data/<nodeId> | Directory for WAL segment and index files. |
| IONSTREAM_USE_HTTP | false | Disable TLS and use plain HTTP/2. Development only. |
| IONSTREAM_CLUSTER_SECRET | IonStream-Default-Cluster-v1 | Shared secret for deterministic CA derivation. All cluster nodes must use the same value. |
| IONSTREAM_TLS_CA_CERT | auto-generated | Path to PKCS#12 CA certificate. When set, overrides secret-derived CA. |
| IONSTREAM_TLS_CA_CERT_PASSWORD | password | Password for the CA certificate file. |
| IONSTREAM_TLS_CERT | <base>/certs/server.pfx | Path to the node's PKCS#12 server certificate. Auto-generated if absent. |
| IONSTREAM_TLS_CERT_PASSWORD | password | Password for the server certificate file. |
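
The SERVER_PEERS format (comma- or semicolon-separated entries, each either id=uri or a bare URI) can be illustrated with a small parser. This Python sketch is only a reading of the description above, not the server's parser; `parse_server_peers` is a hypothetical name, and how the server assigns IDs to bare URIs is not stated here, so the sketch returns None for them.

```python
from typing import Optional


def parse_server_peers(value: str) -> list:
    """Split a SERVER_PEERS value into (id, uri) pairs.

    Entries are separated by commas or semicolons. An entry is either
    `id=uri` or a bare URI, in which case the id is returned as None.
    """
    peers = []
    for entry in value.replace(";", ",").split(","):
        entry = entry.strip()
        if not entry:
            continue
        head, sep, tail = entry.partition("=")
        # Treat "=" as the id separator only when it appears before the URI scheme
        if sep and "://" not in head:
            peer_id: Optional[str] = head.strip()
            peers.append((peer_id, tail.strip()))
        else:
            peers.append((None, entry))
    return peers


print(parse_server_peers("node1=https://host-1:9000;https://host-2:9000"))
```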

Cross-Cluster Replication Variables

Replace N with 0, 1, 2, … for each remote target.

| Variable | Default | Description |
| --- | --- | --- |
| CLUSTER_NAME | "" | Logical name for this cluster. Required for replication loop prevention. Stamped on every produced message. |
| REPLICA_N_SERVERS | | Comma-separated broker addresses for this remote cluster. Required. |
| REPLICA_N_TOPICS | | Comma-separated topic names to mirror from this remote cluster. Required. |
| REPLICA_N_USE_HTTP | false | Use plain HTTP/2 to connect to this remote cluster (no TLS). |
| REPLICA_N_CLUSTER_SECRET | same as local | TLS CA derivation secret for this remote cluster. |
| REPLICA_N_TLS_CA_CERT | "" | Path to a PKCS#12 CA cert for this remote cluster. Takes precedence over _CLUSTER_SECRET. |

Client Variables

| Variable | Default | Description |
| --- | --- | --- |
| PRODUCER_BUFFER_SIZE | 1 | Capacity of the ChannelWriter returned by CreateProducerChannel(). Increase for higher-throughput buffered production. |
| CONSUMER_BUFFER_SIZE | 1 | Capacity of the ChannelReader returned by CreateConsumerChannel(). Increase to buffer more messages ahead of consumption. |
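
The effect of a buffer capacity can be seen with any bounded queue. The Python sketch below uses `asyncio.Queue` as a stand-in for a bounded channel; it is an analogy, not IonStream code, and it assumes the client's channels are bounded so that a full buffer makes the writer wait. With a capacity of 1 the producer can never be more than one message ahead of the consumer.

```python
import asyncio


async def run_buffered(buffer_size: int, messages: int = 5) -> list:
    """Record the order of produce/consume events for a bounded buffer."""
    queue = asyncio.Queue(maxsize=buffer_size)
    events = []

    async def producer() -> None:
        for i in range(messages):
            await queue.put(i)  # suspends while the buffer is full
            events.append(f"put {i}")

    async def consumer() -> None:
        for _ in range(messages):
            item = await queue.get()  # suspends while the buffer is empty
            events.append(f"got {item}")

    await asyncio.gather(producer(), consumer())
    return events


def max_in_flight(events: list) -> int:
    """Largest number of messages that were produced but not yet consumed."""
    in_flight = peak = 0
    for event in events:
        in_flight += 1 if event.startswith("put") else -1
        peak = max(peak, in_flight)
    return peak


for size in (1, 3):
    events = asyncio.run(run_buffered(size))
    print(f"buffer={size}: peak in-flight={max_in_flight(events)}")
```

A larger buffer lets the producer run further ahead, which tends to help throughput at the cost of more messages held in memory.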

.NET / C# Client

The .NET client is the primary SDK, built natively on System.Threading.Channels and ReadOnlyMemory<byte> for zero-copy, high-throughput messaging.

Basic Produce & Consume

C#
using IonStream.Abstractions;
using IonStream.Client;
using System.Text;

await using var client = new IonStreamClient("localhost", 9000);

// Produce
var msg = Encoding.UTF8.GetBytes("Hello IonStream!");
var key = Encoding.UTF8.GetBytes("key-1");
await client.SendAsync(new ProducerMessage("my-topic", msg, key));

// Consume with IAsyncEnumerable
await using var consumer = new Consumer(client);
await consumer.SubscribeAsync("my-topic", "my-group");

await foreach (var received in consumer.ConsumeAsync())
{
    var text = Encoding.UTF8.GetString(received.Value.Span);
    Console.WriteLine($"[offset {received.Offset}] {text}");
}

Channels API (Buffered)

C#
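using IonStream.Abstractions;
using IonStream.Client;
using System.Text;
using System.Threading.Channels;

await using var client = new IonStreamClient("localhost", 9000);

// Example payload and key (any byte arrays work)
var payload = Encoding.UTF8.GetBytes("event-1");
var key = Encoding.UTF8.GetBytes("key-1");

// Buffered producer channel
ChannelWriter<ProducerMessage> producer = client.CreateProducerChannel();
await producer.WriteAsync(new ProducerMessage("events", payload, key));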

// Buffered consumer channel
ChannelReader<StreamMessage> reader =
    client.CreateConsumerChannel("events", "my-group");

await foreach (var msg in reader.ReadAllAsync())
{
    // process msg...
}

Retry & DLQ (MessageHandle API)

Create a topic with retry stages and a DLQ, then consume using the ack/nack handle API. Failed messages automatically route through staged retries and into the DLQ.

C#
await using var client = new IonStreamClient("localhost", 9000);

// Create topic with 3-stage retry (1s, 5s, 15s) and DLQ enabled
await client.CreateTopicAsync("orders", partitions: 1,
    retryTimingsMs: [1000, 5000, 15000], dlqEnabled: true);

// Consume with explicit ack/nack
await using var consumer = new Consumer(client);
await consumer.SubscribeAsync("orders", "order-svc");

await consumer.RunAsync(async handle =>
{
    try
    {
        await ProcessOrderAsync(handle.Message); // your application's handler (not part of the SDK)
        await handle.AckAsync();   // advances the committed offset
    }
    catch
    {
        await handle.NackAsync(); // routes to retry queue → DLQ on exhaustion
    }
});

// Inspect and replay DLQ messages via the client SDK
DlqListResult dlq = await client.ListDlqAsync("orders", "order-svc");
foreach (var entry in dlq.Messages)
    Console.WriteLine($"DLQ offset {entry.DlqOffset}: {entry.OriginalTopic} stage {entry.RetryStage}");

await client.ReplayDlqAsync("orders", "order-svc"); // replay all
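
The staged-retry flow described above can be modelled as a small routing function. This Python sketch is a simplified mental model, not the broker's implementation: `Route` and `route_on_nack` are hypothetical names, stages are numbered from 0 here, and the behaviour when retries run out with the DLQ disabled is not covered by this guide, so the sketch returns None in that case.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Route:
    destination: str          # "retry" or "dlq"
    stage: Optional[int]      # 0-based retry stage, None for the DLQ
    delay_ms: Optional[int]   # delay before redelivery, None for the DLQ


def route_on_nack(failed_attempts: int, retry_timings_ms: list, dlq_enabled: bool) -> Optional[Route]:
    """Decide where a message goes after its Nth failed attempt (N >= 1)."""
    if failed_attempts < 1:
        raise ValueError("failed_attempts must be >= 1")
    stage = failed_attempts - 1
    if stage < len(retry_timings_ms):
        # Still within the configured retry stages: redeliver after that stage's delay
        return Route("retry", stage, retry_timings_ms[stage])
    # Retries exhausted: dead-letter the message if a DLQ is enabled
    return Route("dlq", None, None) if dlq_enabled else None


timings = [1000, 5000, 15000]  # same stages as the "orders" topic above
for attempt in range(1, 5):
    print(attempt, route_on_nack(attempt, timings, dlq_enabled=True))
```

With the three stages configured for "orders", the first three nacks schedule retries after 1 s, 5 s, and 15 s, and the fourth sends the message to the DLQ.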

Go Client

The Go client uses the REST /api/v1 endpoint exposed by IonStream.Broker and covers the full REST API.

go
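package main

import (
    "log"

    ionstream "github.com/ElectricHavoc/ionstream/clients/go"
)

func main() {
    client := ionstream.NewClient(ionstream.Options{
        SeedNodes: []ionstream.SeedNode{
            {Host: "localhost", Port: 8080},
        },
        UseHTTP: true, // plain HTTP, development only
    })

    // Fetch cluster metadata and check the error before continuing.
    metadata, err := client.GetMetadata(false)
    if err != nil {
        log.Fatalf("get metadata: %v", err)
    }
    log.Printf("metadata: %+v", metadata)

    // Produce a single message to the "demo" topic.
    err = client.Produce(ionstream.ProduceMessage{
        Topic: "demo",
        Value: []byte("hello"),
    })
    if err != nil {
        log.Fatalf("produce: %v", err)
    }
}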
💡

The Go client requires Docker to build because Go is not assumed to be installed locally. Use build_go.ps1, which runs the build inside a Docker container.

TypeScript / JavaScript Client

A full gRPC client for Node.js built on @grpc/proto-loader. It also works in browsers via the REST API or gRPC-Web.

TypeScript
import { IonStreamClient } from "ionstream-typescript-client";

const client = new IonStreamClient({
  seedNodes: [{ host: "localhost", port: 8080 }],
  useHttp: true,
});

const metadata = await client.getMetadata();
await client.produce({ topic: "demo", value: Buffer.from("hello") });

const response = await client.consume({
  topic: "demo",
  partition: 0,
  offset: 0,
});
JavaScript
const { IonStreamJsClient } = require("./ionstream-client");

async function main() {
  const client = new IonStreamJsClient({
    seedNodes: [{ host: "localhost", port: 8080 }],
    useHttp: true,
  });

  const metadata = await client.getMetadata();
  await client.produce({ topic: "demo", value: Buffer.from("hello") });
}

main().catch((err) => { console.error(err); process.exitCode = 1; });

Python Client

Async Python client using gRPC. Foundation-level coverage for produce, consume, and metadata operations.

Python
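import asyncio

from ionstream_client import IonStreamClient, ProduceMessage, SeedNode


async def main() -> None:
    client = IonStreamClient(
        [SeedNode(host="localhost", port=8080)],
        use_http=True,  # plain HTTP, development only
    )

    # Fetch cluster metadata
    metadata = await client.get_metadata()
    print(metadata)

    # Produce a single message to the "demo" topic
    await client.produce(ProduceMessage(
        topic="demo",
        value=b"hello",
    ))


# The client is async, so its calls must run inside an event loop
asyncio.run(main())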

Rust Client

Async Rust client using Tokio and gRPC. Foundation-level coverage.

Rust
use ionstream_rust_client::{ClientOptions, IonStreamClient, ProduceMessage, SeedNode};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let options = ClientOptions::new(vec![SeedNode::new("localhost", 8080)])
        .with_http(true);

    let client = IonStreamClient::connect(options).await?;

    client.get_metadata(false).await?;
    client.produce(ProduceMessage::new("demo", b"hello".to_vec())).await?;
    Ok(())
}

Java Client

Java client with unary gRPC coverage for metadata, produce, and consume operations.

Java
import com.ionstream.client.IonStreamClient;
import java.nio.charset.StandardCharsets;
import java.util.List;

public final class Example {
    public static void main(String[] args) {
        var options = IonStreamClient.ClientOptions.defaults(
            List.of(new IonStreamClient.SeedNode("localhost", 8080))
        );

        try (var client = new IonStreamClient(options)) {
            client.getMetadata(false);
            client.produce(IonStreamClient.ProduceMessage.create(
                "demo",
                "hello".getBytes(StandardCharsets.UTF_8)
            ));
        }
    }
}

Prometheus & Grafana

IonStream exports OpenTelemetry metrics via a Prometheus endpoint. Pre-built Grafana dashboards are included in the repository.

Docker Compose Observability Stack

A full observability stack with Prometheus, Grafana, cAdvisor, and node-exporter is included:

bash
docker compose up -d

What's included:

Metric Categories

Extensibility

IonStream exposes plugin interfaces for intercepting messages at every layer:

C#
using System.Diagnostics;
using Microsoft.Extensions.Logging;

// Producer-side interception
public class TracingProducerInterceptor : IProducerInterceptor
{
    public async ValueTask OnProduceAsync(ProducerMessage msg, ProducerDelegate next)
    {
        // Add tracing headers, validate, transform...
        msg.Headers["trace-id"] = Activity.Current?.Id ?? "";
        await next(msg);
    }
}

// Consumer-side interception (the logger is supplied through the constructor)
public class LoggingConsumerInterceptor(ILogger<LoggingConsumerInterceptor> logger) : IConsumerInterceptor
{
    public async ValueTask OnConsumeAsync(StreamMessage msg, ConsumerDelegate next)
    {
        logger.LogInformation("consuming offset {Offset}", msg.Offset);
        await next(msg);
    }
}

Register interceptors on the client builder. IBrokerInterceptor provides server-side message interception for cross-cutting concerns like audit logging or schema validation.
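
The interceptor shape used above, where each interceptor receives the message plus a `next` delegate, is the familiar middleware-chain pattern. The following Python sketch shows the pattern in isolation. It is not IonStream code: all names are hypothetical, messages are plain dicts, and the ordering (first registered runs outermost) is an assumption, since this guide does not say how IonStream orders interceptors.

```python
import asyncio
from typing import Awaitable, Callable

Message = dict  # stand-in for a message: {"topic": ..., "headers": {...}}
Next = Callable[[Message], Awaitable[None]]
Interceptor = Callable[[Message, Next], Awaitable[None]]


def build_pipeline(interceptors: list, terminal: Next) -> Next:
    """Compose interceptors so the first one registered runs outermost."""
    pipeline = terminal
    for interceptor in reversed(interceptors):
        # Default arguments bind the current interceptor and the rest of the chain
        def step(msg: Message, _i: Interceptor = interceptor, _n: Next = pipeline) -> Awaitable[None]:
            return _i(msg, _n)
        pipeline = step
    return pipeline


async def add_trace_id(msg: Message, next_: Next) -> None:
    msg["headers"]["trace-id"] = "trace-123"  # hypothetical fixed ID
    await next_(msg)


async def drop_empty_topic(msg: Message, next_: Next) -> None:
    if msg["topic"]:  # not calling next_ short-circuits the pipeline
        await next_(msg)


sent = []


async def send(msg: Message) -> None:
    sent.append(msg)  # stands in for the real network send


async def main() -> None:
    pipeline = build_pipeline([add_trace_id, drop_empty_topic], send)
    await pipeline({"topic": "demo", "headers": {}})
    await pipeline({"topic": "", "headers": {}})  # dropped by the second interceptor


asyncio.run(main())
print(sent)
```

An interceptor can transform the message before calling `next`, run code after it returns, or skip the call entirely to stop processing, which is what makes the pattern useful for validation and audit logging.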