Client Operations
Beyond DataFrame I/O, polars-redis provides ergonomic wrappers for common Redis operations. These live in the client module and are designed for bulk operations that complement your DataFrame workflows.
Geospatial Operations
Work with Redis GEO commands for location-based data.
Adding Locations
import polars_redis as redis
url = "redis://localhost:6379"
# Add locations to a geo set as (name, longitude, latitude) tuples
locations = [
    ("headquarters", -122.4194, 37.7749),  # San Francisco
    ("warehouse", -118.2437, 34.0522),  # Los Angeles
    ("office", -73.9857, 40.7484),  # New York
]
result = redis.geo_add(url, "locations", locations)
print(f"Added {result['added']} new, updated {result['updated']} existing")
Radius Queries
Find locations within a radius:
# Find locations within 500km of a point
nearby = redis.geo_radius(
    url,
    "locations",
    longitude=-122.4,
    latitude=37.8,
    radius=500,
    unit="km",
    count=10,  # Max results
    sort="ASC",  # Nearest first
)
for loc in nearby:
    print(f"{loc['name']}: {loc['distance']:.1f} km away")
Search from an existing member:
# Find locations within 1000 miles of headquarters
nearby = redis.geo_radius_by_member(
    url,
    "locations",
    member="headquarters",
    radius=1000,
    unit="mi",
)
Distance Calculations
# Distance between two members
dist = redis.geo_dist(url, "locations", "headquarters", "warehouse", "km")
print(f"Distance: {dist:.1f} km")
# Distance matrix for multiple members
members = ["headquarters", "warehouse", "office"]
matrix = redis.geo_dist_matrix(url, "locations", members, "km")
# matrix[i][j] = distance from members[i] to members[j]
for i, m1 in enumerate(members):
    for j, m2 in enumerate(members):
        print(f"{m1} -> {m2}: {matrix[i][j]:.1f} km")
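To sanity-check distances without a running Redis server, the following is a minimal sketch of a great-circle (haversine) distance and the matrix built from it. The helper `haversine_km` is hypothetical and not part of polars-redis, and the Earth radius constant is assumed to match the one in the Redis source, so results should be close to, but not necessarily identical to, what `geo_dist()` returns.

```python
import math

# Mean Earth radius in metres (assumed to match the constant Redis uses).
EARTH_RADIUS_M = 6372797.560856


def haversine_km(lon1, lat1, lon2, lat2):
    """Great-circle distance in km between two (longitude, latitude) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a)) / 1000.0


# Same coordinates as the geo_add example: name -> (longitude, latitude)
points = {
    "headquarters": (-122.4194, 37.7749),
    "warehouse": (-118.2437, 34.0522),
    "office": (-73.9857, 40.7484),
}
names = list(points)

# local_matrix[i][j] = distance from names[i] to names[j]
local_matrix = [
    [haversine_km(*points[src], *points[dst]) for dst in names]
    for src in names
]

for i, src in enumerate(names):
    for j, dst in enumerate(names):
        print(f"{src} -> {dst}: {local_matrix[i][j]:.1f} km")
```

The matrix is symmetric with zeros on the diagonal, which is a quick property to check against the output of `geo_dist_matrix()`.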
Position and Geohash
# Get coordinates
positions = redis.geo_pos(url, "locations", ["headquarters", "warehouse"])
for pos in positions:
    print(f"{pos['name']}: ({pos['longitude']}, {pos['latitude']})")
# Get geohashes
hashes = redis.geo_hash(url, "locations", ["headquarters", "warehouse"])
for name, geohash in hashes:  # avoid shadowing the built-in hash()
    print(f"{name}: {geohash}")
Combining with DataFrames
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"
# customer_lon, customer_lat, and schema are assumed to be defined elsewhere
# Get all stores within 50km of a customer
nearby_stores = redis.geo_radius(
    url, "stores",
    longitude=customer_lon,
    latitude=customer_lat,
    radius=50, unit="km",
)
# Convert to DataFrame for analysis
stores_df = pl.DataFrame(nearby_stores)
# Join with inventory data
inventory = redis.scan_hashes(url, "inventory:*", schema)
available = (
    stores_df
    .join(inventory.collect(), left_on="name", right_on="store_id")
    .filter(pl.col("quantity") > 0)
    .sort("distance")
)
Key Management
Bulk operations for managing Redis keys.
Key Information
import polars_redis as redis
url = "redis://localhost:6379"
# Get info for all keys matching a pattern
info = redis.key_info(url, "user:*", include_memory=True)
for k in info:
    print(f"{k['key']}: type={k['key_type']}, ttl={k['ttl']}s, "
          f"memory={k['memory_usage']} bytes")
Bulk TTL Operations
# Set TTL for multiple keys
keys = ["session:1", "session:2", "session:3"]
result = redis.set_ttl(url, keys, ttl_seconds=3600)
print(f"Set TTL on {result['succeeded']} keys, {result['failed']} failed")
# Set different TTLs for each key
keys_and_ttls = [
    ("cache:hot", 300),  # 5 minutes
    ("cache:warm", 3600),  # 1 hour
    ("cache:cold", 86400),  # 1 day
]
result = redis.set_ttl_individual(url, keys_and_ttls)
# Remove TTL (make persistent)
result = redis.persist_keys(url, keys)
# Get current TTL
ttls = redis.get_ttl(url, keys)
for key, ttl in ttls:
    if ttl == -1:
        print(f"{key}: no expiry")
    elif ttl == -2:
        print(f"{key}: does not exist")
    else:
        print(f"{key}: expires in {ttl}s")
Bulk Delete
# Delete specific keys
result = redis.delete_keys(url, ["temp:1", "temp:2", "temp:3"])
print(f"Deleted {result['deleted']} keys")
# Delete by pattern (uses SCAN internally)
result = redis.delete_keys_pattern(url, "temp:*")
print(f"Deleted {result['deleted']} keys matching pattern")
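Because a pattern delete is destructive, it can help to preview which keys a pattern would match before running it. The sketch below uses Python's standard `fnmatch` module as an approximation of Redis glob matching; `preview_matches` is a hypothetical helper, not part of polars-redis. The approximation holds for `*`, `?`, and `[abc]`, but differs for negated classes (Redis uses `[^a]`, `fnmatch` uses `[!a]`) and for backslash escapes.

```python
from fnmatch import fnmatchcase


def preview_matches(keys, pattern):
    """Return the keys that a glob-style pattern would select (case-sensitive)."""
    return [key for key in keys if fnmatchcase(key, pattern)]


# Sample key names standing in for a real keyspace
sample_keys = ["temp:1", "temp:2", "user:1", "template", "temp:"]

to_delete = preview_matches(sample_keys, "temp:*")
print(to_delete)
```

Note that `template` is not selected, because the pattern requires the literal `temp:` prefix.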
Bulk Rename
# Rename multiple keys
renames = [
    ("old:user:1", "user:1"),
    ("old:user:2", "user:2"),
]
result = redis.rename_keys(url, renames)
print(f"Renamed {result['succeeded']} keys")
if result['errors']:
    for key, error in result['errors']:
        print(f"  Failed: {key} - {error}")
Check Existence
keys = ["user:1", "user:2", "user:999"]
existence = redis.exists_keys(url, keys)
for key, found in existence:
    print(f"{key}: {'exists' if found else 'missing'}")
Pipelines and Transactions
Batch multiple operations for efficiency or atomicity.
Pipelines
Pipelines send multiple commands in a single round-trip, reducing network latency:
import polars_redis as redis
url = "redis://localhost:6379"
# Create a pipeline
pipe = redis.Pipeline(url)
# Queue operations (no network calls yet)
pipe.set("counter", "0")
pipe.incr("counter")
pipe.incr("counter")
pipe.hset("user:1", "name", "Alice")
pipe.hset("user:1", "age", "30")
pipe.get("counter")
# Execute all at once (single round-trip)
results = pipe.execute()
print(f"Executed {len(results)} commands")
print(f"Counter value: {results.get(5).value}") # "2"
Transactions
Transactions use MULTI/EXEC for atomic execution:
# Create a transaction
tx = redis.Transaction(url)
# Queue atomic operations
tx.set("balance:alice", "100")
tx.set("balance:bob", "50")
tx.decrby("balance:alice", 25)
tx.incrby("balance:bob", 25)
# Execute atomically: the queued commands run as one isolated unit.
# Redis does not roll back earlier commands if a later one fails at runtime.
results = tx.execute()
if results.all_succeeded():
    print("Transfer complete")
else:
    print("Transfer failed")
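MULTI/EXEC queues commands and runs them together at EXEC time. The toy in-memory model below is a minimal sketch of that behaviour; `ToyTransaction` is hypothetical, is not part of polars-redis, and does not talk to Redis. It illustrates two points that hold for Redis transactions: nothing runs until the transaction is executed, and a command that fails at execution time does not undo the commands that ran before it.

```python
class ToyTransaction:
    """In-memory stand-in for MULTI/EXEC, for illustration only."""

    def __init__(self, store):
        self.store = store  # plain dict acting as the keyspace
        self.queue = []     # commands recorded but not yet run

    def set(self, key, value):
        self.queue.append(("set", key, value))

    def incrby(self, key, amount):
        self.queue.append(("incrby", key, amount))

    def execute(self):
        """Run every queued command in order; errors are returned in place."""
        results = []
        for op, key, arg in self.queue:
            try:
                if op == "set":
                    self.store[key] = str(arg)
                    results.append("OK")
                else:
                    new_value = int(self.store.get(key, "0")) + arg
                    self.store[key] = str(new_value)
                    results.append(new_value)
            except ValueError as exc:
                # Like Redis, keep going: no rollback of earlier commands
                results.append(exc)
        self.queue.clear()
        return results


store = {}
toy_tx = ToyTransaction(store)
toy_tx.set("balance:alice", "100")
toy_tx.set("note", "hello")
toy_tx.incrby("note", 1)             # will fail: "hello" is not an integer
toy_tx.incrby("balance:alice", -25)

snapshot_before = dict(store)        # still empty, nothing has run yet
toy_results = toy_tx.execute()
print(store)
```

This is why checking the per-command results after `execute()` matters: a failed command in the middle leaves the surrounding writes applied.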
Available Commands
Both Pipeline and Transaction support:
String operations:
- set(key, value), set_ex(key, value, seconds)
- get(key), mget(keys)
- incr(key), incrby(key, amount), decr(key)
Hash operations:
- hset(key, field, value), hmset(key, fields_dict)
- hget(key, field), hgetall(key)
- hdel(key, fields), hincrby(key, field, amount)
List operations:
- lpush(key, values), rpush(key, values)
- lrange(key, start, stop), llen(key)
Set operations:
- sadd(key, members), smembers(key)
- sismember(key, member), scard(key)
Sorted set operations:
- zadd(key, members_with_scores)
- zrange(key, start, stop), zscore(key, member), zcard(key)
Key operations:
- delete(keys), exists(keys)
- expire(key, seconds), ttl(key)
- rename(key, new_key), key_type(key)
Raw commands:
- raw(command, args) - Execute any Redis command
Pipeline with DataFrame Data
import polars as pl
import polars_redis as redis
url = "redis://localhost:6379"
# DataFrame with updates
updates = pl.DataFrame({
    "user_id": ["user:1", "user:2", "user:3"],
    "score": [100, 85, 92],
})
# Batch update using pipeline
pipe = redis.Pipeline(url)
for row in updates.iter_rows(named=True):
    pipe.hset(row["user_id"], "score", str(row["score"]))
results = pipe.execute()
print(f"Updated {results.succeeded} users")
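For very large DataFrames, queuing every row in a single pipeline keeps all commands in memory until they are sent. One option is to split the rows into fixed-size batches and execute one pipeline per batch. The sketch below shows only the batching step in plain Python; `chunked` is a hypothetical helper, not part of polars-redis, and the batch size of 2 is chosen purely for illustration.

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in for updates.iter_rows(named=True)
rows = [{"user_id": f"user:{i}", "score": i * 10} for i in range(1, 6)]

batches = list(chunked(rows, 2))
for number, batch in enumerate(batches, start=1):
    print(f"batch {number}: {[row['user_id'] for row in batch]}")
```

Each batch would then be queued on its own pipeline and executed, assuming the Pipeline API behaves as in the example above.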
Pub/Sub
Collect messages from Redis Pub/Sub channels into DataFrames.
Basic Collection
import polars_redis as redis
url = "redis://localhost:6379"
# Collect messages for 10 seconds
config = redis.PubSubConfig(
    channels=["events", "alerts"],
    timeout_seconds=10,
)
df = redis.collect_pubsub(url, config)
print(df)
# shape: (N, 3)
# +---------+---------+---------------------------+
# | channel | message | timestamp |
# +---------+---------+---------------------------+
# | events | {...} | 2024-01-01 12:00:00.123 |
# | alerts | {...} | 2024-01-01 12:00:00.456 |
# +---------+---------+---------------------------+
Pattern Subscriptions
# Subscribe to patterns
config = redis.PubSubConfig(
    patterns=["sensor:*", "device:*"],
    timeout_seconds=30,
    max_messages=1000,  # Stop after 1000 messages
)
df = redis.collect_pubsub(url, config)
Processing Messages
import polars as pl
# Collect JSON messages (redis, url, and config as defined in the examples above)
df = redis.collect_pubsub(url, config)
# Parse JSON payloads
parsed = df.with_columns(
    pl.col("message").str.json_decode().alias("data")
).unnest("data")
# Aggregate by channel
stats = (
    parsed
    .group_by("channel")
    .agg([
        pl.len().alias("message_count"),
        pl.col("value").mean().alias("avg_value"),
    ])
)
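To make the aggregation concrete without a live subscription, here is a minimal standard-library sketch of the same computation on a few hand-written rows. The sample messages are invented for illustration and assume each payload is a JSON object with a numeric `value` field, as the `pl.col("value")` expression above implies.

```python
import json
from collections import defaultdict

# Hand-written rows shaped like the collected DataFrame (channel, message)
sample_rows = [
    {"channel": "events", "message": '{"value": 10}'},
    {"channel": "events", "message": '{"value": 20}'},
    {"channel": "alerts", "message": '{"value": 5}'},
]

# Decode each JSON payload and group the values by channel
values_by_channel = defaultdict(list)
for row in sample_rows:
    payload = json.loads(row["message"])
    values_by_channel[row["channel"]].append(payload["value"])

# Same two aggregates as the Polars query: count and mean per channel
channel_stats = {
    channel: {
        "message_count": len(values),
        "avg_value": sum(values) / len(values),
    }
    for channel, values in values_by_channel.items()
}
print(channel_stats)
```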
When to Use What
| Task | Use |
|---|---|
| Bulk location queries | geo_radius(), geo_radius_by_member() |
| Distance calculations | geo_dist(), geo_dist_matrix() |
| Key inventory/cleanup | key_info(), delete_keys_pattern() |
| Bulk TTL management | set_ttl(), persist_keys() |
| Batch operations (speed) | Pipeline |
| Atomic operations | Transaction |
| Real-time event collection | collect_pubsub() |
Performance Tips
- Use pipelines for bulk operations: A pipeline with 1000 commands needs a single network round-trip instead of 1000, so it is typically much faster than 1000 individual calls.
- Transactions have overhead: Only use transactions when atomicity is required. Pipelines are faster for non-atomic batches.
- Geo queries are efficient: Redis GEO stores members in a sorted set scored by geohash. Radius queries are O(N+log(M)), where N is the number of members inside the bounding box of the search area and M is the total number of members in the set.
- Key info is expensive: key_info() with include_memory=True runs MEMORY USAGE for each key. Use sparingly on large keyspaces.
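The pipeline tip comes down to simple arithmetic on round-trips. The sketch below is a back-of-the-envelope model, not a benchmark: `estimated_seconds` is a hypothetical helper, and the 0.5 ms round-trip time is an assumed figure that will vary with your network.

```python
def estimated_seconds(n_commands, rtt_ms, per_command_ms=0.0, pipelined=False):
    """Rough latency estimate: round-trip cost plus per-command server cost."""
    round_trips = 1 if pipelined else n_commands
    return (round_trips * rtt_ms + n_commands * per_command_ms) / 1000.0


# Assumed: 1000 commands over a link with a 0.5 ms round-trip time
individual = estimated_seconds(1000, rtt_ms=0.5)
batched = estimated_seconds(1000, rtt_ms=0.5, pipelined=True)

print(f"individual calls: {individual:.4f} s")
print(f"single pipeline:  {batched:.4f} s")
```

In practice the gap is smaller than this model suggests, since the server still processes every command, but the network share of the cost shrinks roughly in proportion to the batch size.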