Streaming LLM responses in Rust looks simple until you hit the async runtime footguns. A naive implementation that reads the full response before forwarding blocks the Tokio executor thread. An implementation that buffers incorrectly introduces latency spikes that defeat the purpose of streaming. Here is the pattern that works correctly under concurrency.
Analysis Briefing
- Topic: Non-blocking Grok API streaming implementation in async Rust with Tokio
- Analyst: Mike D (@MrComputerScience)
- Context: A technical briefing developed with Claude Sonnet 4.6
- Source: Pithy Cyborg | Pithy Security
- Key Question: Why does the obvious Rust streaming implementation block Tokio, and what does the correct one look like?
The Footgun: Blocking Inside an Async Context
The most common mistake is calling a blocking HTTP operation inside a Tokio task. Tokio runs on a fixed-size thread pool. If a task blocks one of those threads waiting for network I/O, every other task waiting to be scheduled on that thread stalls.
```rust
// WRONG: this blocks the Tokio worker thread
#[tokio::main]
async fn main() {
    // reqwest's blocking client inside an async context
    let client = reqwest::blocking::Client::new();
    let response = client
        .post("https://api.x.ai/v1/chat/completions")
        .json(&body)
        .send() // blocks the worker thread until the full response arrives
        .unwrap();
    // ...
}
```
The fix is simple: always use `reqwest::Client` (the async client), never `reqwest::blocking::Client`, inside async code. But the deeper footgun is what happens when you process the streaming response body.
The Correct Streaming Pattern
Grok’s streaming API sends Server-Sent Events (SSE): lines starting with `data: `, each carrying a JSON object per token, terminated by `data: [DONE]`.
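For orientation, a representative slice of that wire format (illustrative, not verbatim; the field names follow the OpenAI-compatible chat-completions shape, and exact fields may vary) looks like:

```
data: {"choices":[{"index":0,"delta":{"content":"Hel"}}]}

data: {"choices":[{"index":0,"delta":{"content":"lo"}}]}

data: [DONE]
```

Each event is separated by a blank line, which is why the parser has to skip empty lines rather than treat them as errors.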
The correct approach reads the response body as a stream of bytes, processes it line by line, and yields each parsed token without buffering the full response:
```rust
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use futures_util::StreamExt;
use tokio::sync::mpsc;

#[derive(Debug, Serialize)]
struct ChatRequest {
    model: String,
    messages: Vec<Message>,
    stream: bool,
    max_tokens: u32,
}

#[derive(Debug, Serialize, Deserialize)]
struct Message {
    role: String,
    content: String,
}

pub async fn stream_grok(
    prompt: &str,
    tx: mpsc::Sender<String>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let client = Client::new();
    let api_key = std::env::var("XAI_API_KEY")?;

    let request_body = ChatRequest {
        model: "grok-4".to_string(),
        messages: vec![Message {
            role: "user".to_string(),
            content: prompt.to_string(),
        }],
        stream: true,
        max_tokens: 1024,
    };

    let response = client
        .post("https://api.x.ai/v1/chat/completions")
        .bearer_auth(&api_key)
        .json(&request_body)
        .send()
        .await?;

    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await?;
        return Err(format!("API error {}: {}", status, body).into());
    }

    // Stream the response body as bytes. Buffer raw bytes rather than &str:
    // a chunk boundary can fall in the middle of a multi-byte UTF-8
    // character, and converting each chunk individually would fail there.
    let mut byte_stream = response.bytes_stream();
    let mut buffer: Vec<u8> = Vec::new();

    while let Some(chunk) = byte_stream.next().await {
        buffer.extend_from_slice(&chunk?);

        // Process every complete line currently in the buffer
        while let Some(newline_pos) = buffer.iter().position(|&b| b == b'\n') {
            let line_bytes: Vec<u8> = buffer.drain(..=newline_pos).collect();
            let line = String::from_utf8_lossy(&line_bytes).trim().to_string();

            if line.is_empty() || line.starts_with(':') {
                continue; // SSE keepalive comment or blank separator line
            }
            if line == "data: [DONE]" {
                return Ok(());
            }
            if let Some(data) = line.strip_prefix("data: ") {
                if let Ok(json) = serde_json::from_str::<Value>(data) {
                    if let Some(content) = json
                        .get("choices")
                        .and_then(|c| c.get(0))
                        .and_then(|c| c.get("delta"))
                        .and_then(|d| d.get("content"))
                        .and_then(|c| c.as_str())
                    {
                        if !content.is_empty() {
                            // Forward to the consumer; awaits only when the
                            // channel buffer is full (backpressure)
                            if tx.send(content.to_string()).await.is_err() {
                                return Ok(()); // receiver dropped, exit cleanly
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
```
Three properties make this implementation correct: `bytes_stream()` returns a `Stream` that yields chunks as they arrive over the network without blocking; the buffer reassembles SSE lines that are split across chunk boundaries; and the `mpsc::Sender` lets the consumer process tokens while the producer keeps reading, rather than waiting for each token to be handled.
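The chunk-boundary handling is worth isolating and exercising on its own. A std-only sketch (the `LineBuffer` name is illustrative, not a library type): bytes accumulate until a `\n` arrives, so an SSE line split across two network chunks is reassembled before parsing.

```rust
struct LineBuffer {
    buf: Vec<u8>,
}

impl LineBuffer {
    fn new() -> Self {
        LineBuffer { buf: Vec::new() }
    }

    /// Feed one network chunk; return every complete line it closes.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut lines = Vec::new();
        // Drain one complete line per iteration, leaving any partial
        // trailing line in the buffer for the next chunk.
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            lines.push(String::from_utf8_lossy(&line).trim().to_string());
        }
        lines
    }
}

fn main() {
    let mut lb = LineBuffer::new();
    // One SSE line arriving split across two network chunks:
    assert!(lb.push(b"data: hel").is_empty()); // no newline yet -> no line
    let lines = lb.push(b"lo\ndata: [DONE]\n");
    assert_eq!(lines, vec!["data: hello", "data: [DONE]"]);
    println!("{:?}", lines);
}
```

A unit test like this is cheap insurance: boundary-split bugs only appear when the network happens to fragment a line, which it rarely does on localhost.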
Wiring the Stream to a Web Response
The most common use case is forwarding the stream to an HTTP client. With Axum:
```rust
use axum::{
    routing::post,
    Router,
    response::{Response, IntoResponse},
    http::{StatusCode, header},
    body::Body,
    extract::Json,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use futures_util::StreamExt;

async fn chat_handler(
    Json(payload): Json<serde_json::Value>,
) -> impl IntoResponse {
    let prompt = payload["prompt"]
        .as_str()
        .unwrap_or("")
        .to_string();

    let (tx, rx) = mpsc::channel::<String>(32);

    // Spawn the Grok streaming task
    tokio::spawn(async move {
        if let Err(e) = stream_grok(&prompt, tx).await {
            eprintln!("Streaming error: {}", e);
        }
    });

    // Convert the receiver into a byte stream for the HTTP response.
    // Note: a token containing a raw newline would break SSE framing;
    // JSON-encode the payload if your tokens can contain newlines.
    let stream = ReceiverStream::new(rx).map(|token| {
        let sse_line = format!("data: {}\n\n", token);
        Ok::<_, std::convert::Infallible>(axum::body::Bytes::from(sse_line))
    });

    Response::builder()
        .status(StatusCode::OK)
        .header(header::CONTENT_TYPE, "text/event-stream")
        .header(header::CACHE_CONTROL, "no-cache")
        .header("X-Accel-Buffering", "no") // disable nginx response buffering
        .body(Body::from_stream(stream))
        .unwrap()
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/chat", post(chat_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```
The mpsc::channel(32) buffer size controls backpressure. If the HTTP client reads tokens slower than Grok produces them, the channel fills and the tx.send().await in the streaming task waits until the client catches up. Backpressure propagates from the client through the channel to the Grok API connection without dropping tokens or consuming unbounded memory.
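The same bounded-buffer backpressure can be demonstrated without Tokio. A std-only sketch using `std::sync::mpsc::sync_channel` (capacity 2 stands in for the article's `mpsc::channel(32)`; the timing values are arbitrary): the producer stalls once the buffer is full and resumes only when the slow consumer starts reading, with nothing dropped.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    // Bounded channel: once 2 items are buffered, send() blocks the producer.
    let (tx, rx) = sync_channel::<u32>(2);
    let start = Instant::now();

    let producer = thread::spawn(move || {
        for i in 0..5 {
            tx.send(i).unwrap(); // blocks while the buffer is full
        }
        start.elapsed() // how long producing 5 items took
    });

    // Simulate a slow consumer that only starts reading after 100 ms
    thread::sleep(Duration::from_millis(100));
    let received: Vec<u32> = rx.iter().collect();

    let produce_time = producer.join().unwrap();
    assert_eq!(received, vec![0, 1, 2, 3, 4]); // nothing dropped
    assert!(produce_time >= Duration::from_millis(50)); // producer waited
    println!("producer blocked for {:?}", produce_time);
}
```

The async version behaves identically, except a blocked `tx.send().await` yields the Tokio worker instead of parking an OS thread.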
The `X-Accel-Buffering: no` Header Matters
Nginx buffers responses by default. A streaming SSE response behind an Nginx proxy without X-Accel-Buffering: no delivers all tokens at once when the connection closes, not as they arrive. This defeats the purpose of streaming. Always include this header on SSE responses if you are behind any proxy.
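If you control the Nginx config, buffering can also be disabled at the proxy itself rather than per-response. An illustrative location block (the upstream address is an assumption; adjust to your deployment):

```nginx
location /chat {
    proxy_pass http://127.0.0.1:3000;
    proxy_buffering off;       # same effect as X-Accel-Buffering: no
    proxy_cache off;           # never cache streaming responses
    proxy_read_timeout 3600s;  # keep long-lived SSE connections open
}
```

The response header is still worth sending even with this config in place: it protects you when the app is later deployed behind an Nginx instance you do not control.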
The same issue can appear with Cloudflare, AWS ALB, and other reverse proxies. Each has its own configuration to disable response buffering. Verify that tokens arrive incrementally at the client before shipping.
What This Means For You
- Use `reqwest::Client`, never `reqwest::blocking::Client`, inside async Tokio code. The blocking client starves the thread pool in a way that only appears under concurrent load, not in development.
- Process the response body as a `bytes_stream()`, not a buffered string. Buffering the full response before processing defeats streaming and introduces unbounded memory usage for long responses.
- Set `X-Accel-Buffering: no` on all SSE endpoints and verify token delivery is incremental before shipping. Proxy buffering is the most common reason a correct streaming implementation appears non-streaming in production.
- Size the `mpsc::channel` buffer based on your expected token rate and client read speed. A buffer of 32 handles typical token rates without excessive memory overhead; too small and the producer stalls on nearly every token, too large and slow clients pin memory.
Enjoyed this deep dive? Join my inner circle:
- Pithy Cyborg → AI news made simple without hype.
- Pithy Security → Stay ahead of cybersecurity threats.
