
Storage Proxy

The Storage Proxy gives your sandboxed code direct access to Cloudflare's storage primitives: KV for key-value data, D1 for SQL databases, and R2 for object storage. The SDK is automatically injected into every session, so storage is available with no setup.

When you create a session, ERA automatically:

  1. Injects SDK files into the VM (era_storage.py and era_storage.js)
  2. Sets environment variables (ERA_STORAGE_URL) pointing to the storage proxy
  3. Provides storage access through a simple HTTP API with namespacing
┌─────────────────────────────┐
│   Your Code (Python/JS)     │
│     import era_storage      │
│     era_storage.kv.set(...) │
└──────────────┬──────────────┘
               │ HTTP via ERA_STORAGE_URL
┌──────────────┴──────────────┐
│        Storage Proxy        │
│     /api/storage/kv/...     │
└──────────────┬──────────────┘
       ┌───────┼───────┐
       ▼       ▼       ▼
    ┌─────┐ ┌─────┐ ┌─────┐
    │ KV  │ │ D1  │ │ R2  │
    └─────┘ └─────┘ └─────┘

The SDK is automatically available. Just import and use:

Python:

import era_storage

# Store user preferences
era_storage.kv.set("myapp", "user:123", '{"theme": "dark"}')

# Retrieve them
prefs = era_storage.kv.get("myapp", "user:123")
print(f"User preferences: {prefs}")

JavaScript:

const { KVStorage, D1Storage, R2Storage } = require('./era_storage.js');

// Store user preferences
await KVStorage.set("myapp", "user:123", '{"theme": "dark"}');

// Retrieve them
const prefs = await KVStorage.get("myapp", "user:123");
console.log(`User preferences: ${prefs}`);

Key-Value storage for lightweight data like configs, user preferences, and cached data.

Python Example:

import era_storage
import json

# Set a value
success = era_storage.kv.set(
    "myapp",                     # namespace
    "config:api",                # key
    json.dumps({"timeout": 30})  # value (string)
)

# Get a value
config_json = era_storage.kv.get("myapp", "config:api")
config = json.loads(config_json)
print(f"Timeout: {config['timeout']}")

# List keys with prefix
keys = era_storage.kv.list("myapp", prefix="config:", limit=100)
print(f"Found {len(keys)} config keys")

# Delete a key
era_storage.kv.delete("myapp", "config:api")

JavaScript Example:

const { KVStorage } = require('./era_storage.js');

// Set a value
const success = await KVStorage.set(
  "myapp",                      // namespace
  "config:api",                 // key
  JSON.stringify({timeout: 30}) // value (string)
);

// Get a value
const configJson = await KVStorage.get("myapp", "config:api");
const config = JSON.parse(configJson);
console.log(`Timeout: ${config.timeout}`);

// List keys with prefix
const keys = await KVStorage.list("myapp", "config:", 100);
console.log(`Found ${keys.length} config keys`);

// Delete a key
await KVStorage.delete("myapp", "config:api");
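Because KV values are plain strings, structured data passes through json.dumps/json.loads on every call. A thin wrapper removes that boilerplate. The sketch below injects the backend so it runs anywhere; an in-memory dict stands in for `era_storage.kv` here:

```python
import json

class JsonKV:
    """Store and load Python objects through a string-only KV backend."""
    def __init__(self, backend):
        self.backend = backend  # anything with set/get(namespace, key, ...)

    def set(self, namespace, key, obj):
        self.backend.set(namespace, key, json.dumps(obj))

    def get(self, namespace, key, default=None):
        raw = self.backend.get(namespace, key)
        return json.loads(raw) if raw is not None else default

class MemoryKV:
    """In-memory stand-in for era_storage.kv, for local testing only."""
    def __init__(self):
        self.data = {}
    def set(self, namespace, key, value):
        self.data[(namespace, key)] = value
    def get(self, namespace, key):
        return self.data.get((namespace, key))

kv = JsonKV(MemoryKV())
kv.set("myapp", "config:api", {"timeout": 30})
print(kv.get("myapp", "config:api"))  # {'timeout': 30}
```

Inside a session you would pass `era_storage.kv` as the backend instead of `MemoryKV()`.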

SQL database for structured data, relationships, and complex queries.

Python Example:

import era_storage

# Create table
era_storage.d1.exec("myapp", """
    CREATE TABLE IF NOT EXISTS todos (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        completed BOOLEAN DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Insert data
era_storage.d1.exec("myapp",
    "INSERT INTO todos (title) VALUES (?)",
    ["Write storage docs"]
)

# Query data
todos = era_storage.d1.query("myapp",
    "SELECT * FROM todos WHERE completed = ?",
    [0]
)
for todo in todos:
    print(f"Todo: {todo['title']}")

# Update data
era_storage.d1.exec("myapp",
    "UPDATE todos SET completed = 1 WHERE id = ?",
    [1]
)

JavaScript Example:

const { D1Storage } = require('./era_storage.js');

// Create table
await D1Storage.exec("myapp", `
    CREATE TABLE IF NOT EXISTS todos (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        completed BOOLEAN DEFAULT 0,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
`);

// Insert data
await D1Storage.exec("myapp",
    "INSERT INTO todos (title) VALUES (?)",
    ["Write storage docs"]
);

// Query data
const todos = await D1Storage.query("myapp",
    "SELECT * FROM todos WHERE completed = ?",
    [0]
);
todos.forEach(todo => {
  console.log(`Todo: ${todo.title}`);
});

// Update data
await D1Storage.exec("myapp",
    "UPDATE todos SET completed = 1 WHERE id = ?",
    [1]
);

Object storage for files, images, and binary data.

Python Example:

import era_storage

# Upload a file
with open("avatar.jpg", "rb") as f:
    image_data = f.read()

era_storage.r2.put("myapp", "avatars/user123.jpg", image_data,
                   metadata={"user_id": "123", "uploaded_by": "web_ui"})

# Download a file
avatar_data = era_storage.r2.get("myapp", "avatars/user123.jpg")
if avatar_data:
    with open("downloaded_avatar.jpg", "wb") as f:
        f.write(avatar_data)

# List objects
objects = era_storage.r2.list("myapp", prefix="avatars/", limit=50)
print(f"Found {len(objects)} avatars")
for obj in objects:
    print(f"  {obj['key']} - {obj['size']} bytes")

# Delete a file
era_storage.r2.delete("myapp", "avatars/user123.jpg")

JavaScript Example:

const { R2Storage } = require('./era_storage.js');
const fs = require('fs');

// Upload a file
const imageData = fs.readFileSync("avatar.jpg");
await R2Storage.put("myapp", "avatars/user123.jpg", imageData,
                    {user_id: "123", uploaded_by: "web_ui"});

// Download a file
const avatarData = await R2Storage.get("myapp", "avatars/user123.jpg");
if (avatarData) {
  fs.writeFileSync("downloaded_avatar.jpg", avatarData);
}

// List objects
const objects = await R2Storage.list("myapp", "avatars/", 50);
console.log(`Found ${objects.length} avatars`);
objects.forEach(obj => {
  console.log(`  ${obj.key} - ${obj.size} bytes`);
});

// Delete a file
await R2Storage.delete("myapp", "avatars/user123.jpg");

Namespaces let you organize resources by application or feature. All storage operations are namespaced to prevent collisions.

import era_storage
# App 1: User management
era_storage.kv.set("users_app", "user:123", '{"name": "Alice"}')
era_storage.d1.exec("users_app", "CREATE TABLE profiles ...")
# App 2: Content management
era_storage.kv.set("content_app", "post:456", '{"title": "Hello"}')
era_storage.d1.exec("content_app", "CREATE TABLE articles ...")
# Shared: Application-wide settings
era_storage.kv.set("shared", "api_key", "sk-...")
# Access across namespaces
api_key = era_storage.kv.get("shared", "api_key")
user = era_storage.kv.get("users_app", "user:123")

Internal Storage:

  • KV: Key prefixed (namespace:key) → users_app:user:123
  • D1: Table prefixed → users_app_profiles
  • R2: Path prefixed → users_app/avatar.jpg
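The three mappings above can be sketched as tiny functions (illustrative only; the proxy's actual implementation isn't shown here):

```python
def kv_internal_key(namespace, key):
    # KV: key prefixed with the namespace
    return f"{namespace}:{key}"

def d1_internal_table(namespace, table):
    # D1: table name prefixed
    return f"{namespace}_{table}"

def r2_internal_path(namespace, path):
    # R2: object path prefixed
    return f"{namespace}/{path}"

print(kv_internal_key("users_app", "user:123"))    # users_app:user:123
print(d1_internal_table("users_app", "profiles"))  # users_app_profiles
print(r2_internal_path("users_app", "avatar.jpg")) # users_app/avatar.jpg
```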
Example: a user-profile helper that keeps profile data in KV and avatars in R2:

import era_storage
import json

class UserProfile:
    @staticmethod
    def save(user_id, profile_data):
        """Save user profile to KV"""
        key = f"user:{user_id}:profile"
        era_storage.kv.set("myapp", key, json.dumps(profile_data))

    @staticmethod
    def load(user_id):
        """Load user profile from KV"""
        key = f"user:{user_id}:profile"
        data = era_storage.kv.get("myapp", key)
        return json.loads(data) if data else None

    @staticmethod
    def update_avatar(user_id, image_data):
        """Upload user avatar to R2"""
        key = f"avatars/{user_id}.jpg"
        era_storage.r2.put("myapp", key, image_data)

# Usage
UserProfile.save("123", {
    "name": "Alice",
    "email": "alice@example.com",
    "theme": "dark"
})
profile = UserProfile.load("123")
print(f"Welcome back, {profile['name']}!")
Example: a todo app built on D1:

import era_storage

# Initialize schema
era_storage.d1.exec("todo_app", """
    CREATE TABLE IF NOT EXISTS tasks (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        title TEXT NOT NULL,
        description TEXT,
        completed BOOLEAN DEFAULT 0,
        priority INTEGER DEFAULT 0,
        due_date TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# Create task
def create_task(title, description, priority=0):
    era_storage.d1.exec("todo_app",
        "INSERT INTO tasks (title, description, priority) VALUES (?, ?, ?)",
        [title, description, priority]
    )

# Get pending tasks
def get_pending_tasks():
    return era_storage.d1.query("todo_app",
        "SELECT * FROM tasks WHERE completed = 0 ORDER BY priority DESC, created_at ASC"
    )

# Complete task
def complete_task(task_id):
    era_storage.d1.exec("todo_app",
        "UPDATE tasks SET completed = 1 WHERE id = ?",
        [task_id]
    )

# Usage
create_task("Write docs", "Document storage proxy features", priority=1)
create_task("Add tests", "Test all storage operations", priority=2)

tasks = get_pending_tasks()
for task in tasks:
    print(f"[Priority {task['priority']}] {task['title']}")
Example: file uploads that pair R2 objects with D1 records:

import era_storage
from datetime import datetime

def upload_file(filename, file_data, user_id):
    """Upload file with metadata"""
    # Store file in R2
    key = f"uploads/{user_id}/{filename}"
    era_storage.r2.put("myapp", key, file_data, metadata={
        "uploaded_by": user_id,
        "original_filename": filename
    })
    # Store file record in D1
    era_storage.d1.exec("myapp",
        "INSERT INTO files (filename, user_id, size, uploaded_at) VALUES (?, ?, ?, ?)",
        [filename, user_id, len(file_data), datetime.now().isoformat()]
    )
    return key

def get_user_files(user_id):
    """List all files for a user"""
    return era_storage.d1.query("myapp",
        "SELECT * FROM files WHERE user_id = ? ORDER BY uploaded_at DESC",
        [user_id]
    )

def download_file(user_id, filename):
    """Download a file"""
    key = f"uploads/{user_id}/{filename}"
    return era_storage.r2.get("myapp", key)

# Usage
with open("document.pdf", "rb") as f:
    upload_file("document.pdf", f.read(), "user123")

files = get_user_files("user123")
print(f"User has {len(files)} files")
Example: caching expensive work in KV with a TTL check:

import era_storage
import json
import time

def get_data_with_cache(cache_key, fetch_function, ttl_seconds=300):
    """Get data with KV caching"""
    # Try cache first
    cached = era_storage.kv.get("cache", cache_key)
    if cached:
        data = json.loads(cached)
        # Check if still valid
        if time.time() - data['cached_at'] < ttl_seconds:
            print("Cache hit!")
            return data['value']
    # Cache miss - fetch fresh data
    print("Cache miss - fetching...")
    value = fetch_function()
    # Store in cache
    era_storage.kv.set("cache", cache_key, json.dumps({
        'value': value,
        'cached_at': time.time()
    }))
    return value

# Usage
def fetch_expensive_data():
    # Simulate expensive operation
    time.sleep(2)
    return {"result": "expensive data"}

# First call - slow
data = get_data_with_cache("expensive_query", fetch_expensive_data)
# Second call - fast!
data = get_data_with_cache("expensive_query", fetch_expensive_data)
Example: independent services sharing the proxy through separate namespaces:

import era_storage
import json
import uuid
from datetime import datetime

def generate_session_id():
    """Example session-ID generator"""
    return uuid.uuid4().hex

# Auth service
class AuthService:
    @staticmethod
    def create_session(user_id, session_data):
        session_id = generate_session_id()
        era_storage.kv.set("auth", f"session:{session_id}",
            json.dumps({
                'user_id': user_id,
                'data': session_data
            })
        )
        return session_id

    @staticmethod
    def verify_session(session_id):
        data = era_storage.kv.get("auth", f"session:{session_id}")
        return json.loads(data) if data else None

# Analytics service
class Analytics:
    @staticmethod
    def track_event(event_type, user_id, metadata):
        era_storage.d1.exec("analytics",
            "INSERT INTO events (type, user_id, metadata, timestamp) VALUES (?, ?, ?, ?)",
            [event_type, user_id, json.dumps(metadata), datetime.now().isoformat()]
        )

    @staticmethod
    def get_user_events(user_id):
        return era_storage.d1.query("analytics",
            "SELECT * FROM events WHERE user_id = ? ORDER BY timestamp DESC LIMIT 100",
            [user_id]
        )

# Content service
class ContentService:
    @staticmethod
    def store_asset(asset_id, asset_data):
        era_storage.r2.put("content", f"assets/{asset_id}", asset_data)

    @staticmethod
    def get_asset(asset_id):
        return era_storage.r2.get("content", f"assets/{asset_id}")

# Usage - services work together
session_id = AuthService.create_session("user123", {"ip": "1.2.3.4"})
Analytics.track_event("login", "user123", {"session": session_id})

with open("logo.png", "rb") as f:
    ContentService.store_asset("logo.png", f.read())

Here’s a complete example showing how to build a blog system with posts, comments, and images:

import era_storage
import json
from datetime import datetime

# Initialize database
def init_blog_db():
    # Posts table
    era_storage.d1.exec("blog", """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            title TEXT NOT NULL,
            slug TEXT UNIQUE NOT NULL,
            content TEXT NOT NULL,
            author_id TEXT NOT NULL,
            published_at TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    # Comments table
    era_storage.d1.exec("blog", """
        CREATE TABLE IF NOT EXISTS comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            post_id INTEGER NOT NULL,
            author_id TEXT NOT NULL,
            content TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (post_id) REFERENCES posts(id)
        )
    """)

# Create a post
def create_post(title, slug, content, author_id, cover_image=None):
    # Store post in D1
    era_storage.d1.exec("blog",
        "INSERT INTO posts (title, slug, content, author_id, published_at) VALUES (?, ?, ?, ?, ?)",
        [title, slug, content, author_id, datetime.now().isoformat()]
    )
    # Upload cover image to R2 if provided
    if cover_image:
        era_storage.r2.put("blog", f"covers/{slug}.jpg", cover_image)
    # Cache the full row (including the generated id) for fast access
    post = era_storage.d1.query("blog",
        "SELECT * FROM posts WHERE slug = ?",
        [slug]
    )[0]
    era_storage.kv.set("blog", f"post:{slug}", json.dumps(post))
    return slug

# Get a post (with caching)
def get_post(slug):
    # Try cache first
    cached = era_storage.kv.get("blog", f"post:{slug}")
    if cached:
        return json.loads(cached)
    # Cache miss - query database
    posts = era_storage.d1.query("blog",
        "SELECT * FROM posts WHERE slug = ?",
        [slug]
    )
    if posts:
        post = posts[0]
        # Update cache
        era_storage.kv.set("blog", f"post:{slug}", json.dumps(post))
        return post
    return None

# Add comment
def add_comment(post_slug, author_id, content):
    # Get post ID
    post = get_post(post_slug)
    if not post:
        return None
    # Insert comment
    era_storage.d1.exec("blog",
        "INSERT INTO comments (post_id, author_id, content) VALUES (?, ?, ?)",
        [post['id'], author_id, content]
    )
    # Invalidate post cache (so comments show up)
    era_storage.kv.delete("blog", f"post:{post_slug}")
    return True

# Get post with comments
def get_post_with_comments(slug):
    post = get_post(slug)
    if not post:
        return None
    # Get comments
    comments = era_storage.d1.query("blog",
        "SELECT * FROM comments WHERE post_id = ? ORDER BY created_at ASC",
        [post['id']]
    )
    # Get cover image URL if exists
    objects = era_storage.r2.list("blog", prefix=f"covers/{slug}")
    cover_url = f"/blog/covers/{slug}.jpg" if objects else None
    return {
        'post': post,
        'comments': comments,
        'cover_url': cover_url
    }

# Usage
init_blog_db()

# Create a post with cover image
with open("cover.jpg", "rb") as f:
    create_post(
        "Building with ERA Storage",
        "era-storage-guide",
        "Learn how to use ERA's storage proxy...",
        "alice",
        cover_image=f.read()
    )

# Add comments
add_comment("era-storage-guide", "bob", "Great tutorial!")
add_comment("era-storage-guide", "charlie", "Very helpful, thanks!")

# Display post
post_data = get_post_with_comments("era-storage-guide")
print(f"Title: {post_data['post']['title']}")
print(f"Comments: {len(post_data['comments'])}")

The SDK automatically uses the ERA_STORAGE_URL environment variable set by ERA. You can access it in your code:

Python:

import os
print(f"Storage URL: {os.getenv('ERA_STORAGE_URL')}")
# Output: http://host.docker.internal:8787

JavaScript:

console.log(`Storage URL: ${process.env.ERA_STORAGE_URL}`);
// Output: http://host.docker.internal:8787
Choose the right storage for the data:

  • KV: Configs, sessions, simple key-value data
  • D1: Structured data with relationships and queries
  • R2: Files, images, large binary data
# Cache expensive database queries in KV
import era_storage
import json

def get_user_stats(user_id):
    # Check cache
    cache_key = f"stats:{user_id}"
    cached = era_storage.kv.get("cache", cache_key)
    if cached:
        return json.loads(cached)
    # Query database
    stats = era_storage.d1.query("myapp",
        "SELECT COUNT(*) as posts, SUM(views) as total_views FROM posts WHERE user_id = ?",
        [user_id]
    )[0]
    # Cache the result (check freshness on read, as in the caching pattern above)
    era_storage.kv.set("cache", cache_key, json.dumps(stats))
    return stats
# Good: Organized namespaces
era_storage.kv.set("users", "profile:123", data)
era_storage.kv.set("content", "post:456", data)
era_storage.kv.set("cache", "query:789", data)
# Bad: Everything in one namespace
era_storage.kv.set("myapp", "user_profile_123", data)
era_storage.kv.set("myapp", "content_post_456", data)
era_storage.kv.set("myapp", "cache_query_789", data)
Handle storage errors gracefully:

import era_storage

try:
    era_storage.kv.set("myapp", "key", "value")
except Exception as e:
    print(f"Storage error: {e}")
    # Fallback behavior
# Fallback behavior
# Clean up old data
import era_storage
import json
import time

def is_expired(session_json, max_age_days=30):
    """Example check - assumes sessions store a 'created_at' Unix timestamp"""
    data = json.loads(session_json)
    return time.time() - data.get('created_at', 0) > max_age_days * 86400

def cleanup_old_sessions():
    # Get all session keys
    keys = era_storage.kv.list("sessions", prefix="session:")
    for key_info in keys:
        key = key_info['name']
        session_data = era_storage.kv.get("sessions", key)
        # Delete if older than 30 days
        if session_data and is_expired(session_data):
            era_storage.kv.delete("sessions", key)

List and discover all storage resources:

import requests
import os

storage_url = os.getenv('ERA_STORAGE_URL')

# Get all resources
response = requests.get(f"{storage_url}/api/resources/list")
resources = response.json()
print(f"Total resources: {resources['total']}")
for resource in resources['resources']:
    print(f"  {resource['type']}:{resource['namespace']}:{resource['key']}")

# Filter by namespace
response = requests.get(f"{storage_url}/api/resources/list?namespace=myapp")
app_resources = response.json()

# Get statistics
response = requests.get(f"{storage_url}/api/resources/stats")
stats = response.json()
print(f"KV keys: {stats['by_type']['kv']}")
print(f"D1 tables: {stats['by_type']['d1']}")
print(f"R2 objects: {stats['by_type']['r2']}")

Test your storage code locally:

# Create a session
SESSION=$(curl -s -X POST http://localhost:8787/api/sessions \
  -H "Content-Type: application/json" \
  -d '{"language": "python"}' | jq -r '.session_id')

# Run code with storage
curl -X POST "http://localhost:8787/api/sessions/$SESSION/run" \
  -H "Content-Type: application/json" \
  -d '{
    "code": "import era_storage\nera_storage.kv.set(\"test\", \"hello\", \"world\")\nprint(era_storage.kv.get(\"test\", \"hello\"))"
  }'

# Check resources
curl http://localhost:8787/api/resources/stats | jq '.'