polars-redis

Query Redis like a database.
Transform with Polars.
Write back without ETL.

github.com/joshrotenberg/polars-redis

Is This For You?

  • You have data in Redis already (or want to put it there)
  • You want to analyze it without ETL pipelines
  • Your dataset fits in memory (10K - 1M documents)
  • You know Polars (or want to learn)

If you're already juggling Redis + Postgres + S3 to answer questions about Redis data, keep watching.

What if Redis was your data layer?

Traditional Stack

  • Redis for caching
  • PostgreSQL for queries
  • S3/Parquet for analytics
  • ETL jobs to sync them

With polars-redis

  • Redis for storage
  • Polars for compute
  • That's it.

For the datasets most teams actually have, you might not need the warehouse.

The Hero Example

import polars as pl
import polars_redis as redis

url = "redis://localhost:6379"

# Redis is just another data source
users = redis.scan_hashes(url, "user:*", {"user_id": pl.Utf8, "name": pl.Utf8, "age": pl.Int64})
orders = pl.scan_parquet("s3://bucket/orders.parquet")

# Full Polars transformation power
result = (
    users.join(orders, on="user_id")
    .group_by("region")
    .agg(pl.col("amount").sum().alias("revenue"))
    .sort("revenue", descending=True)
)

# Write back to Redis
redis.write_hashes(result.collect(), url, key_prefix="region_stats:")

Redis becomes a first-class connector alongside Parquet, CSV, and databases.

What You Get

DataFrame I/O

  • Scan hashes, JSON, strings, sets, lists, sorted sets, streams, time series
  • Write DataFrames back
  • Schema inference
  • Projection pushdown

RediSearch Integration

  • Server-side filtering
  • Server-side aggregation
  • Polars-like query builder
  • Smart scan (auto-detects indexes)

Beyond DataFrames

  • DataFrame caching
  • Pub/Sub collection
  • Geospatial operations
  • Pipelines & transactions

The Old Way

# Typical pattern: fetch everything, process in the app
import pandas as pd
import redis  # redis-py client

r = redis.Redis()
users = []
for key in r.scan_iter("user:*"):                     # Iterate all keys
    users.append(r.hgetall(key))                      # N+1 queries

df = pd.DataFrame(users)                              # Manual conversion
df["age"] = df["age"].astype(int)                     # Manual type coercion
df["score"] = df["score"].astype(float)
active = df[df["status"] == "active"]                 # Filter in Python

  • N+1 queries for N keys
  • Manual type conversion
  • All data transferred, then filtered client-side

The New Way

Scan

df = redis.scan_hashes(
    url, "user:*",
    schema={
        "name": pl.Utf8,
        "age": pl.Int64,
        "status": pl.Utf8
    }
).filter(
    pl.col("status") == "active"
).collect()

Or Search (with index)

from polars_redis import col

df = redis.search_hashes(
    url, index="users_idx",
    query=col("status") == "active",
    schema={
        "name": pl.Utf8,
        "age": pl.Int64
    }
).collect()
# Only matching docs transferred!

  • Batched async operations - one call, not N+1
  • Automatic types - schema or inference
  • Predicate pushdown - filter in Redis with RediSearch

RediSearch Query Builder

Build RediSearch queries with Polars-like syntax. Requires a RediSearch index on your data.

Python

from polars_redis import col

# Numeric + tag filter
query = (col("age") > 30) & (col("status") == "active")

# Range + multi-tag
query = (
    col("price").is_between(100, 500) &
    col("category").has_any_tag(["electronics", "computers"])
)

# Text search
query = col("title").contains("python")

# Geo search
query = col("location").within_radius(-122.4, 37.7, 10, "km")

Generated RediSearch Syntax

# Numeric + tag filter
@age:[(30 +inf] @status:{active}

# Range + multi-tag
@price:[100 500] @category:{electronics|computers}


# Text search
@title:python

# Geo search
@location:[-122.4 37.7 10 km]

No need to learn RediSearch query syntax - use what you know from Polars.
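The translation idea is simple enough to sketch in a few lines. This is not the library's implementation - `Col` and `Query` here are hypothetical stand-ins - but it shows how each comparison compiles to a RediSearch clause, and how `&` joins clauses with a space (which RediSearch treats as AND):

```python
# Hypothetical sketch of expression -> RediSearch compilation.
class Query:
    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        # RediSearch ANDs space-separated clauses
        return Query(f"{self.text} {other.text}")

    def __str__(self):
        return self.text


class Col:
    def __init__(self, name):
        self.name = name

    def __gt__(self, value):
        return Query(f"@{self.name}:[({value} +inf]")       # exclusive lower bound

    def __eq__(self, value):
        return Query(f"@{self.name}:{{{value}}}")           # tag match

    def is_between(self, lo, hi):
        return Query(f"@{self.name}:[{lo} {hi}]")           # inclusive range

    def has_any_tag(self, tags):
        return Query(f"@{self.name}:{{{'|'.join(tags)}}}")  # tag OR


def col(name):
    return Col(name)


query = (col("age") > 30) & (col("status") == "active")
print(query)  # @age:[(30 +inf] @status:{active}
```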

Server-Side Aggregation

# Aggregate in Redis, not Python
df = redis.aggregate_hashes(
    url,
    index="users_idx",
    query="@status:{active}",
    group_by=["@department"],
    reduce=[
        ("COUNT", [], "headcount"),
        ("AVG", ["@salary"], "avg_salary"),
        ("SUM", ["@salary"], "total_payroll"),
    ],
    sort_by=[("@avg_salary", False)],
    limit=10,
)

| department  | headcount | avg_salary | total_payroll |
|-------------|-----------|------------|---------------|
| engineering | 45        | 125000.0   | 5625000.0     |
| product     | 20        | 115000.0   | 2300000.0     |
| marketing   | 15        | 95000.0    | 1425000.0     |

Only aggregated results transferred - not raw data.

Smart Scan: Automatic Optimization

from polars_redis import smart_scan, explain_scan

# Auto-detect index and use FT.SEARCH if available
df = smart_scan(
    url, "user:*",
    schema={"name": pl.Utf8, "age": pl.Int64}
).filter(pl.col("age") > 30).collect()

# Check what it will do before running
plan = explain_scan(url, "user:*", schema={"name": pl.Utf8})
print(plan.explain())

Strategy: SEARCH
Index: users_idx
  Prefixes: user:
  Type: HASH
Server Query: *

If an index exists, smart_scan uses it; otherwise it falls back to a plain SCAN. Same API either way.

DataFrame Caching

@cache Decorator

@redis.cache(url=url, ttl=3600)
def expensive_query(start, end):
    return (
        pl.scan_parquet("huge.parquet")
        .filter(pl.col("date").is_between(start, end))
        .group_by("category")
        .agg(pl.sum("revenue"))
        .collect()
    )

# First call: compute + cache
result = expensive_query("2024-01", "2024-12")

# Second call: instant
result = expensive_query("2024-01", "2024-12")

Direct API

# Cache with compression
redis.cache_dataframe(
    df, url, "result",
    compression="zstd",
    ttl=3600
)

# Retrieve
df = redis.get_cached_dataframe(url, "result")

  • Auto-chunking for large DataFrames
  • Only limited by Redis storage
  • Arrow IPC or Parquet format
  • lz4, zstd, gzip compression

Redis Operations While You're Here

Common Redis client commands included as a bonus.

Geospatial

# Add locations
redis.geo_add(url, "stores", [
    ("hq", -122.4, 37.7),
    ("warehouse", -118.2, 34.0),
])

# Find nearby
nearby = redis.geo_radius(
    url, "stores",
    longitude=-122.4,
    latitude=37.8,
    radius=50, unit="km"
)

Key Management

# Bulk TTL
redis.set_ttl(url, keys, 3600)

# Bulk delete by pattern
redis.delete_keys_pattern(url, "temp:*")

# Key info
info = redis.key_info(url, "user:*")
for k in info:
    print(f"{k['key']}: {k['ttl']}s")

Pipelines

pipe = redis.Pipeline(url)
pipe.set("a", "1")
pipe.incr("counter")
pipe.hset("user:1", "name", "Alice")
results = pipe.execute()  # One round-trip

Transactions

tx = redis.Transaction(url)
tx.decrby("balance:alice", 100)
tx.incrby("balance:bob", 100)
results = tx.execute()  # Atomic

Real-time Data

Pub/Sub to DataFrame

# Collect messages
df = redis.collect_pubsub(
    url,
    channels=["events", "alerts"],
    timeout_seconds=10,
    max_messages=1000,
)

# Result: DataFrame with channel, message, timestamp
print(df)
# | channel | message    | timestamp           |
# |---------|------------|---------------------|
# | events  | {"type":   | 2024-01-01 12:00:00 |

Streams

# Consumer group support
df = redis.read_stream(
    url, "mystream",
    group="workers",
    consumer="worker-1",
    schema={"user": pl.Utf8, "action": pl.Utf8}
)

# Continuous iteration
for batch in redis.iter_stream(url, "events"):
    process(batch)

When NOT to Use This

  • 100M+ documents? Use a proper data warehouse or Elasticsearch
  • Need ACID transactions? Use PostgreSQL
  • Complex joins across datasets? Use a database
  • Real-time sync to other systems? StreamConsumer and ReplicationPipeline are in progress

This is for the sweet spot: datasets that fit in memory, whether Redis is your source of truth or you're aggregating data from elsewhere into Redis for fast access.

Performance

Why It's Fast

  • Rust core - async, zero-copy Arrow
  • Batched operations - pipelined Redis calls
  • Predicate pushdown - filter in Redis
  • Projection pushdown - fetch only needed fields
  • Parallel fetching - configurable workers

Tuning

# Batch size
lf = redis.scan_hashes(
    url, "user:*", schema,
    batch_size=5000,
    count_hint=1000,
)

# Parallel workers
lf = redis.scan_hashes(
    url, "user:*", schema,
    parallel=4,
)

With RediSearch: Server-side filtering can reduce data transfer by 90%+ for selective queries.

Use Cases

Data Enrichment

# Join Redis sessions with warehouse data
sessions = redis.scan_hashes(url, "session:*", schema)
customers = pl.read_parquet("customers.parquet")

engaged = (
    sessions.join(customers, on="user_id")
    .filter(pl.col("page_views") > 10)
)

redis.write_hashes(engaged.collect(), url, key_prefix="engaged:")

ETL Staging

# Redis as fast staging area
df = extract_from_source()
redis.write_hashes(df, url, key_prefix="stage:", ttl=7200)

# Transform with RediSearch
result = redis.aggregate_hashes(url, "stage_idx", ...)

# Load to warehouse
load_to_warehouse(result)

Get Started

Install

pip install polars-redis

Redis Stack

docker run -d -p 6379:6379 \
  redis/redis-stack:latest

Try It

import polars as pl
import polars_redis as redis

url = "redis://localhost:6379"

# Write
df = pl.DataFrame({"id": [1,2], "name": ["a","b"]})
redis.write_hashes(df, url, key_column="id", key_prefix="test:")

# Read
result = redis.scan_hashes(
    url, "test:*",
    schema={"name": pl.Utf8}
).collect()
print(result)

Resources

Requirements

  • Python 3.9+
  • Redis 7.0+
  • Redis Stack for RediSearch/JSON

Also Available

  • Rust crate
  • Redis Cluster support

polars-redis

Stop copying data.
Query it where it lives.

pip install polars-redis

github.com/joshrotenberg/polars-redis

Appendix: Supported Redis Types

| Redis Type              | Scan | Write | Notes                           |
|-------------------------|------|-------|---------------------------------|
| Hash                    | Yes  | Yes   | Field-level projection pushdown |
| JSON                    | Yes  | Yes   | JSONPath extraction             |
| String                  | Yes  | Yes   | Key-value pairs                 |
| Set / List / Sorted Set | Yes  | Yes   | One row per member              |
| Stream                  | Yes  | No    | Consumer groups, timestamps     |
| TimeSeries              | Yes  | No    | Server-side aggregation         |

(backup slide)

Appendix: Architecture

Python / Polars

LazyFrame API
DataFrame ops
.collect()

polars-redis

Rust + PyO3
Async batching
Arrow conversion

Redis

SCAN + HGETALL
FT.SEARCH / FT.AGGREGATE
Pub/Sub, Streams

io/ DataFrame I/O (scan, write, cache, search)

client/ Redis operations (geo, keys, pipeline, pubsub)

(backup slide)