Streaming Output Examples
This guide provides practical, copy-paste examples for streaming output in various scenarios.
Quick Test
Test streaming with this simple countdown:

    curl -X POST https://anewera.dev/api/sessions/test-stream/stream \
      -H "Content-Type: application/json" \
      -d '{
        "code": "import time\nfor i in range(5, 0, -1):\n    print(f\"Countdown: {i}\")\n    time.sleep(1)\nprint(\"Done!\")"
      }'

LLM Token Streaming
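The examples in this guide define the program to run as a Python string named `code`. The following is a minimal sketch, assuming the request body is a JSON object with a single `code` field as in the Quick Test, of building that body with `json.dumps` rather than escaping quotes and newlines by hand. The helper name `build_stream_payload` is hypothetical and not part of any ERA client library.

```python
import json


def build_stream_payload(code: str) -> str:
    """Serialize a code snippet into a JSON request body.

    Assumption: the endpoint expects {"code": "<source>"}, as in the
    Quick Test. The helper name is illustrative only.
    """
    # json.dumps escapes quotes, backslashes, and newlines for us.
    return json.dumps({"code": code})


code = '''
import time
for i in range(5, 0, -1):
    print(f"Countdown: {i}")
    time.sleep(1)
print("Done!")
'''

payload = build_stream_payload(code)
print(payload)
```

The resulting string can be sent as the POST body with any HTTP client, using the same `Content-Type: application/json` header as the curl command.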
Example 1: Simulated AI Response
Perfect for testing LLM integration before connecting a real model:

    code = '''
    import time
    import sys

    # Simulate an LLM generating a response token by token
    response = "The ERA Runtime Agent (ERA) provides isolated code execution environments with real-time streaming capabilities."

    words = response.split()

    for word in words:
        print(word, end=" ", flush=True)
        time.sleep(0.1)  # Simulate generation latency

    print()  # Final newline
    '''

JavaScript Client:
    async function streamLLM(sessionId, code) {
      const response = await fetch(
        `https://anewera.dev/api/sessions/${sessionId}/stream`,
        {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify({ code }),
        }
      );

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      let output = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // Keep any trailing partial line in the buffer until the rest arrives
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const event = JSON.parse(line.slice(6));
            if (event.type === 'stdout') {
              output += event.content;
              process.stdout.write(event.content); // Real-time display (Node.js)
            }
          }
        }
      }

      return output;
    }

Example 2: OpenAI-Style Streaming
Simulate an OpenAI-style token stream by printing one JSON object per token (a simplified shape, not OpenAI's actual wire format):

    code = '''
    import time
    import json

    # Simulate an OpenAI-style token stream (simplified message objects)
    tokens = [
        {"role": "assistant", "content": "Hello"},
        {"role": "assistant", "content": "!"},
        {"role": "assistant", "content": " How"},
        {"role": "assistant", "content": " can"},
        {"role": "assistant", "content": " I"},
        {"role": "assistant", "content": " help"},
        {"role": "assistant", "content": " you"},
        {"role": "assistant", "content": " today"},
        {"role": "assistant", "content": "?"},
    ]

    for token in tokens:
        print(json.dumps(token), flush=True)
        time.sleep(0.15)
    '''

Progress Bars & Status Updates
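Progress bars in this section redraw a single line by printing a carriage return (`\r`). A terminal handles that natively, but a client that accumulates `stdout` content into a string keeps every intermediate frame. The following is a minimal sketch, assuming the output arrives as plain text chunks, of collapsing carriage-return updates so that only the last frame of each line survives. The function name `collapse_carriage_returns` is hypothetical, and the sketch treats each redraw as replacing the whole line, which is simpler than true terminal overwrite semantics.

```python
def collapse_carriage_returns(text: str) -> str:
    """Keep only the final frame of each line that was redrawn with a carriage return."""
    rendered = []
    for line in text.split("\n"):
        # Drop a trailing CR (e.g. from CRLF line endings), then keep
        # whatever follows the last remaining CR: earlier frames were overwritten.
        rendered.append(line.rstrip("\r").rsplit("\r", 1)[-1])
    return "\n".join(rendered)


# Hypothetical chunks, shaped like the output of a progress-bar script.
chunks = [
    "Downloading data.zip\n",
    "\r[=>  ] 10%",
    "\r[==> ] 50%",
    "\r[====>] 100%",
    "\n✓ Done\n",
]
final_text = collapse_carriage_returns("".join(chunks))
print(final_text)
```

Applying this once to the full accumulated output, rather than to each chunk, avoids mishandling a frame that is split across two chunks.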
Example 3: Download Progress

    code = '''
    import time

    def download_file(filename, size_mb):
        print(f"Downloading {filename} ({size_mb}MB)...", flush=True)

        # Redraw the same line with a carriage return on each update
        for progress in range(0, 101, 10):
            bar = "=" * (progress // 5) + ">" + " " * (20 - progress // 5)
            print(f"\\r[{bar}] {progress}%", end="", flush=True)
            time.sleep(0.3)

        print("\\n✓ Download complete!")

    download_file("data.zip", 150)
    '''

Example 4: Multi-Step Pipeline
    code = '''
    import time

    steps = [
        ("Initializing environment", 2),
        ("Loading dependencies", 3),
        ("Fetching data", 5),
        ("Processing records", 8),
        ("Generating report", 4),
        ("Cleanup", 1),
    ]

    total = len(steps)

    for i, (step, duration) in enumerate(steps, 1):
        print(f"[{i}/{total}] {step}...", flush=True)

        # Simulate work with progress updates
        for j in range(duration):
            time.sleep(0.5)
            if j % 2 == 0:
                print(f" ▸ Progress: {(j+1)/duration*100:.0f}%", flush=True)

        print(f" ✓ {step} complete!", flush=True)

    print("\\n🎉 All steps completed successfully!")
    '''

Example 5: Batch Processing
    code = '''
    import time

    items = [f"item_{i:03d}" for i in range(1, 51)]
    batch_size = 10
    # Round up so a final partial batch is still counted
    total_batches = (len(items) + batch_size - 1) // batch_size

    for i in range(0, len(items), batch_size):
        batch = items[i:i+batch_size]

        print(f"Processing batch {i//batch_size + 1}/{total_batches}:", flush=True)

        for item in batch:
            print(f" ✓ Processed {item}", flush=True)
            time.sleep(0.1)

        print(f"Batch complete. Total processed: {min(i+batch_size, len(items))}/{len(items)}\\n", flush=True)

    print("All batches processed!")
    '''

Live Log Monitoring
Example 6: Application Logs

    code = '''
    import time
    import random
    from datetime import datetime

    log_levels = ["DEBUG", "INFO", "WARN", "ERROR"]
    actions = ["User login", "API call", "Database query", "Cache hit", "File upload"]

    for i in range(20):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        level = random.choice(log_levels)
        action = random.choice(actions)

        print(f"[{timestamp}] [{level:5s}] {action} - Request {i+1}", flush=True)
        time.sleep(0.3)
    '''

Example 7: Server Health Check
    code = '''
    import time
    import random

    services = ["API Server", "Database", "Cache", "Queue", "Storage"]

    print("Running health checks...\\n", flush=True)

    for service in services:
        print(f"Checking {service}...", end="", flush=True)
        time.sleep(1)

        status = random.choice(["✓ OK", "✓ OK", "✓ OK", "⚠ WARNING"])  # Mostly OK
        latency = random.randint(10, 200)

        print(f" {status} ({latency}ms)", flush=True)

    print("\\nHealth check complete!")
    '''

Data Processing
Example 8: CSV Data Analysis

    code = '''
    import time

    # Simulate reading and analyzing CSV data
    print("Loading CSV data...", flush=True)
    time.sleep(1)

    rows = 10000
    chunk_size = 1000

    print(f"Processing {rows} rows in chunks of {chunk_size}...\\n", flush=True)

    for i in range(0, rows, chunk_size):
        end = min(i + chunk_size, rows)

        print(f"Chunk {i//chunk_size + 1}:", flush=True)
        print(f" • Rows {i+1}-{end}", flush=True)
        print(" • Validating...", flush=True)
        time.sleep(0.5)
        print(" • Transforming...", flush=True)
        time.sleep(0.5)
        print(" ✓ Chunk complete\\n", flush=True)

    print("Analysis complete!")
    print(f"Summary: {rows:,} rows processed, 0 errors")
    '''

Example 9: Image Processing Pipeline
    code = '''
    import time

    images = [f"image_{i:04d}.jpg" for i in range(1, 21)]

    print(f"Processing {len(images)} images...\\n", flush=True)

    for i, image in enumerate(images, 1):
        print(f"[{i}/{len(images)}] {image}", flush=True)

        steps = ["Loading", "Resizing", "Filtering", "Compressing", "Saving"]
        for step in steps:
            print(f" → {step}...", end="", flush=True)
            time.sleep(0.2)
            print(" Done", flush=True)

        if i % 5 == 0:
            print(f"\\nProgress: {i}/{len(images)} ({i/len(images)*100:.0f}%)\\n", flush=True)

    print("\\nAll images processed!")
    '''

Real-Time Computation
Example 10: Prime Number Generator

    code = '''
    import time

    def is_prime(n):
        if n < 2:
            return False
        for i in range(2, int(n**0.5) + 1):
            if n % i == 0:
                return False
        return True

    print("Finding prime numbers up to 1000...\\n", flush=True)

    primes = []
    for num in range(2, 1001):
        if is_prime(num):
            primes.append(num)
            print(f"Found prime: {num}", flush=True)

        if num % 100 == 0:
            print(f"Progress: Checked up to {num}\\n", flush=True)

        time.sleep(0.01)  # Throttle output

    print(f"\\nTotal primes found: {len(primes)}")
    '''

Example 11: Monte Carlo Simulation
    code = '''
    import math
    import time
    import random

    def estimate_pi(iterations):
        inside_circle = 0

        print(f"Running Monte Carlo simulation ({iterations} iterations)...\\n", flush=True)

        for i in range(1, iterations + 1):
            x = random.random()
            y = random.random()

            # Count points that fall inside the unit quarter circle
            if x*x + y*y <= 1:
                inside_circle += 1

            if i % 10000 == 0:
                pi_estimate = 4 * inside_circle / i
                error = abs(pi_estimate - math.pi) / math.pi * 100
                print(f"Iteration {i:6d}: π ≈ {pi_estimate:.6f} (error: {error:.2f}%)", flush=True)
                time.sleep(0.1)

        final_pi = 4 * inside_circle / iterations
        print(f"\\nFinal estimate: π ≈ {final_pi:.6f}", flush=True)
        return final_pi

    estimate_pi(100000)
    '''

Interactive CLI Simulation
Example 12: Installation Wizard

    code = '''
    import time

    print("=" * 50, flush=True)
    print(" ERA Agent Installation Wizard", flush=True)
    print("=" * 50, flush=True)
    print("", flush=True)

    steps = [
        "Checking system requirements",
        "Downloading packages",
        "Installing dependencies",
        "Configuring environment",
        "Setting up database",
        "Running migrations",
        "Creating admin user",
        "Finalizing installation",
    ]

    for i, step in enumerate(steps, 1):
        print(f"[{i}/{len(steps)}] {step}...", end="", flush=True)

        # Simulate varying installation times
        duration = 1 + (i % 3) * 0.5
        time.sleep(duration)

        print(" ✓", flush=True)

    print("", flush=True)
    print("🎉 Installation complete!", flush=True)
    print("Run 'era-agent start' to begin.", flush=True)
    '''

Bash Client Script
Complete bash script for consuming streams (requires curl and jq):

    #!/bin/bash

    SESSION_ID="streaming-demo"
    BASE_URL="https://anewera.dev"

    # Create session
    echo "Creating session..."
    curl -s -X POST "$BASE_URL/api/sessions" \
      -H "Content-Type: application/json" \
      -d "{\"language\": \"python\", \"session_id\": \"$SESSION_ID\"}" > /dev/null

    # Stream execution
    echo "Starting stream..."
    echo ""

    # -N disables curl's output buffering so events are piped as they arrive
    curl -N -X POST "$BASE_URL/api/sessions/$SESSION_ID/stream" \
      -H "Content-Type: application/json" \
      -d '{
        "code": "import time\nfor i in range(10):\n    print(f\"Line {i+1}\")\n    time.sleep(0.5)"
      }' 2>/dev/null | while IFS= read -r line; do
      if [[ $line == data:* ]]; then
        JSON="${line#data: }"
        TYPE=$(echo "$JSON" | jq -r '.type')

        case "$TYPE" in
          stdout)
            # jq -j writes the raw string as-is; capturing it with $(...) would strip trailing newlines
            echo "$JSON" | jq -j '.content'
            ;;
          stderr)
            printf '\033[0;31m'  # Red color
            echo "$JSON" | jq -j '.content'
            printf '\033[0m'
            ;;
          done)
            EXIT_CODE=$(echo "$JSON" | jq -r '.exit_code')
            DURATION=$(echo "$JSON" | jq -r '.duration')
            echo ""
            echo "Completed: exit_code=$EXIT_CODE, duration=$DURATION"
            ;;
          error)
            ERROR=$(echo "$JSON" | jq -r '.error')
            echo "Error: $ERROR"
            ;;
        esac
      fi
    done

    # Cleanup
    echo ""
    echo "Cleaning up..."
    curl -s -X DELETE "$BASE_URL/api/sessions/$SESSION_ID" > /dev/null
    echo "Done!"

Tips for Best Results
- Always flush output:

      print("message", flush=True)  # Python

- Add delays for visibility:

      time.sleep(0.1)  # Give time to see each update

- Use progress indicators:

      print(f"[{i}/{total}] Processing...", flush=True)

- Handle completion:

      # Always print a final status
      print("\n✓ Task complete!", flush=True)
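Taken together, the tips above suggest a small reusable pattern. The following is a minimal sketch that uses only the Python standard library; the helper name `run_steps` and its parameters are hypothetical and exist only for illustration.

```python
import time


def run_steps(steps, delay=0.1):
    """Run (name, callable) pairs in order, reporting progress with flushed output."""
    total = len(steps)
    for i, (name, action) in enumerate(steps, 1):
        # Progress indicator, flushed so the stream delivers it right away.
        print(f"[{i}/{total}] {name}...", flush=True)
        action()
        time.sleep(delay)  # Small pause so each update is visible
    # Always finish with an explicit final status.
    print("\n✓ Task complete!", flush=True)
    return total


results = []
completed = run_steps(
    [
        ("Load", lambda: results.append("loaded")),
        ("Process", lambda: results.append("processed")),
    ],
    delay=0.01,
)
```

Keeping the delay as a parameter lets the same script run slowly for a demo and quickly in automated checks.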
See Also
- Streaming Output Guide - Complete streaming documentation
- API Reference - Full API documentation
- Language Examples - Language-specific examples