github.com/joshrotenberg/polars-redis
If you're already juggling Redis + Postgres + S3 to answer questions about Redis data, keep watching.
For the datasets most teams actually have, you might not need the warehouse.
import polars as pl
import polars_redis as redis
# Single connection URL reused by every call below.
url = "redis://localhost:6379"
# Redis is just another data source: scan keys matching "user:*" and
# decode each hash's fields with the given schema.
users = redis.scan_hashes(url, "user:*", {"name": pl.Utf8, "age": pl.Int64})
orders = pl.read_parquet("s3://bucket/orders.parquet")
# Full Polars transformation power
result = (
    users.join(orders, on="user_id")
    .group_by("region")
    .agg(pl.col("amount").sum().alias("revenue"))
    .sort("revenue", descending=True)
)
# Write back to Redis
# NOTE(review): the .collect() implies scan_hashes returns a LazyFrame —
# confirm against the library docs.
redis.write_hashes(result.collect(), url, key_prefix="region_stats:")
Redis becomes a first-class connector alongside Parquet, CSV, and databases.
# Typical pattern: Fetch everything, process in app.
# (Deliberate anti-pattern slide — assumes a plain redis-py-style client
# and `import pandas as pd`, neither of which is shown here.)
keys = redis.scan_iter("user:*")  # Iterate all keys
users = []
for key in keys:
    data = redis.hgetall(key)  # N+1 queries: one HGETALL round-trip per key
    users.append(data)
df = pd.DataFrame(users)  # Manual conversion
df["age"] = df["age"].astype(int)  # Manual type coercion
df["score"] = df["score"].astype(float)
active = df[df["status"] == "active"]  # Filter in Python
# Same job with polars-redis: one typed scan plus a lazy filter —
# no per-key round-trips, no manual dtype coercion.
df = redis.scan_hashes(
    url, "user:*",
    schema={
        "name": pl.Utf8,
        "age": pl.Int64,
        "status": pl.Utf8
    }
).filter(
    pl.col("status") == "active"
).collect()
from polars_redis import col
# Server-side filtering via RediSearch: the query runs against an
# existing index ("users_idx"), so filtering happens in Redis.
df = redis.search_hashes(
    url, index="users_idx",
    query=col("status") == "active",
    schema={
        "name": pl.Utf8,
        "age": pl.Int64
    }
).collect()
# Only matching docs transferred!
Build RediSearch queries with Polars-like syntax. Requires a RediSearch index on your data.
from polars_redis import col
# Numeric + tag filter (compiles to: @age:[(30 +inf] @status:{active})
query = (col("age") > 30) & (col("status") == "active")
# Range + multi-tag
query = (
    col("price").is_between(100, 500) &
    col("category").has_any_tag(["electronics", "computers"])
)
# Text search
query = col("title").contains("python")
# Geo search — argument order appears to be (lon, lat, radius, unit),
# matching @location:[-122.4 37.7 10 km]; TODO confirm in the API docs
query = col("location").within_radius(-122.4, 37.7, 10, "km")
# Numeric + tag filter
@age:[(30 +inf] @status:{active}
# Range + multi-tag
@price:[100 500] @category:{electronics|computers}
# Text search
@title:python
# Geo search
@location:[-122.4 37.7 10 km]
No need to learn RediSearch query syntax — use what you already know from Polars.
# Aggregate in Redis, not Python: FT.AGGREGATE groups and reduces
# server-side; only the summary rows come back over the wire.
df = redis.aggregate_hashes(
    url,
    index="users_idx",
    query="@status:{active}",
    group_by=["@department"],
    reduce=[
        # (reducer name, argument fields, output alias)
        ("COUNT", [], "headcount"),
        ("AVG", ["@salary"], "avg_salary"),
        ("SUM", ["@salary"], "total_payroll"),
    ],
    # NOTE(review): False presumably means descending — the sample output
    # is sorted high-to-low by avg_salary. Confirm the flag's meaning.
    sort_by=[("@avg_salary", False)],
    limit=10,
)
| department | headcount | avg_salary | total_payroll |
|-------------|-----------|------------|---------------|
| engineering | 45 | 125000.0 | 5625000.0 |
| product | 20 | 115000.0 | 2300000.0 |
| marketing | 15 | 95000.0 | 1425000.0 |
Only the aggregated results are transferred — never the raw data.
from polars_redis import smart_scan, explain_scan
# Auto-detect index and use FT.SEARCH if available
df = smart_scan(
    url, "user:*",
    schema={"name": pl.Utf8, "age": pl.Int64}
).filter(pl.col("age") > 30).collect()
# Check what it will do before running: explain_scan reports the chosen
# strategy, index, key prefixes, type, and server-side query.
plan = explain_scan(url, "user:*", schema={"name": pl.Utf8})
print(plan.explain())
Strategy: SEARCH
Index: users_idx
Prefixes: user:
Type: HASH
Server Query: *
If an index exists, smart_scan uses it.
Otherwise, falls back to SCAN. Same API either way.
# Cache the decorated function's result in Redis with a one-hour TTL.
# The second identical call returns instantly, so the cache is
# presumably keyed by the call arguments — confirm in the docs.
@redis.cache(url=url, ttl=3600)
def expensive_query(start, end):
    return (
        pl.scan_parquet("huge.parquet")
        .filter(pl.col("date").is_between(start, end))
        .group_by("category")
        .agg(pl.sum("revenue"))
        .collect()
    )
# First call: compute + cache
result = expensive_query("2024-01", "2024-12")
# Second call: instant
result = expensive_query("2024-01", "2024-12")
# Cache with compression: store df under the key "result",
# zstd-compressed, expiring after one hour.
redis.cache_dataframe(
    df, url, "result",
    compression="zstd",
    ttl=3600
)
# Retrieve by the same key
df = redis.get_cached_dataframe(url, "result")
Common Redis client commands included as a bonus.
# Add locations: (member, longitude, latitude) triples — lon-first
# matches the keyword arguments used in geo_radius below.
redis.geo_add(url, "stores", [
    ("hq", -122.4, 37.7),
    ("warehouse", -118.2, 34.0),
])
# Find nearby
nearby = redis.geo_radius(
    url, "stores",
    longitude=-122.4,
    latitude=37.8,
    radius=50, unit="km"
)
# Bulk TTL — `keys` is assumed to be a list of keys defined earlier
redis.set_ttl(url, keys, 3600)
# Bulk delete by pattern
redis.delete_keys_pattern(url, "temp:*")
# Key info: each entry exposes at least 'key' and 'ttl'
info = redis.key_info(url, "user:*")
for k in info:
    print(f"{k['key']}: {k['ttl']}s")
# Pipeline: queue commands locally, then send them together.
pipe = redis.Pipeline(url)
pipe.set("a", "1")
pipe.incr("counter")
pipe.hset("user:1", "name", "Alice")
results = pipe.execute()  # One round-trip
# Transaction: same shape, but the queued commands apply atomically.
tx = redis.Transaction(url)
tx.decrby("balance:alice", 100)
tx.incrby("balance:bob", 100)
results = tx.execute()  # Atomic
# Collect messages from Pub/Sub channels until the timeout elapses
# or the message cap is reached, whichever comes first.
df = redis.collect_pubsub(
    url,
    channels=["events", "alerts"],
    timeout_seconds=10,
    max_messages=1000,
)
# Result: DataFrame with channel, message, timestamp
print(df)
# | channel | message    | timestamp           |
# |---------|------------|---------------------|
# | events  | {"type":   | 2024-01-01 12:00:00 |
# Consumer group support: read "mystream" as consumer "worker-1"
# within the "workers" group, decoding fields via the schema.
df = redis.read_stream(
    url, "mystream",
    group="workers",
    consumer="worker-1",
    schema={"user": pl.Utf8, "action": pl.Utf8}
)
# Continuous iteration — `process` stands in for your own handler
for batch in redis.iter_stream(url, "events"):
    process(batch)
This is for the sweet spot: datasets that fit in memory, whether Redis is your source of truth or you're aggregating data from elsewhere into Redis for fast access.
# Batch size
lf = redis.scan_hashes(
url, "user:*", schema,
batch_size=5000,
count_hint=1000,
)
# Parallel workers
lf = redis.scan_hashes(
url, "user:*", schema,
parallel=4,
)
With RediSearch: Server-side filtering can reduce data transfer by 90%+ for selective queries.
# Join Redis sessions with warehouse data
sessions = redis.scan_hashes(url, "session:*", schema)
customers = pl.read_parquet("customers.parquet")
engaged = (
    sessions.join(customers, on="user_id")
    .filter(pl.col("page_views") > 10)
)
redis.write_hashes(engaged.collect(), url, key_prefix="engaged:")
# Redis as fast staging area (staged keys expire after two hours)
df = extract_from_source()
redis.write_hashes(df, url, key_prefix="stage:", ttl=7200)
# Transform with RediSearch (remaining arguments elided on the slide)
result = redis.aggregate_hashes(url, "stage_idx", ...)
# Load to warehouse — `extract_from_source`/`load_to_warehouse` are
# placeholders for your own ETL code
load_to_warehouse(result)
pip install polars-redis
docker run -d -p 6379:6379 \
redis/redis-stack:latest
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"
# Write: key_column="id" + key_prefix="test:" presumably produces keys
# like "test:1" and "test:2" — confirm against the library docs.
df = pl.DataFrame({"id": [1,2], "name": ["a","b"]})
redis.write_hashes(df, url, key_column="id", key_prefix="test:")
# Read
result = redis.scan_hashes(
    url, "test:*",
    schema={"name": pl.Utf8}
).collect()
print(result)
pip install polars-redis
github.com/joshrotenberg/polars-redis
| Redis Type | Scan | Write | Notes |
|---|---|---|---|
| Hash | Yes | Yes | Field-level projection pushdown |
| JSON | Yes | Yes | JSONPath extraction |
| String | Yes | Yes | Key-value pairs |
| Set / List / Sorted Set | Yes | Yes | One row per member |
| Stream | Yes | No | Consumer groups, timestamps |
| TimeSeries | Yes | No | Server-side aggregation |
(backup slide)
LazyFrame API
DataFrame ops
.collect()
Rust + PyO3
Async batching
Arrow
conversion
SCAN + HGETALL
FT.SEARCH / FT.AGGREGATE
Pub/Sub,
Streams
io/ DataFrame I/O (scan, write, cache, search)
client/ Redis operations (geo, keys, pipeline, pubsub)
(backup slide)