Skip to content

Streaming Output Examples

This guide provides practical, copy-paste examples for streaming output in various scenarios.

Test streaming with this simple countdown:

Terminal window
curl -X POST https://anewera.dev/api/sessions/test-stream/stream \
-H "Content-Type: application/json" \
-d '{
"code": "import time\nfor i in range(5, 0, -1):\n print(f\"Countdown: {i}\")\n time.sleep(1)\nprint(\"Done!\")"
}'

Perfect for testing LLM integration before connecting a real model:

# Example: simulate an LLM emitting a response word by word.
# (Indentation inside the snippet restored — the scraped version did
# not compile.)
code = '''
import time
import sys
# Simulate LLM generating a response token by token
response = "The ERA Runtime Agent (ERA) provides isolated code execution environments with real-time streaming capabilities."
words = response.split()
for word in words:
    print(word, end=" ", flush=True)
    time.sleep(0.1)  # Simulate generation latency
print()  # Final newline
'''

JavaScript Client:

/**
 * POST `code` to an ERA session's /stream endpoint and consume the
 * Server-Sent Events response, echoing stdout chunks as they arrive.
 *
 * @param {string} sessionId - target session identifier
 * @param {string} code - source code to execute remotely
 * @returns {Promise<string>} the accumulated stdout of the execution
 */
async function streamLLM(sessionId, code) {
  const response = await fetch(
    `https://anewera.dev/api/sessions/${sessionId}/stream`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ code }),
    }
  );

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let output = '';

  // Parse one SSE line; only `data: {...}` lines carry events.
  const handleLine = (line) => {
    if (!line.startsWith('data: ')) return;
    const event = JSON.parse(line.slice(6));
    if (event.type === 'stdout') {
      output += event.content;
      process.stdout.write(event.content); // Real-time display (Node-only sink)
    }
  };

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream:true keeps multi-byte characters split across chunks intact.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || ''; // keep the trailing partial line
    for (const line of lines) handleLine(line);
  }

  // Bug fix vs. the original: flush the decoder and parse any final
  // event that arrived without a trailing newline — it was previously
  // dropped on the floor.
  buffer += decoder.decode();
  if (buffer) handleLine(buffer);

  return output;
}

Simulate OpenAI’s streaming format:

# Example: emit tokens in an OpenAI-style streaming format, one JSON
# object per line. (Indentation inside the snippet restored — the
# scraped version did not compile.)
code = '''
import time
import json
# Simulate OpenAI streaming format
tokens = [
    {"role": "assistant", "content": "Hello"},
    {"role": "assistant", "content": "!"},
    {"role": "assistant", "content": " How"},
    {"role": "assistant", "content": " can"},
    {"role": "assistant", "content": " I"},
    {"role": "assistant", "content": " help"},
    {"role": "assistant", "content": " you"},
    {"role": "assistant", "content": " today"},
    {"role": "assistant", "content": "?"},
]
for token in tokens:
    print(json.dumps(token), flush=True)
    time.sleep(0.15)
'''
# Example: in-place progress bar using carriage returns. Two fixes:
# indentation restored (scraped version did not compile) and the
# garbled "(unknown)" placeholder replaced with {filename}, which the
# f-string clearly intends (the function's first parameter).
code = '''
import time
def download_file(filename, size_mb):
    print(f"Downloading {filename} ({size_mb}MB)...")
    for progress in range(0, 101, 10):
        bar = "=" * (progress // 5) + ">" + " " * (20 - progress // 5)
        print(f"\\r[{bar}] {progress}%", end="", flush=True)
        time.sleep(0.3)
    print("\\n✓ Download complete!")
download_file("data.zip", 150)
'''
# Example: multi-step task with per-step progress updates.
# (Indentation inside the snippet restored — the scraped version did
# not compile.)
code = '''
import time
steps = [
    ("Initializing environment", 2),
    ("Loading dependencies", 3),
    ("Fetching data", 5),
    ("Processing records", 8),
    ("Generating report", 4),
    ("Cleanup", 1),
]
total = len(steps)
for i, (step, duration) in enumerate(steps, 1):
    print(f"[{i}/{total}] {step}...", flush=True)
    # Simulate work with progress updates
    for j in range(duration):
        time.sleep(0.5)
        if j % 2 == 0:
            print(f" ▸ Progress: {(j+1)/duration*100:.0f}%", flush=True)
    print(f" ✓ {step} complete!", flush=True)
print("\\n🎉 All steps completed successfully!")
'''
# Example: process items in fixed-size batches with running totals.
# (Indentation inside the snippet restored — the scraped version did
# not compile.)
code = '''
import time
items = [f"item_{i:03d}" for i in range(1, 51)]
batch_size = 10
for i in range(0, len(items), batch_size):
    batch = items[i:i+batch_size]
    print(f"Processing batch {i//batch_size + 1}/{len(items)//batch_size}:", flush=True)
    for item in batch:
        print(f" ✓ Processed {item}", flush=True)
        time.sleep(0.1)
    print(f"Batch complete. Total processed: {min(i+batch_size, len(items))}/{len(items)}\\n", flush=True)
print("All batches processed!")
'''
# Example: stream timestamped, leveled log lines at a steady rate.
# (Indentation inside the snippet restored — the scraped version did
# not compile.)
code = '''
import time
import random
from datetime import datetime
log_levels = ["DEBUG", "INFO", "WARN", "ERROR"]
actions = ["User login", "API call", "Database query", "Cache hit", "File upload"]
for i in range(20):
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    level = random.choice(log_levels)
    action = random.choice(actions)
    print(f"[{timestamp}] [{level:5s}] {action} - Request {i+1}", flush=True)
    time.sleep(0.3)
'''
# Example: per-service health checks with inline status updates.
# (Indentation inside the snippet restored — the scraped version did
# not compile.)
code = '''
import time
import random
services = ["API Server", "Database", "Cache", "Queue", "Storage"]
print("Running health checks...\\n", flush=True)
for service in services:
    print(f"Checking {service}...", end="", flush=True)
    time.sleep(1)
    status = random.choice(["✓ OK", "✓ OK", "✓ OK", "⚠ WARNING"])  # Mostly OK
    latency = random.randint(10, 200)
    print(f" {status} ({latency}ms)", flush=True)
print("\\nHealth check complete!")
'''
# Example: chunked CSV-style processing with per-chunk status lines.
# (Indentation inside the snippet restored — the scraped version did
# not compile.)
code = '''
import time
# Simulate reading and analyzing CSV data
print("Loading CSV data...", flush=True)
time.sleep(1)
rows = 10000
chunk_size = 1000
print(f"Processing {rows} rows in chunks of {chunk_size}...\\n", flush=True)
for i in range(0, rows, chunk_size):
    end = min(i + chunk_size, rows)
    print(f"Chunk {i//chunk_size + 1}:", flush=True)
    print(f" • Rows {i+1}-{end}", flush=True)
    print(f" • Validating...", flush=True)
    time.sleep(0.5)
    print(f" • Transforming...", flush=True)
    time.sleep(0.5)
    print(f" ✓ Chunk complete\\n", flush=True)
print("Analysis complete!")
print("Summary: 10,000 rows processed, 0 errors")
'''
# Example: per-image pipeline with sub-steps and periodic percentage
# summaries. (Indentation inside the snippet restored — the scraped
# version did not compile.)
code = '''
import time
images = [f"image_{i:04d}.jpg" for i in range(1, 21)]
print(f"Processing {len(images)} images...\\n", flush=True)
for i, image in enumerate(images, 1):
    print(f"[{i}/{len(images)}] {image}", flush=True)
    steps = ["Loading", "Resizing", "Filtering", "Compressing", "Saving"]
    for step in steps:
        print(f" → {step}...", end="", flush=True)
        time.sleep(0.2)
        print(" Done", flush=True)
    if i % 5 == 0:
        print(f"\\nProgress: {i}/{len(images)} ({i/len(images)*100:.0f}%)\\n", flush=True)
print("\\nAll images processed!")
'''
# Example: incremental results from a long computation (prime sieve by
# trial division). Indentation restored — the scraped version did not
# compile. NOTE(review): the progress check is reconstructed at loop
# level; nested under `if is_prime(num):` it could never fire, since
# no multiple of 100 is prime.
code = '''
import time
def is_prime(n):
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True
print("Finding prime numbers up to 1000...\\n", flush=True)
primes = []
for num in range(2, 1001):
    if is_prime(num):
        primes.append(num)
        print(f"Found prime: {num}", flush=True)
    if num % 100 == 0:
        print(f"Progress: Checked up to {num}\\n", flush=True)
    time.sleep(0.01)  # Throttle output
print(f"\\nTotal primes found: {len(primes)}")
'''
# Example: Monte Carlo estimation of π with periodic convergence
# updates. (Indentation inside the snippet restored — the scraped
# version did not compile.)
code = '''
import time
import random
def estimate_pi(iterations):
    inside_circle = 0
    print(f"Running Monte Carlo simulation ({iterations} iterations)...\\n", flush=True)
    for i in range(1, iterations + 1):
        x = random.random()
        y = random.random()
        if x*x + y*y <= 1:
            inside_circle += 1
        if i % 10000 == 0:
            pi_estimate = 4 * inside_circle / i
            error = abs(pi_estimate - 3.14159) / 3.14159 * 100
            print(f"Iteration {i:6d}: π ≈ {pi_estimate:.6f} (error: {error:.2f}%)", flush=True)
            time.sleep(0.1)
    final_pi = 4 * inside_circle / iterations
    print(f"\\nFinal estimate: π ≈ {final_pi:.6f}", flush=True)
    return final_pi
estimate_pi(100000)
'''
# Example: wizard-style installer output with a fixed step list.
# (Indentation inside the snippet restored — the scraped version did
# not compile.)
code = '''
import time
print("=" * 50, flush=True)
print(" ERA Agent Installation Wizard", flush=True)
print("=" * 50, flush=True)
print("", flush=True)
steps = [
    "Checking system requirements",
    "Downloading packages",
    "Installing dependencies",
    "Configuring environment",
    "Setting up database",
    "Running migrations",
    "Creating admin user",
    "Finalizing installation"
]
for i, step in enumerate(steps, 1):
    print(f"[{i}/{len(steps)}] {step}...", end="", flush=True)
    # Simulate varying installation times
    duration = 1 + (i % 3) * 0.5
    time.sleep(duration)
    print(" ✓", flush=True)
print("", flush=True)
print("🎉 Installation complete!", flush=True)
print("Run 'era-agent start' to begin.", flush=True)
'''

Complete bash script for consuming streams:

#!/bin/bash
# End-to-end SSE consumer for the ERA streaming API:
# create a session, stream an execution, render events, then clean up.
# Requires: curl, jq.
SESSION_ID="streaming-demo"
BASE_URL="https://anewera.dev"
# Create session
echo "Creating session..."
curl -s -X POST "$BASE_URL/api/sessions" \
-H "Content-Type: application/json" \
-d "{\"language\": \"python\", \"session_id\": \"$SESSION_ID\"}" > /dev/null
# Stream execution
echo "Starting stream..."
echo ""
# Pipe the SSE response line-by-line into the parser loop.
# NOTE: the while loop runs in a pipeline subshell, so variables set
# inside it (EXIT_CODE, DURATION, ...) are not visible afterwards.
curl -X POST "$BASE_URL/api/sessions/$SESSION_ID/stream" \
-H "Content-Type: application/json" \
-d '{
"code": "import time\nfor i in range(10):\n print(f\"Line {i+1}\")\n time.sleep(0.5)"
}' 2>/dev/null | while IFS= read -r line; do
# Only "data: ..." lines carry JSON events; strip the prefix and
# dispatch on the event's .type field.
if [[ $line == data:* ]]; then
JSON="${line#data: }"
TYPE=$(echo "$JSON" | jq -r '.type')
case "$TYPE" in
stdout)
CONTENT=$(echo "$JSON" | jq -r '.content')
echo -n "$CONTENT"
;;
stderr)
CONTENT=$(echo "$JSON" | jq -r '.content')
echo -e "\033[0;31m$CONTENT\033[0m" # Red color
;;
done)
# Terminal event: report exit status and wall-clock duration.
EXIT_CODE=$(echo "$JSON" | jq -r '.exit_code')
DURATION=$(echo "$JSON" | jq -r '.duration')
echo ""
echo "Completed: exit_code=$EXIT_CODE, duration=$DURATION"
;;
error)
ERROR=$(echo "$JSON" | jq -r '.error')
echo "Error: $ERROR"
;;
esac
fi
done
# Cleanup
echo ""
echo "Cleaning up..."
curl -s -X DELETE "$BASE_URL/api/sessions/$SESSION_ID" > /dev/null
echo "Done!"
  1. Always flush output:

    print("message", flush=True) # Python
  2. Add delays for visibility:

    time.sleep(0.1) # Give time to see each update
  3. Use progress indicators:

    print(f"[{i}/{total}] Processing...", flush=True)
  4. Handle completion:

    # Always print a final status
    print("\n✓ Task complete!", flush=True)