Building a Product Catalog with Search¶
This end-to-end example shows how to build a searchable product catalog using polars-redis. We'll cover:
- Loading product data into Redis
- Creating a RediSearch index
- Querying products with filters
- Aggregating for analytics
- Updating inventory in bulk
Setup¶
Start Redis Stack (includes RediSearch):
Install polars-redis:
Step 1: Load Product Data¶
Let's say you have product data in a CSV or from an API:
import polars as pl
import polars_redis as redis

url = "redis://localhost:6379"

# A small sample catalog; in practice this would come from a CSV or an API.
catalog = {
    "sku": ["LAPTOP-001", "LAPTOP-002", "PHONE-001", "PHONE-002", "TABLET-001"],
    "name": ["Pro Laptop 15", "Budget Laptop 14", "Flagship Phone", "Mid-range Phone", "Pro Tablet 12"],
    "category": ["laptops", "laptops", "phones", "phones", "tablets"],
    "brand": ["TechCorp", "ValueBrand", "TechCorp", "ValueBrand", "TechCorp"],
    "price": [1299.99, 599.99, 999.99, 449.99, 799.99],
    "quantity": [50, 120, 200, 350, 75],
    "rating": [4.5, 4.0, 4.7, 4.2, 4.6],
}
products = pl.DataFrame(catalog)

# Persist each row as a Redis hash keyed by "product:<sku>".
result = redis.write_hashes(
    products,
    url,
    key_column="sku",
    key_prefix="product:",
)
print(f"Loaded {result} products")
Each product is now a Redis hash:
product:LAPTOP-001 -> {name: "Pro Laptop 15", category: "laptops", ...}
product:LAPTOP-002 -> {name: "Budget Laptop 14", category: "laptops", ...}
...
Step 2: Create a Search Index¶
Create a RediSearch index for fast querying:
from polars_redis import Index, TextField, NumericField, TagField

# Define the index schema. TextField enables full-text search, TagField
# exact-match filtering, and NumericField range queries; sortable=True
# allows server-side sorting on that field.
product_index = Index(
    name="products_idx",
    prefix="product:",  # index every hash whose key starts with "product:"
    schema=[
        TextField("name", sortable=True),
        TagField("category"),
        TagField("brand"),
        NumericField("price", sortable=True),
        NumericField("quantity", sortable=True),
        NumericField("rating", sortable=True),
    ]
)

# Create the index on the server.
# NOTE(review): create() presumably errors if the index already exists —
# the complete script further below uses ensure_exists() instead; confirm
# which is idempotent against the library docs.
product_index.create(url)
print("Index created")
Step 3: Query Products¶
Basic Scan (No Index Required)¶
# Scan every product hash; a plain key-pattern scan needs no search index.
product_schema = {
    "name": pl.Utf8,
    "category": pl.Utf8,
    "price": pl.Float64,
    "quantity": pl.Int64,
}
all_products = redis.scan_hashes(url, "product:*", schema=product_schema).collect()
print(all_products)
Search with Filters (Uses Index)¶
from polars_redis import col

# Find laptops under $1000. col(...) builds a query expression with
# Polars-like syntax; the filtering runs server-side against the index.
affordable_laptops = redis.search_hashes(
    url,
    index="products_idx",
    query=(col("category") == "laptops") & (col("price") < 1000),
    schema={
        "name": pl.Utf8,
        "price": pl.Float64,
        "quantity": pl.Int64,
    }
).collect()
print(affordable_laptops)
# Expected output (a _key column is included alongside the schema columns):
# shape: (1, 4)
# +---------+------------------+--------+----------+
# | _key    | name             | price  | quantity |
# +---------+------------------+--------+----------+
# | product | Budget Laptop 14 | 599.99 | 120      |
# +---------+------------------+--------+----------+
Text Search¶
# Full-text search on the "name" TextField.
name_query = col("name").contains("Pro")
results = redis.search_hashes(
    url,
    index="products_idx",
    query=name_query,
    schema={"name": pl.Utf8, "price": pl.Float64},
).collect()
print(results)
# Matches: "Pro Laptop 15", "Pro Tablet 12"
Complex Queries¶
# High-rated TechCorp products that are well stocked, priciest first.
premium_products = redis.search_hashes(
    url,
    index="products_idx",
    query=(
        (col("brand") == "TechCorp")
        & (col("rating") >= 4.5)
        & (col("quantity") > 50)
    ),
    schema={
        "name": pl.Utf8,
        "category": pl.Utf8,
        "price": pl.Float64,
        "rating": pl.Float64,
    },
    sort_by="price",
    sort_ascending=False,
).collect()
print(premium_products)
Step 4: Analytics with Aggregation¶
Category Statistics¶
# Per-category statistics computed server-side.
# Each reduce entry is (function, argument fields, output alias).
category_stats = redis.aggregate_hashes(
    url,
    index="products_idx",
    query="*",  # aggregate over every indexed document
    group_by=["@category"],
    reduce=[
        ("COUNT", [], "product_count"),
        ("AVG", ["@price"], "avg_price"),
        ("SUM", ["@quantity"], "total_inventory"),
        ("AVG", ["@rating"], "avg_rating"),
    ],
    # NOTE(review): tuple is presumably (field, ascending) — confirm against
    # the aggregate_hashes docs.
    sort_by=[("@avg_price", False)],
)
print(category_stats)
# Expected output:
# shape: (3, 5)
# +----------+---------------+-----------+-----------------+------------+
# | category | product_count | avg_price | total_inventory | avg_rating |
# +----------+---------------+-----------+-----------------+------------+
# | laptops  | 2             | 949.99    | 170             | 4.25       |
# | phones   | 2             | 724.99    | 550             | 4.45       |
# | tablets  | 1             | 799.99    | 75              | 4.6        |
# +----------+---------------+-----------+-----------------+------------+
Brand Performance¶
# Revenue potential by brand (price * quantity). The server-side aggregation
# below only counts and sums per brand; the price*quantity product is
# computed client-side in Polars further down.
brand_stats = redis.aggregate_hashes(
    url,
    index="products_idx",
    query="*",
    group_by=["@brand"],
    reduce=[
        ("COUNT", [], "products"),
        ("SUM", ["@quantity"], "total_units"),
    ],
    apply=[
        # Note: For complex calculations, you might need to do this in Polars
    ],
)

# For more complex analytics, pull the raw rows and combine with Polars.
products_df = redis.scan_hashes(
    url, "product:*",
    schema={
        "brand": pl.Utf8,
        "price": pl.Float64,
        "quantity": pl.Int64,
    }
).collect()

# Derive potential revenue per row, then roll up per brand.
brand_revenue = (
    products_df
    .with_columns((pl.col("price") * pl.col("quantity")).alias("potential_revenue"))
    .group_by("brand")
    .agg([
        pl.len().alias("products"),
        pl.col("potential_revenue").sum().alias("total_potential_revenue"),
    ])
    .sort("total_potential_revenue", descending=True)
)
print(brand_revenue)
Step 5: Bulk Inventory Updates¶
When inventory changes, update Redis efficiently:
# Simulate an inventory update arriving from a warehouse system.
inventory_updates = pl.DataFrame({
    "sku": ["LAPTOP-001", "PHONE-001", "TABLET-001"],
    "quantity": [45, 180, 100],  # New quantities
})

# Write the new quantities back onto the existing product hashes.
# NOTE(review): confirm "replace" semantics against the write_hashes docs —
# if it replaces the whole hash (rather than just the fields present in the
# frame), the other product fields would be lost and a merge/update mode
# should be used here instead.
redis.write_hashes(
    inventory_updates,
    url,
    key_column="sku",
    key_prefix="product:",
    write_mode="replace",
)
print("Inventory updated")

# Verify the update. The search index created in Step 2 has no "sku" field,
# so a query on col("sku") would not match anything — read the hashes back
# directly with a key-pattern scan instead.
updated = redis.scan_hashes(
    url,
    "product:*",
    schema={"name": pl.Utf8, "quantity": pl.Int64},
).collect()
print(updated)
Step 6: Low Stock Alerts¶
Find products that need reordering:
# Products with less than 100 units, lowest stock first.
low_stock = redis.search_hashes(
    url,
    index="products_idx",
    query=col("quantity") < 100,
    schema={
        "name": pl.Utf8,
        "category": pl.Utf8,
        "quantity": pl.Int64,
    },
    sort_by="quantity",
    sort_ascending=True,
).collect()
print("Low stock alerts:")
print(low_stock)

# Write alerts to a separate key for monitoring; ttl auto-expires them.
# NOTE(review): no key_column is passed here — presumably write_hashes falls
# back to the _key column that search_hashes returns; confirm against the
# write_hashes docs.
if len(low_stock) > 0:
    redis.write_hashes(
        low_stock.with_columns(pl.lit("low_stock").alias("alert_type")),
        url,
        key_prefix="alert:stock:",
        ttl=3600,  # Expire after 1 hour
    )
Step 7: Smart Scan (Auto Index Detection)¶
If you're not sure whether an index exists, use smart_scan:
from polars_redis import smart_scan, explain_scan

# Inspect which execution strategy would be used for this key pattern.
plan = explain_scan(url, "product:*", schema={"name": pl.Utf8})
print(plan.explain())
# Expected output while products_idx exists:
# Strategy: SEARCH
# Index: products_idx
# Prefixes: product:
# Type: HASH

# smart_scan auto-uses the index when one covers the pattern, so the same
# code works whether or not an index has been created.
df = smart_scan(
    url,
    "product:*",
    schema={"name": pl.Utf8, "price": pl.Float64},
).filter(pl.col("price") > 500).collect()
Step 8: Caching Expensive Queries¶
Cache aggregation results for dashboards:
# Memoize the result in Redis for 300 s, compressed with zstd.
@redis.cache(url=url, ttl=300, compression="zstd")
def get_category_dashboard():
    """Expensive aggregation cached for 5 minutes."""
    return redis.aggregate_hashes(
        url,
        index="products_idx",
        query="*",
        group_by=["@category"],
        reduce=[
            ("COUNT", [], "products"),
            ("AVG", ["@price"], "avg_price"),
            ("SUM", ["@quantity"], "inventory"),
        ],
    )

# First call: computes and caches
dashboard = get_category_dashboard()
# Subsequent calls: instant cache hit
dashboard = get_category_dashboard()
Complete Script¶
Here's the full example in one script:
import polars as pl
import polars_redis as redis
from polars_redis import col, Index, TextField, NumericField, TagField

url = "redis://localhost:6379"

# 1. Load data — one Redis hash per row, keyed by "product:<sku>".
products = pl.DataFrame({
    "sku": ["LAPTOP-001", "LAPTOP-002", "PHONE-001", "PHONE-002", "TABLET-001"],
    "name": ["Pro Laptop 15", "Budget Laptop 14", "Flagship Phone", "Mid-range Phone", "Pro Tablet 12"],
    "category": ["laptops", "laptops", "phones", "phones", "tablets"],
    "brand": ["TechCorp", "ValueBrand", "TechCorp", "ValueBrand", "TechCorp"],
    "price": [1299.99, 599.99, 999.99, 449.99, 799.99],
    "quantity": [50, 120, 200, 350, 75],
    "rating": [4.5, 4.0, 4.7, 4.2, 4.6],
})
redis.write_hashes(products, url, key_column="sku", key_prefix="product:")

# 2. Create index (ensure_exists is safe to re-run).
Index(
    name="products_idx",
    prefix="product:",
    schema=[
        TextField("name", sortable=True),
        TagField("category"),
        TagField("brand"),
        NumericField("price", sortable=True),
        NumericField("quantity", sortable=True),
        NumericField("rating", sortable=True),
    ]
).ensure_exists(url)

# 3. Query — server-side filter via the index.
affordable = redis.search_hashes(
    url,
    index="products_idx",
    query=(col("price") < 800) & (col("rating") >= 4.0),
    schema={"name": pl.Utf8, "price": pl.Float64, "rating": pl.Float64},
).collect()
print("Affordable highly-rated products:")
print(affordable)

# 4. Aggregate — per-category count and average price, computed server-side.
stats = redis.aggregate_hashes(
    url,
    index="products_idx",
    query="*",
    group_by=["@category"],
    reduce=[("COUNT", [], "count"), ("AVG", ["@price"], "avg_price")],
)
print("\nCategory stats:")
print(stats)
Key Takeaways¶
-
Write once, query many ways - Load data with write_hashes, query with scan_hashes or search_hashes
Index for performance - Create RediSearch indexes for server-side filtering on large datasets
-
Use the query builder - Polars-like syntax is more readable than raw RediSearch queries
-
Aggregate server-side - Use aggregate_hashes for analytics to minimize data transfer
Cache expensive queries - The @cache decorator memoizes results automatically
Smart scan for flexibility - Auto-detects indexes so code works with or without them
Next Steps¶
- Add geospatial search for store locations: Client Operations
- Set up real-time inventory updates: Pub/Sub
- Learn more about indexing: Index Management