# Aggregation

`aggregate_hashes()` uses RediSearch's `FT.AGGREGATE` command for server-side grouping and aggregation. Only the aggregated results are transferred to the client — not the raw documents.

## Setup

The examples on this page use the test data created in the RediSearch Overview page.

## Basic Aggregation

import polars_redis as pr

url = "redis://localhost:6379"

# Count employees by department
df = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="*",
    group_by=["@department"],
    reduce=[("COUNT", [], "employee_count")],
)

print(df)
# shape: (3, 2)
# +-------------+----------------+
# | department  | employee_count |
# | ---         | ---            |
# | str         | i64            |
# +=============+================+
# | engineering | 4              |
# | product     | 2              |
# | marketing   | 2              |
# +-------------+----------------+

## Multiple Reduce Functions

# Salary statistics by department
df = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="@status:{active}",  # Filter before aggregating
    group_by=["@department"],
    reduce=[
        ("COUNT", [], "headcount"),
        ("AVG", ["@salary"], "avg_salary"),
        ("MIN", ["@salary"], "min_salary"),
        ("MAX", ["@salary"], "max_salary"),
        ("SUM", ["@salary"], "total_payroll"),
    ],
)

## Global Aggregation

Aggregate without grouping to compute overall statistics:

# Company-wide stats
df = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="@status:{active}",
    reduce=[
        ("COUNT", [], "total_employees"),
        ("AVG", ["@salary"], "company_avg_salary"),
        ("AVG", ["@age"], "avg_age"),
    ],
)

print(df)
# shape: (1, 3)
# +-----------------+--------------------+---------+
# | total_employees | company_avg_salary | avg_age |
# | ---             | ---                | ---     |
# | i64             | f64                | f64     |
# +=================+====================+=========+
# | 6               | 114166.67          | 37.33   |
# +-----------------+--------------------+---------+

## Computed Fields with APPLY

Create derived fields using `APPLY` expressions:

df = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="*",
    group_by=["@department"],
    reduce=[
        ("SUM", ["@salary"], "total_salary"),
        ("COUNT", [], "count"),
    ],
    apply=[
        ("@total_salary / @count", "calculated_avg"),
    ],
)

## Post-Aggregation Filtering

Filter results after aggregation (analogous to SQL's `HAVING` clause):

# Only departments with more than 2 employees
df = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="*",
    group_by=["@department"],
    reduce=[("COUNT", [], "count")],
    filter_expr="@count > 2",
)

## Sorting and Pagination

# Top 3 departments by average salary
df = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="*",
    group_by=["@department"],
    reduce=[("AVG", ["@salary"], "avg_salary")],
    sort_by=[("@avg_salary", False)],  # False = descending
    limit=3,
    offset=0,
)

## Loading Additional Fields

Load specific document fields before aggregation:

df = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="*",
    load=["@name", "@salary"],  # Load these fields
    group_by=["@department"],
    reduce=[
        ("FIRST_VALUE", ["@name"], "sample_employee"),
        ("AVG", ["@salary"], "avg_salary"),
    ],
)

## Parameters Reference

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `url` | `str` | required | Redis connection URL |
| `index` | `str` | required | RediSearch index name |
| `query` | `str` | `"*"` | Filter query applied before aggregation |
| `group_by` | `list` | `None` | Fields to group by (with `@` prefix) |
| `reduce` | `list` | `None` | Reduce operations as `(function, args, alias)` tuples |
| `apply` | `list` | `None` | Computed expressions as `(expression, alias)` tuples |
| `filter_expr` | `str` | `None` | Post-aggregation filter expression |
| `sort_by` | `list` | `None` | Sort order as `(field, ascending)` tuples |
| `limit` | `int` | `None` | Maximum number of results |
| `offset` | `int` | `0` | Skip the first N results |
| `load` | `list` | `None` | Fields to load before aggregation |

## Reduce Functions Reference

| Function | Args | Description |
| --- | --- | --- |
| `COUNT` | `[]` | Count documents in the group |
| `SUM` | `["@field"]` | Sum of a numeric field |
| `AVG` | `["@field"]` | Average of a numeric field |
| `MIN` | `["@field"]` | Minimum value |
| `MAX` | `["@field"]` | Maximum value |
| `FIRST_VALUE` | `["@field"]` | First value in the group |
| `TOLIST` | `["@field"]` | Collect all values as a list |
| `QUANTILE` | `["@field", "0.5"]` | Quantile (0.5 = median) |
| `STDDEV` | `["@field"]` | Standard deviation |
| `COUNT_DISTINCT` | `["@field"]` | Count of unique values |
| `COUNT_DISTINCTISH` | `["@field"]` | Approximate distinct count (faster) |

## Example: Analytics Dashboard

import polars as pl
import polars_redis as pr
from polars_redis import col

url = "redis://localhost:6379"

# 1. Filter: active engineers
engineers = pr.search_hashes(
    url,
    index="employees_idx",
    query=(col("department") == "engineering") & (col("status") == "active"),
    schema={"name": pl.Utf8, "age": pl.Int64, "salary": pl.Float64},
).collect()

# 2. Aggregate: salary distribution by department
salary_stats = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="@status:{active}",
    group_by=["@department"],
    reduce=[
        ("COUNT", [], "headcount"),
        ("AVG", ["@salary"], "avg_salary"),
        ("STDDEV", ["@salary"], "salary_stddev"),
        ("MIN", ["@salary"], "min_salary"),
        ("MAX", ["@salary"], "max_salary"),
    ],
    sort_by=[("@headcount", False)],
)

# 3. Aggregate: age distribution
age_stats = pr.aggregate_hashes(
    url,
    index="employees_idx",
    query="*",
    reduce=[
        ("AVG", ["@age"], "avg_age"),
        ("MIN", ["@age"], "youngest"),
        ("MAX", ["@age"], "oldest"),
        ("QUANTILE", ["@age", "0.5"], "median_age"),
    ],
)

print("Engineers:", engineers)
print("Salary by dept:", salary_stats)
print("Age stats:", age_stats)