Use Cases¶
polars-redis shines in scenarios where Redis holds data that needs analytical processing, enrichment, or transformation. Here are common patterns and examples.
DataFrame Caching¶
Cache expensive computations in Redis without manual serialization boilerplate.
The Problem¶
Caching DataFrames in Redis traditionally requires manual work:
# The old way - lots of boilerplate
import io

import polars as pl
import pyarrow as pa
import redis
def cache_df(r, key, df):
    """Serialize a Polars DataFrame to Parquet and store it in Redis for one hour.

    Args:
        r: Redis client exposing ``set(key, value, ex=...)``.
        key: Redis key to store the serialized frame under.
        df: Polars DataFrame to cache (anything providing ``write_parquet``).
    """
    buffer = io.BytesIO()
    # Polars spells this ``write_parquet``; ``to_parquet`` is the pandas method
    # and does not exist on a Polars DataFrame.
    df.write_parquet(buffer, compression="zstd")
    # Set the value and its 3600-second TTL in one command so a failure between
    # SET and EXPIRE cannot leave behind a key that never expires.
    r.set(key, buffer.getvalue(), ex=3600)
def get_df(r, key):
    """Return the DataFrame cached under *key*, or ``None`` when the key is absent."""
    payload = r.get(key)
    # The stored payload is raw Parquet bytes; wrap it in a file-like object
    # so Polars can read it.
    return None if payload is None else pl.read_parquet(io.BytesIO(payload))
On top of that, you still need to handle:

- Large DataFrames exceeding Redis's 512 MB per-value limit
- Compression options
- TTL management
- Cache invalidation
- Key generation for function arguments
The polars-redis Solution¶
import polars_redis as redis
# One decorator does it all
@redis.cache(url="redis://localhost:6379", ttl=3600, compression="zstd")
def expensive_aggregation(start: str, end: str) -> pl.DataFrame:
    """Sum revenue per category for rows whose date lies between *start* and *end*."""
    within_range = pl.col("date").is_between(start, end)
    revenue_by_category = (
        pl.scan_parquet("huge_dataset.parquet")
        .filter(within_range)
        .group_by("category")
        .agg(pl.sum("revenue"))
    )
    # Nothing is read from disk until the lazy query is collected here.
    return revenue_by_category.collect()
# First call: cache miss, so the function body runs and its result is cached
result = expensive_aggregation("2024-01-01", "2024-12-31")
# Second call with identical arguments: instant cache hit
result = expensive_aggregation("2024-01-01", "2024-12-31")
# Force refresh: pass _cache_refresh=True to recompute even though an entry exists
result = expensive_aggregation("2024-01-01", "2024-12-31", _cache_refresh=True)
# Invalidate: called with the same arguments as the cached call above
# (presumably removes only that entry - confirm against the polars-redis API reference)
expensive_aggregation.invalidate("2024-01-01", "2024-12-31")
Sharing Results Across Workers¶
import polars_redis as redis
# Every worker must use the same Redis URL and key to see the shared result
url = "redis://localhost:6379"
# Worker 1: Compute once and cache under a versioned key
result = heavy_ml_feature_engineering()
# Stored zstd-compressed with ttl=86400 (24 hours, assuming the TTL is in seconds)
redis.cache_dataframe(result, url, "features:v1", compression="zstd", ttl=86400)
# Worker 2, 3, ...: Retrieve cached result instantly instead of recomputing
features = redis.get_cached_dataframe(url, "features:v1")
Large DataFrame Support¶
polars-redis automatically chunks large DataFrames:
# 2GB DataFrame? No problem - auto-chunked across multiple keys
# (chunk_size_mb=100 presumably caps each chunk at 100 MB - confirm against the API reference)
redis.cache_dataframe(huge_df, url, "big_result", chunk_size_mb=100)
# Retrieval reassembles automatically; the caller passes only the original key
df = redis.get_cached_dataframe(url, "big_result")
ETL Pipeline Checkpoints¶
import polars_redis as redis
url = "redis://localhost:6379"
# Stage 1: extract and checkpoint the raw data
# (ttl=7200 is 2 hours, assuming the TTL is in seconds)
raw = extract_from_source()
redis.cache_dataframe(raw, url, "etl:stage1", ttl=7200)
# Stage 2 (can run separately, even on different machine):
# resume from the stage-1 checkpoint, transform, and checkpoint again
stage1 = redis.get_cached_dataframe(url, "etl:stage1")
transformed = transform(stage1)
redis.cache_dataframe(transformed, url, "etl:stage2", ttl=7200)
# Stage 3: resume from the stage-2 checkpoint and load the final result
final = redis.get_cached_dataframe(url, "etl:stage2")
load_to_warehouse(final)
Customer Data Enrichment¶
Combine Redis session data with external sources for real-time customer insights:
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"

# Load active sessions from Redis. The result is lazy: it is only
# materialized by the .collect() call at the end of this example.
sessions = redis.scan_hashes(
    url,
    pattern="session:*",
    schema={"user_id": pl.Utf8, "started_at": pl.Datetime, "page_views": pl.Int64},
)

# Scan customer data from the data warehouse lazily as well: a lazy frame
# cannot be joined directly with an eager DataFrame from pl.read_parquet().
customers = pl.scan_parquet("customers.parquet")

# Find high-engagement customers currently browsing
engaged = (
    sessions
    .join(customers, on="user_id")
    .filter(pl.col("page_views") > 10)
    .select(["user_id", "name", "email", "page_views"])
)

# Materialize the query and write the result to Redis for real-time targeting
redis.write_hashes(engaged.collect(), url, key_prefix="engaged:")
Leaderboard Analytics¶
Analyze gaming or competition leaderboards stored in sorted sets:
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"

# Lazily scan every weekly leaderboard, requesting each member's rank as well
lf = redis.scan_zsets(url, pattern="leaderboard:weekly:*", include_rank=True)

# Per-member statistics across all leaderboards
per_member_stats = [
    pl.col("score").sum().alias("total_score"),
    pl.col("rank").mean().alias("avg_rank"),
    pl.len().alias("games_played"),
]
ranked = lf.group_by("member").agg(per_member_stats).sort("total_score", descending=True)

# Keep the 100 highest total scores and materialize the result
top_players = ranked.head(100).collect()

# Persist the aggregated rankings back to Redis
redis.write_hashes(top_players, url, key_prefix="top100:")
Event Stream Processing¶
Process Redis Streams for event analytics:
import polars as pl
import polars_redis as redis
from datetime import datetime, timedelta
url = "redis://localhost:6379"

# Millisecond epoch timestamp for 24 hours ago, used to build the starting stream ID
one_day_ago = datetime.now() - timedelta(days=1)
yesterday_ms = int(one_day_ago.timestamp() * 1000)

# Read only entries at or after that ID from every matching stream
events = redis.scan_streams(
    url,
    pattern="events:*",
    fields=["action", "user_id", "product_id"],
    start_id=f"{yesterday_ms}-0",
)

# Count events per (action, hour), ordered by hour and then by descending count
hour_of_event = pl.col("_ts").dt.hour().alias("hour")
per_action_hour = events.group_by(["action", hour_of_event]).agg(pl.len().alias("count"))
action_counts = per_action_hour.sort(["hour", "count"], descending=[False, True]).collect()
Time Series Downsampling¶
Aggregate high-frequency sensor data for dashboards:
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"

# Ask Redis for 5-minute averages (300000 ms buckets) instead of raw samples
aggregated = redis.scan_timeseries(
    url,
    pattern="sensor:temperature:*",
    aggregation="avg",
    bucket_size_ms=300000,
    label_columns=["location", "device_id"],
)

# Roll the 5-minute buckets up into hourly statistics per location
hour_bucket = pl.col("_ts").dt.truncate("1h").alias("hour")
reading = pl.col("value")
hourly_stats = (
    aggregated
    .with_columns(hour_bucket)
    .group_by(["location", "hour"])
    .agg(
        [
            reading.mean().alias("avg_temp"),
            reading.min().alias("min_temp"),
            reading.max().alias("max_temp"),
        ]
    )
    .collect()
)
Cache Warming¶
Pre-compute and cache frequently accessed aggregations:
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"

# Read the order data eagerly
orders = pl.read_parquet("orders.parquet")

# One summary row per calendar day
order_date = pl.col("created_at").dt.date().alias("date")
order_total = pl.col("total")
daily_summaries = orders.group_by(order_date).agg(
    [
        order_total.sum().alias("revenue"),
        pl.len().alias("order_count"),
        order_total.mean().alias("avg_order_value"),
    ]
)

# Write one hash per day, keyed by date, expiring after one hour
redis.write_hashes(
    daily_summaries,
    url,
    key_column="date",
    key_prefix="summary:daily:",
    ttl=3600,
)
Session Analysis¶
Analyze user sessions for behavior patterns:
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"

# Scan every session hash, also requesting each key's remaining TTL
session_schema = {
    "user_id": pl.Utf8,
    "started_at": pl.Datetime,
    "last_activity": pl.Datetime,
    "page_views": pl.Int64,
    "cart_value": pl.Float64,
}
sessions = redis.scan_hashes(
    url,
    pattern="session:*",
    schema=session_schema,
    include_ttl=True,
)

# A session is likely to convert when all three conditions hold
has_valuable_cart = pl.col("cart_value") > 50
has_browsed = pl.col("page_views") > 5
is_still_active = pl.col("_ttl") > 300
likely_buyers = (
    sessions
    .filter(has_valuable_cart & has_browsed & is_still_active)
    .select(["user_id", "cart_value", "page_views"])
    .collect()
)
RediSearch-Powered Analytics¶
Use server-side filtering for efficient queries on indexed data:
import polars as pl
import polars_redis as redis
from polars_redis import col
url = "redis://localhost:6379"

# Build the filter up front; it is passed as the RediSearch query below
premium_filter = (col("tier") == "premium") & (col("lifetime_value") > 1000)
user_schema = {
    "user_id": pl.Utf8,
    "name": pl.Utf8,
    "tier": pl.Utf8,
    "lifetime_value": pl.Float64,
}
premium_users = redis.search_hashes(
    url,
    index="users_idx",
    query=premium_filter,
    schema=user_schema,
)

# Left-join campaign responses and keep users with no recorded response
campaign_data = pl.read_csv("campaign_responses.csv")
has_not_responded = pl.col("last_response").is_null()
targets = (
    premium_users
    .join(campaign_data.lazy(), on="user_id", how="left")
    .filter(has_not_responded)
    .collect()
)

# Publish the campaign targets back to Redis
redis.write_hashes(targets, url, key_prefix="campaign:target:")
Inventory Synchronization¶
Keep Redis cache in sync with inventory systems:
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"

# Current inventory from database/warehouse
current_inventory = pl.read_parquet("inventory.parquet")

# Cached inventory in Redis
cached = redis.scan_hashes(
    url,
    pattern="product:*",
    schema={"sku": pl.Utf8, "quantity": pl.Int64, "last_updated": pl.Datetime},
)

# Find products that need updates. The left join leaves `quantity_cached`
# null for products that are not in Redis yet; a plain `!=` evaluates to null
# for those rows and the filter would silently drop them. `ne_missing`
# treats null as a distinct value, so uncached products are kept as well.
needs_update = (
    current_inventory.lazy()
    .join(cached, on="sku", how="left", suffix="_cached")
    .filter(pl.col("quantity").ne_missing(pl.col("quantity_cached")))
    .select(["sku", "quantity", "price", "name"])
    .collect()
)

# Update Redis cache, replacing the stored hash for each changed or new SKU
redis.write_hashes(
    needs_update,
    url,
    key_column="sku",
    key_prefix="product:",
    write_mode="replace",
)
Tag-Based Filtering¶
Analyze data using Redis sets for tag membership:
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"

# One row per (set key, member) pair for every tag set
product_tags = redis.scan_sets(url, pattern="tags:*")

# Derive the tag name from the set key, then collect all tags per member
tag_name = pl.col("_key").str.replace("tags:", "").alias("tag")
tagged_products = (
    product_tags
    .with_columns(tag_name)
    .group_by("member")
    .agg(pl.col("tag").alias("tags"))
)

# Product details stored as hashes
product_schema = {"name": pl.Utf8, "price": pl.Float64, "category": pl.Utf8}
products = redis.scan_hashes(url, pattern="product:*", schema=product_schema)

# Attach each product's tags and order by how many tags it carries
tag_count = pl.col("tags").list.len().alias("tag_count")
product_analysis = (
    products
    .join(tagged_products, left_on="_key", right_on="member", how="left")
    .with_columns(tag_count)
    .sort("tag_count", descending=True)
    .collect()
)
Performance Considerations¶
Use RediSearch When Available¶
For indexed data, search_hashes() is significantly faster than scanning
with client-side filtering:
# Slow: scan all, filter client-side
# (every key matching "user:*" is scanned; the age filter is a Polars expression)
result = (
redis.scan_hashes(url, "user:*", schema)
.filter(pl.col("age") > 30)
.collect()
)
# Fast: filter server-side with RediSearch
# (the same age condition is passed as the `query` against the "users_idx" index)
result = redis.search_hashes(
url,
index="users_idx",
query=col("age") > 30,
schema=schema,
).collect()
Projection Pushdown¶
Only select the columns you need to minimize data transfer:
# Transfers all fields: no column selection is made before collecting
df = redis.scan_hashes(url, "user:*", large_schema).collect()
# Transfers only selected fields (uses HMGET): selecting before .collect()
# restricts the query to "name" and "email"
df = (
redis.scan_hashes(url, "user:*", large_schema)
.select(["name", "email"])
.collect()
)
Batch Size Tuning¶
Adjust batch size based on your data and memory constraints:
# Large batches for small documents (5000 per batch for hashes here)
lf = redis.scan_hashes(url, "user:*", schema, batch_size=5000)
# Smaller batches for large documents (100 per batch for JSON documents here)
lf = redis.scan_json(url, "doc:*", schema, batch_size=100)
Parallel Fetching¶
Use parallel workers for large datasets: