Skip to content

Rust Examples

Example code is in examples/rust/.

Run examples with:

# Scan examples
cargo run --example scan_hashes
cargo run --example scan_json
cargo run --example scan_strings
cargo run --example schema_inference

# Search examples (requires Redis Stack)
cargo run --example search_example

# Write examples
cargo run --example write_hashes
cargo run --example write_json
cargo run --example write_strings

RediSearch: Server-Side Filtering

This example demonstrates RediSearch integration for server-side filtering with HashSearchIterator:

//! Example: RediSearch integration with polars-redis.
//!
//! This example demonstrates how to use the Rust API for server-side
//! filtering and aggregation with RediSearch.
//!
//! Run with: cargo run --example search_example
//!
//! Prerequisites:
//!     - Redis Stack running on localhost:6379
//!     - RediSearch module loaded (comes with Redis Stack)

use polars_redis::{HashSchema, HashSearchIterator, RedisType, SearchBatchConfig};
use redis::Commands;
use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());

    println!("Connecting to: {}", url);
    println!();

    // Set up sample data and index
    setup_sample_data(&url)?;

    // =========================================================================
    // Example 1: Basic Search with Query String
    // =========================================================================
    println!("=== Example 1: Basic Search (age > 30) ===\n");

    let schema = HashSchema::new(vec![
        ("name".to_string(), RedisType::Utf8),
        ("age".to_string(), RedisType::Int64),
        ("department".to_string(), RedisType::Utf8),
        ("salary".to_string(), RedisType::Float64),
    ])
    .with_key(true)
    .with_key_column_name("_key");

    // Use RediSearch query syntax
    let config = SearchBatchConfig::new("employees_idx", "@age:[30 +inf]").with_batch_size(100);

    let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;

    let mut total_rows = 0;
    while let Some(batch) = iterator.next_batch()? {
        total_rows += batch.num_rows();
        println!("Batch: {} rows", batch.num_rows());

        // Print column names
        if total_rows == batch.num_rows() {
            println!(
                "Columns: {:?}",
                batch
                    .schema()
                    .fields()
                    .iter()
                    .map(|f| f.name())
                    .collect::<Vec<_>>()
            );
        }
    }
    println!("Total matching: {} employees over 30\n", total_rows);

    // =========================================================================
    // Example 2: Tag Query (department filter)
    // =========================================================================
    println!("=== Example 2: Tag Query (engineering department) ===\n");

    let config =
        SearchBatchConfig::new("employees_idx", "@department:{engineering}").with_batch_size(100);

    let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;

    let mut total_rows = 0;
    while let Some(batch) = iterator.next_batch()? {
        total_rows += batch.num_rows();
    }
    println!("Found {} engineers\n", total_rows);

    // =========================================================================
    // Example 3: Combined Query (age AND department)
    // =========================================================================
    println!("=== Example 3: Combined Query (age > 30 AND active) ===\n");

    // Combine conditions: age > 30 AND status is active
    let config = SearchBatchConfig::new("employees_idx", "@age:[30 +inf] @status:{active}")
        .with_batch_size(100);

    let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;

    let mut total_rows = 0;
    while let Some(batch) = iterator.next_batch()? {
        total_rows += batch.num_rows();
    }
    println!("Found {} active employees over 30\n", total_rows);

    // =========================================================================
    // Example 4: Sorted Search
    // =========================================================================
    println!("=== Example 4: Sorted Search (by salary descending) ===\n");

    let config = SearchBatchConfig::new("employees_idx", "@status:{active}")
        .with_batch_size(100)
        .with_sort_by("salary", false); // false = descending

    let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;

    let mut total_rows = 0;
    while let Some(batch) = iterator.next_batch()? {
        total_rows += batch.num_rows();
        println!("Batch: {} rows (sorted by salary DESC)", batch.num_rows());
    }
    println!("Total: {} active employees\n", total_rows);

    // =========================================================================
    // Example 5: Projection Pushdown (select specific fields)
    // =========================================================================
    println!("=== Example 5: Projection (name and salary only) ===\n");

    let config = SearchBatchConfig::new("employees_idx", "*").with_batch_size(100);

    // Only request specific fields
    let projection = Some(vec!["name".to_string(), "salary".to_string()]);

    let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, projection)?;

    if let Some(batch) = iterator.next_batch()? {
        println!(
            "Returned columns: {:?}",
            batch
                .schema()
                .fields()
                .iter()
                .map(|f| f.name())
                .collect::<Vec<_>>()
        );
        println!("Rows: {}\n", batch.num_rows());
    }

    // =========================================================================
    // Example 6: Negation Query
    // =========================================================================
    println!("=== Example 6: Negation (NOT inactive) ===\n");

    // Find employees who are NOT inactive
    let config =
        SearchBatchConfig::new("employees_idx", "-@status:{inactive}").with_batch_size(100);

    let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;

    let mut total_rows = 0;
    while let Some(batch) = iterator.next_batch()? {
        total_rows += batch.num_rows();
    }
    println!("Found {} non-inactive employees\n", total_rows);

    // =========================================================================
    // Example 7: OR Query
    // =========================================================================
    println!("=== Example 7: OR Query (engineering OR product) ===\n");

    // Find employees in engineering OR product departments
    let config = SearchBatchConfig::new("employees_idx", "@department:{engineering|product}")
        .with_batch_size(100);

    let mut iterator = HashSearchIterator::new(&url, schema, config, None)?;

    let mut total_rows = 0;
    while let Some(batch) = iterator.next_batch()? {
        total_rows += batch.num_rows();
    }
    println!("Found {} employees in engineering or product\n", total_rows);

    println!("=== All search examples completed! ===");

    Ok(())
}

/// Set up sample data and RediSearch index for examples.
fn setup_sample_data(url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open(url)?;
    let mut conn = client.get_connection()?;

    // Clear existing data
    let keys: Vec<String> = redis::cmd("KEYS").arg("employee:*").query(&mut conn)?;
    for key in keys {
        let _: () = conn.del(&key)?;
    }

    // Create sample employees
    let employees = vec![
        ("1", "Alice", "32", "engineering", "120000", "active"),
        ("2", "Bob", "28", "engineering", "95000", "active"),
        ("3", "Carol", "45", "product", "140000", "active"),
        ("4", "Dave", "35", "product", "110000", "inactive"),
        ("5", "Eve", "29", "marketing", "85000", "active"),
        ("6", "Frank", "52", "engineering", "150000", "active"),
        ("7", "Grace", "38", "marketing", "95000", "active"),
        ("8", "Henry", "41", "engineering", "130000", "inactive"),
    ];

    for (id, name, age, dept, salary, status) in employees {
        let _: () = redis::cmd("HSET")
            .arg(format!("employee:{}", id))
            .arg("name")
            .arg(name)
            .arg("age")
            .arg(age)
            .arg("department")
            .arg(dept)
            .arg("salary")
            .arg(salary)
            .arg("status")
            .arg(status)
            .query(&mut conn)?;
    }

    // Drop existing index if it exists
    let _: Result<(), redis::RedisError> = redis::cmd("FT.DROPINDEX")
        .arg("employees_idx")
        .query(&mut conn);

    // Create RediSearch index
    redis::cmd("FT.CREATE")
        .arg("employees_idx")
        .arg("ON")
        .arg("HASH")
        .arg("PREFIX")
        .arg("1")
        .arg("employee:")
        .arg("SCHEMA")
        .arg("name")
        .arg("TEXT")
        .arg("SORTABLE")
        .arg("age")
        .arg("NUMERIC")
        .arg("SORTABLE")
        .arg("department")
        .arg("TAG")
        .arg("salary")
        .arg("NUMERIC")
        .arg("SORTABLE")
        .arg("status")
        .arg("TAG")
        .query(&mut conn)?;

    // Wait briefly for index to be ready
    std::thread::sleep(std::time::Duration::from_millis(100));

    println!("Created 8 employees and RediSearch index\n");

    Ok(())
}

Scan Hashes

Comprehensive example covering projection, batching, and row limits:

//! Example: Scanning Redis hashes with polars-redis.
//!
//! This example demonstrates how to use the Rust API directly to scan
//! Redis hashes and convert them to Arrow RecordBatches.
//!
//! Run with: cargo run --example scan_hashes
//!
//! Prerequisites:
//!     - Redis running on localhost:6379
//!     - Sample data loaded (run setup_sample_data.py first)

use polars_redis::{BatchConfig, HashBatchIterator, HashSchema, RedisType};
use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Allow override via environment variable
    let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());

    println!("Connecting to: {}", url);
    println!();

    // =========================================================================
    // Example 1: Basic scanning with full schema
    // =========================================================================
    println!("=== Example 1: Basic Hash Scanning ===\n");

    // Define the schema for user hashes
    // This tells the iterator what fields to expect and how to convert them
    let schema = HashSchema::new(vec![
        ("name".to_string(), RedisType::Utf8),
        ("email".to_string(), RedisType::Utf8),
        ("age".to_string(), RedisType::Int64),
        ("score".to_string(), RedisType::Float64),
        ("active".to_string(), RedisType::Boolean),
    ])
    .with_key(true) // Include the Redis key as a column
    .with_key_column_name("_key"); // Name of the key column

    // Configure the batch iterator
    let config = BatchConfig::new("user:*") // Pattern to match keys
        .with_batch_size(50) // Rows per Arrow batch
        .with_count_hint(100); // SCAN COUNT hint to Redis

    // Create the iterator
    let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;

    let mut total_rows = 0;
    let mut batch_count = 0;

    // Iterate over batches
    while let Some(batch) = iterator.next_batch()? {
        batch_count += 1;
        let num_rows = batch.num_rows();
        total_rows += num_rows;

        println!("Batch {}: {} rows", batch_count, num_rows);

        // Print schema on first batch
        if batch_count == 1 {
            println!("\nSchema:");
            for field in batch.schema().fields() {
                println!("  {:<12} : {:?}", field.name(), field.data_type());
            }
            println!();
        }
    }

    println!("Total: {} rows in {} batches\n", total_rows, batch_count);

    // =========================================================================
    // Example 2: Projection pushdown - only fetch specific fields
    // =========================================================================
    println!("=== Example 2: Projection Pushdown (name, age only) ===\n");

    // Even though we define the full schema, we only request specific columns.
    // The iterator will use HMGET instead of HGETALL, reducing data transfer.
    let schema = HashSchema::new(vec![
        ("name".to_string(), RedisType::Utf8),
        ("email".to_string(), RedisType::Utf8),
        ("age".to_string(), RedisType::Int64),
        ("score".to_string(), RedisType::Float64),
        ("active".to_string(), RedisType::Boolean),
    ])
    .with_key(false); // Don't include key this time

    let config = BatchConfig::new("user:*").with_batch_size(100);

    // Request only these columns - this triggers HMGET optimization
    let projection = Some(vec!["name".to_string(), "age".to_string()]);

    let mut iterator = HashBatchIterator::new(&url, schema, config, projection)?;

    if let Some(batch) = iterator.next_batch()? {
        println!("Projected columns (only these were fetched from Redis):");
        for field in batch.schema().fields() {
            println!("  {}", field.name());
        }
        println!("\nFirst batch: {} rows", batch.num_rows());
    }
    println!();

    // =========================================================================
    // Example 3: Row limit - stop early
    // =========================================================================
    println!("=== Example 3: Row Limit (max 10 rows) ===\n");

    let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]).with_key(true);

    let config = BatchConfig::new("user:*")
        .with_batch_size(100)
        .with_max_rows(10); // Stop after 10 rows total

    let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;

    let mut total = 0;
    while let Some(batch) = iterator.next_batch()? {
        total += batch.num_rows();
        println!("Batch: {} rows (cumulative: {})", batch.num_rows(), total);
    }
    println!("\nTotal rows with limit: {} (requested max: 10)\n", total);

    // =========================================================================
    // Example 4: Different key patterns
    // =========================================================================
    println!("=== Example 4: Different Key Patterns ===\n");

    let patterns = vec![
        "user:*",    // All users
        "user:1*",   // Users starting with 1
        "session:*", // All sessions (may not exist)
    ];

    for pattern in patterns {
        let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]);
        let config = BatchConfig::new(pattern).with_batch_size(1000);

        let mut iterator = HashBatchIterator::new(&url, schema.clone(), config, None)?;

        let mut count = 0;
        while let Some(batch) = iterator.next_batch()? {
            count += batch.num_rows();
        }

        println!("Pattern '{}': {} keys found", pattern, count);
    }
    println!();

    // =========================================================================
    // Example 5: Serialize batch to Arrow IPC (for Python interop)
    // =========================================================================
    println!("=== Example 5: Arrow IPC Serialization ===\n");

    let schema = HashSchema::new(vec![
        ("name".to_string(), RedisType::Utf8),
        ("age".to_string(), RedisType::Int64),
    ])
    .with_key(true);

    let config = BatchConfig::new("user:*")
        .with_batch_size(10)
        .with_max_rows(10);

    let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;

    if let Some(batch) = iterator.next_batch()? {
        // Serialize to Arrow IPC format (what we'd send to Python)
        let ipc_bytes = polars_redis::batch_to_ipc(&batch)?;
        println!(
            "Serialized {} rows to {} bytes of Arrow IPC",
            batch.num_rows(),
            ipc_bytes.len()
        );

        // This is what Python would do:
        // df = pl.read_ipc(io.BytesIO(ipc_bytes))
    }

    println!("\n=== All examples complete ===");
    Ok(())
}

Scan JSON

Working with RedisJSON documents:

//! Example: Scanning Redis JSON documents with polars-redis.
//!
//! This example demonstrates how to use the Rust API directly to scan
//! RedisJSON documents and convert them to Arrow RecordBatches.
//!
//! Run with: cargo run --example scan_json
//!
//! Prerequisites:
//!     - Redis 8+ running on localhost:6379 (has native JSON support)
//!     - Sample data loaded (run setup_sample_data.py first)

use arrow::datatypes::DataType;
use polars_redis::{BatchConfig, JsonBatchIterator, JsonSchema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "redis://localhost:6379";

    // Define the schema for product JSON documents
    let schema = JsonSchema::new(vec![
        ("name".to_string(), DataType::Utf8),
        ("category".to_string(), DataType::Utf8),
        ("price".to_string(), DataType::Float64),
        ("quantity".to_string(), DataType::Int64),
        ("in_stock".to_string(), DataType::Boolean),
    ])
    .with_key(true)
    .with_key_column_name("_key");

    // Configure the batch iterator
    let config = BatchConfig::new("product:*")
        .with_batch_size(10)
        .with_count_hint(50);

    // Create the iterator
    let mut iterator = JsonBatchIterator::new(url, schema, config, None)?;

    println!("=== Scanning Redis JSON Documents ===\n");

    let mut total_rows = 0;
    let mut batch_count = 0;

    // Iterate over batches
    while let Some(batch) = iterator.next_batch()? {
        batch_count += 1;
        let num_rows = batch.num_rows();
        total_rows += num_rows;

        println!("Batch {}: {} rows", batch_count, num_rows);

        // Print schema on first batch
        if batch_count == 1 {
            println!("\nSchema:");
            for field in batch.schema().fields() {
                println!("  {} : {:?}", field.name(), field.data_type());
            }
            println!();
        }
    }

    println!("\nTotal: {} rows in {} batches", total_rows, batch_count);

    // Example with projection
    println!("\n=== With Projection (name, price only) ===\n");

    let schema = JsonSchema::new(vec![
        ("name".to_string(), DataType::Utf8),
        ("category".to_string(), DataType::Utf8),
        ("price".to_string(), DataType::Float64),
        ("quantity".to_string(), DataType::Int64),
        ("in_stock".to_string(), DataType::Boolean),
    ])
    .with_key(false);

    let config = BatchConfig::new("product:*").with_batch_size(50);

    let projection = Some(vec!["name".to_string(), "price".to_string()]);
    let mut iterator = JsonBatchIterator::new(url, schema, config, projection)?;

    if let Some(batch) = iterator.next_batch()? {
        println!("Projected columns:");
        for field in batch.schema().fields() {
            println!("  {}", field.name());
        }
        println!("\nFirst batch: {} rows", batch.num_rows());
    }

    // Example: Access column data
    println!("\n=== Accessing Column Data ===\n");

    let schema = JsonSchema::new(vec![
        ("name".to_string(), DataType::Utf8),
        ("price".to_string(), DataType::Float64),
    ])
    .with_key(true);

    let config = BatchConfig::new("product:*").with_batch_size(100);

    let mut iterator = JsonBatchIterator::new(url, schema, config, None)?;

    if let Some(batch) = iterator.next_batch()? {
        // Access the price column
        let price_col = batch
            .column_by_name("price")
            .expect("price column should exist");

        let prices = price_col
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .expect("should be Float64Array");

        let total: f64 = prices.iter().flatten().sum();
        let count = prices.len();

        println!("Products: {}", count);
        println!("Total price sum: ${:.2}", total);
        println!("Average price: ${:.2}", total / count as f64);
    }

    Ok(())
}

Scan Strings

Redis strings with different value types:

//! Example: Scanning Redis strings with polars-redis.
//!
//! This example demonstrates how to use the Rust API to scan
//! Redis string values and convert them to Arrow RecordBatches.
//!
//! Run with: cargo run --example scan_strings
//!
//! Prerequisites:
//!     - Redis running on localhost:6379
//!     - Sample data loaded (run setup_sample_data.py first)

use polars_redis::{BatchConfig, RedisType, StringBatchIterator, StringSchema};
use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());

    println!("Connecting to: {}", url);
    println!();

    // =========================================================================
    // Example 1: Scan string values as UTF-8
    // =========================================================================
    println!("=== Example 1: Scan Strings as UTF-8 ===\n");

    let schema = StringSchema::new(RedisType::Utf8)
        .with_key(true)
        .with_key_column_name("_key")
        .with_value_column_name("value");

    let config = BatchConfig::new("cache:*").with_batch_size(100);

    let mut iterator = StringBatchIterator::new(&url, schema, config)?;

    let mut total_rows = 0;
    while let Some(batch) = iterator.next_batch()? {
        total_rows += batch.num_rows();

        if total_rows <= batch.num_rows() {
            println!("Schema:");
            for field in batch.schema().fields() {
                println!("  {:<12} : {:?}", field.name(), field.data_type());
            }
            println!();
        }
    }
    println!("Total cache entries: {}\n", total_rows);

    // =========================================================================
    // Example 2: Scan counters as integers
    // =========================================================================
    println!("=== Example 2: Scan Counters as Int64 ===\n");

    let schema = StringSchema::new(RedisType::Int64)
        .with_key(true)
        .with_value_column_name("count");

    let config = BatchConfig::new("counter:*").with_batch_size(100);

    let mut iterator = StringBatchIterator::new(&url, schema, config)?;

    let mut total: i64 = 0;
    let mut count = 0;

    while let Some(batch) = iterator.next_batch()? {
        count += batch.num_rows();

        // Sum the counter values
        let value_col = batch
            .column_by_name("count")
            .expect("count column should exist");

        let values = value_col
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .expect("should be Int64Array");

        total += values.iter().flatten().sum::<i64>();
    }

    println!("Counters found: {}", count);
    println!("Sum of all counters: {}\n", total);

    // =========================================================================
    // Example 3: Scan float values
    // =========================================================================
    println!("=== Example 3: Scan Float Values ===\n");

    let schema = StringSchema::new(RedisType::Float64)
        .with_key(true)
        .with_value_column_name("score");

    let config = BatchConfig::new("score:*").with_batch_size(100);

    let mut iterator = StringBatchIterator::new(&url, schema, config)?;

    let mut sum: f64 = 0.0;
    let mut count = 0;

    while let Some(batch) = iterator.next_batch()? {
        count += batch.num_rows();

        let value_col = batch
            .column_by_name("score")
            .expect("score column should exist");

        let values = value_col
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .expect("should be Float64Array");

        sum += values.iter().flatten().sum::<f64>();
    }

    if count > 0 {
        println!("Scores found: {}", count);
        println!("Average score: {:.2}\n", sum / count as f64);
    } else {
        println!("No score keys found\n");
    }

    // =========================================================================
    // Example 4: Without key column
    // =========================================================================
    println!("=== Example 4: Values Only (no key column) ===\n");

    let schema = StringSchema::new(RedisType::Utf8)
        .with_key(false)
        .with_value_column_name("data");

    let config = BatchConfig::new("cache:*")
        .with_batch_size(10)
        .with_max_rows(5);

    let mut iterator = StringBatchIterator::new(&url, schema, config)?;

    if let Some(batch) = iterator.next_batch()? {
        println!("Columns (key excluded):");
        for field in batch.schema().fields() {
            println!("  {}", field.name());
        }
        println!("\nRows: {}", batch.num_rows());
    }

    println!("\n=== All examples complete ===");
    Ok(())
}

Schema Inference

Automatic schema detection from existing data:

//! Example: Schema inference with polars-redis.
//!
//! This example demonstrates how to use schema inference to automatically
//! detect field names and types from existing Redis data.
//!
//! Run with: cargo run --example schema_inference
//!
//! Prerequisites:
//!     - Redis running on localhost:6379
//!     - Sample data loaded (run setup_sample_data.py first)

use polars_redis::io::infer::{infer_hash_schema, infer_json_schema};
use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());

    println!("Connecting to: {}", url);
    println!();

    // =========================================================================
    // Example 1: Infer schema from Redis hashes
    // =========================================================================
    println!("=== Example 1: Infer Hash Schema ===\n");

    let (fields, keys_sampled) = infer_hash_schema(&url, "user:*", 100, true)?;

    println!("Sampled {} keys", keys_sampled);
    println!("Inferred fields:");
    for (name, type_str) in &fields {
        println!("  {:<15} : {}", name, type_str);
    }
    println!();

    // =========================================================================
    // Example 2: Infer schema without type inference (all Utf8)
    // =========================================================================
    println!("=== Example 2: Infer Hash Schema (no type inference) ===\n");

    let (fields, _) = infer_hash_schema(&url, "user:*", 50, false)?;

    println!("Fields (all Utf8):");
    for (name, type_str) in &fields {
        println!("  {:<15} : {}", name, type_str);
    }
    println!();

    // =========================================================================
    // Example 3: Infer schema from RedisJSON documents
    // =========================================================================
    println!("=== Example 3: Infer JSON Schema ===\n");

    let (fields, keys_sampled) = infer_json_schema(&url, "product:*", 100)?;

    println!("Sampled {} keys", keys_sampled);
    println!("Inferred fields:");
    for (name, type_str) in &fields {
        println!("  {:<15} : {}", name, type_str);
    }
    println!();

    // =========================================================================
    // Example 4: Use inferred schema to scan data
    // =========================================================================
    println!("=== Example 4: Scan Using Inferred Schema ===\n");

    use polars_redis::{BatchConfig, HashBatchIterator, HashSchema, RedisType};

    // First infer
    let (fields, _) = infer_hash_schema(&url, "user:*", 50, true)?;

    // Convert to HashSchema
    let schema_fields: Vec<(String, RedisType)> = fields
        .into_iter()
        .map(|(name, type_str)| {
            let redis_type = match type_str.as_str() {
                "int64" => RedisType::Int64,
                "float64" => RedisType::Float64,
                "bool" => RedisType::Boolean,
                "date" => RedisType::Date,
                "datetime" => RedisType::Datetime,
                _ => RedisType::Utf8,
            };
            (name, redis_type)
        })
        .collect();

    let schema = HashSchema::new(schema_fields).with_key(true);

    let config = BatchConfig::new("user:*")
        .with_batch_size(100)
        .with_max_rows(5);

    let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;

    if let Some(batch) = iterator.next_batch()? {
        println!("Scanned {} rows with inferred schema", batch.num_rows());
        println!("Columns:");
        for field in batch.schema().fields() {
            println!("  {:<15} : {:?}", field.name(), field.data_type());
        }
    }

    println!("\n=== All examples complete ===");
    Ok(())
}

Write Hashes

Writing data to Redis as hashes:

//! Example: Writing hashes to Redis with polars-redis.
//!
//! This example demonstrates how to use the Rust API to write
//! data to Redis as hashes.
//!
//! Run with: cargo run --example write_hashes
//!
//! Prerequisites:
//!     - Redis running on localhost:6379

use polars_redis::io::write::{WriteMode, write_hashes};
use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());

    println!("Connecting to: {}", url);
    println!();

    // =========================================================================
    // Example 1: Basic hash writing
    // =========================================================================
    println!("=== Example 1: Basic Hash Writing ===\n");

    let keys = vec![
        "example:user:1".to_string(),
        "example:user:2".to_string(),
        "example:user:3".to_string(),
    ];

    let fields = vec!["name".to_string(), "age".to_string(), "city".to_string()];

    let values = vec![
        vec![
            Some("Alice".to_string()),
            Some("30".to_string()),
            Some("NYC".to_string()),
        ],
        vec![
            Some("Bob".to_string()),
            Some("25".to_string()),
            Some("LA".to_string()),
        ],
        vec![
            Some("Carol".to_string()),
            Some("35".to_string()),
            Some("Chicago".to_string()),
        ],
    ];

    let result = write_hashes(&url, keys, fields, values, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!("Keys failed: {}", result.keys_failed);
    println!("Keys skipped: {}", result.keys_skipped);
    println!();

    // =========================================================================
    // Example 2: Writing with TTL
    // =========================================================================
    println!("=== Example 2: Writing with TTL ===\n");

    let keys = vec![
        "example:session:1".to_string(),
        "example:session:2".to_string(),
    ];

    let fields = vec!["user_id".to_string(), "token".to_string()];

    let values = vec![
        vec![Some("user:1".to_string()), Some("abc123".to_string())],
        vec![Some("user:2".to_string()), Some("def456".to_string())],
    ];

    // TTL of 60 seconds
    let result = write_hashes(&url, keys, fields, values, Some(60), WriteMode::Replace)?;

    println!("Keys written with 60s TTL: {}", result.keys_written);
    println!();

    // =========================================================================
    // Example 3: Fail mode (skip existing keys)
    // =========================================================================
    println!("=== Example 3: Fail Mode (Skip Existing) ===\n");

    let keys = vec![
        "example:user:1".to_string(), // Already exists from Example 1
        "example:user:4".to_string(), // New key
    ];

    let fields = vec!["name".to_string(), "age".to_string()];

    let values = vec![
        vec![Some("Alice Updated".to_string()), Some("31".to_string())],
        vec![Some("Dave".to_string()), Some("28".to_string())],
    ];

    let result = write_hashes(&url, keys, fields, values, None, WriteMode::Fail)?;

    println!("Keys written: {}", result.keys_written);
    println!("Keys skipped (already existed): {}", result.keys_skipped);
    println!();

    // =========================================================================
    // Example 4: Append mode (merge fields)
    // =========================================================================
    println!("=== Example 4: Append Mode (Merge Fields) ===\n");

    let keys = vec!["example:user:1".to_string()];

    // Add new field to existing hash
    let fields = vec!["email".to_string()];
    let values = vec![vec![Some("alice@example.com".to_string())]];

    let result = write_hashes(&url, keys, fields, values, None, WriteMode::Append)?;

    println!("Keys updated with new field: {}", result.keys_written);
    println!();

    // =========================================================================
    // Example 5: Handling null values
    // =========================================================================
    println!("=== Example 5: Handling Null Values ===\n");

    let keys = vec!["example:partial:1".to_string()];

    let fields = vec![
        "required".to_string(),
        "optional".to_string(),
        "another".to_string(),
    ];

    // Some values are None - these fields will not be written
    let values = vec![vec![
        Some("present".to_string()),
        None, // This field will be skipped
        Some("also present".to_string()),
    ]];

    let result = write_hashes(&url, keys, fields, values, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!("(null values are skipped, only non-null fields written)");
    println!();

    // Cleanup
    println!("=== Cleanup ===\n");
    cleanup_example_keys(&url)?;
    println!("Example keys deleted");

    println!("\n=== All examples complete ===");
    Ok(())
}

fn cleanup_example_keys(url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open(url)?;
    let mut conn = client.get_connection()?;

    let keys: Vec<String> = redis::cmd("KEYS")
        .arg("example:*")
        .query(&mut conn)
        .unwrap_or_default();

    if !keys.is_empty() {
        redis::cmd("DEL").arg(&keys).query::<()>(&mut conn).ok();
    }

    Ok(())
}

Write JSON

Writing data to Redis as JSON documents:

//! Example: Writing JSON documents to Redis with polars-redis.
//!
//! This example demonstrates how to use the Rust API to write
//! data to Redis as JSON documents.
//!
//! Run with: cargo run --example write_json
//!
//! Prerequisites:
//!     - Redis running on localhost:6379 with RedisJSON module

use polars_redis::io::write::{WriteMode, write_json};
use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());

    println!("Connecting to: {}", url);
    println!();

    // =========================================================================
    // Example 1: Basic JSON writing
    // =========================================================================
    println!("=== Example 1: Basic JSON Writing ===\n");

    let keys = vec![
        "example:doc:1".to_string(),
        "example:doc:2".to_string(),
        "example:doc:3".to_string(),
    ];

    let json_strings = vec![
        r#"{"title": "Hello World", "views": 100, "published": true}"#.to_string(),
        r#"{"title": "Getting Started", "views": 250, "published": true}"#.to_string(),
        r#"{"title": "Draft Post", "views": 0, "published": false}"#.to_string(),
    ];

    let result = write_json(&url, keys, json_strings, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!("Keys failed: {}", result.keys_failed);
    println!("Keys skipped: {}", result.keys_skipped);
    println!();

    // =========================================================================
    // Example 2: Writing with TTL
    // =========================================================================
    println!("=== Example 2: Writing with TTL ===\n");

    let keys = vec!["example:cache:1".to_string(), "example:cache:2".to_string()];

    let json_strings = vec![
        r#"{"data": "cached value 1", "timestamp": 1704067200}"#.to_string(),
        r#"{"data": "cached value 2", "timestamp": 1704067200}"#.to_string(),
    ];

    // TTL of 300 seconds (5 minutes)
    let result = write_json(&url, keys, json_strings, Some(300), WriteMode::Replace)?;

    println!("Keys written with 5min TTL: {}", result.keys_written);
    println!();

    // =========================================================================
    // Example 3: Fail mode (skip existing keys)
    // =========================================================================
    println!("=== Example 3: Fail Mode (Skip Existing) ===\n");

    let keys = vec![
        "example:doc:1".to_string(), // Already exists from Example 1
        "example:doc:4".to_string(), // New key
    ];

    let json_strings = vec![
        r#"{"title": "Updated Title", "views": 999}"#.to_string(),
        r#"{"title": "New Document", "views": 50}"#.to_string(),
    ];

    let result = write_json(&url, keys, json_strings, None, WriteMode::Fail)?;

    println!("Keys written: {}", result.keys_written);
    println!("Keys skipped (already existed): {}", result.keys_skipped);
    println!();

    // =========================================================================
    // Example 4: Nested JSON structures
    // =========================================================================
    println!("=== Example 4: Nested JSON Structures ===\n");

    let keys = vec!["example:complex:1".to_string()];

    let json_strings = vec![
        r#"{
        "user": {
            "name": "Alice",
            "email": "alice@example.com"
        },
        "tags": ["rust", "redis", "polars"],
        "metadata": {
            "created": "2024-01-15",
            "version": 1
        }
    }"#
        .to_string(),
    ];

    let result = write_json(&url, keys, json_strings, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!("(nested structures are stored as-is)");
    println!();

    // =========================================================================
    // Example 5: Programmatic JSON construction
    // =========================================================================
    println!("=== Example 5: Programmatic JSON Construction ===\n");

    use std::collections::HashMap;

    let mut data: Vec<(String, String)> = Vec::new();

    for i in 1..=5 {
        let key = format!("example:product:{}", i);
        let mut product: HashMap<&str, serde_json::Value> = HashMap::new();
        product.insert("id", serde_json::json!(i));
        product.insert("name", serde_json::json!(format!("Product {}", i)));
        product.insert("price", serde_json::json!(i as f64 * 9.99));
        product.insert("in_stock", serde_json::json!(i % 2 == 0));

        let json_str = serde_json::to_string(&product)?;
        data.push((key, json_str));
    }

    let (keys, json_strings): (Vec<_>, Vec<_>) = data.into_iter().unzip();

    let result = write_json(&url, keys, json_strings, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!();

    // Cleanup
    println!("=== Cleanup ===\n");
    cleanup_example_keys(&url)?;
    println!("Example keys deleted");

    println!("\n=== All examples complete ===");
    Ok(())
}

fn cleanup_example_keys(url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open(url)?;
    let mut conn = client.get_connection()?;

    let keys: Vec<String> = redis::cmd("KEYS")
        .arg("example:*")
        .query(&mut conn)
        .unwrap_or_default();

    if !keys.is_empty() {
        redis::cmd("DEL").arg(&keys).query::<()>(&mut conn).ok();
    }

    Ok(())
}

Write Strings

Writing data to Redis as strings:

//! Example: Writing strings to Redis with polars-redis.
//!
//! This example demonstrates how to use the Rust API to write
//! data to Redis as string values.
//!
//! Run with: cargo run --example write_strings
//!
//! Prerequisites:
//!     - Redis running on localhost:6379

use polars_redis::io::write::{WriteMode, write_strings};
use std::env;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());

    println!("Connecting to: {}", url);
    println!();

    // =========================================================================
    // Example 1: Basic string writing
    // =========================================================================
    println!("=== Example 1: Basic String Writing ===\n");

    let keys = vec![
        "example:counter:1".to_string(),
        "example:counter:2".to_string(),
        "example:counter:3".to_string(),
    ];

    let values = vec![
        Some("100".to_string()),
        Some("200".to_string()),
        Some("300".to_string()),
    ];

    let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!("Keys failed: {}", result.keys_failed);
    println!("Keys skipped: {}", result.keys_skipped);
    println!();

    // =========================================================================
    // Example 2: Writing with TTL
    // =========================================================================
    println!("=== Example 2: Writing with TTL ===\n");

    let keys = vec!["example:temp:1".to_string(), "example:temp:2".to_string()];

    let values = vec![
        Some("temporary value 1".to_string()),
        Some("temporary value 2".to_string()),
    ];

    // TTL of 120 seconds (2 minutes)
    let result = write_strings(&url, keys, values, Some(120), WriteMode::Replace)?;

    println!("Keys written with 2min TTL: {}", result.keys_written);
    println!();

    // =========================================================================
    // Example 3: Fail mode (skip existing keys)
    // =========================================================================
    println!("=== Example 3: Fail Mode (Skip Existing) ===\n");

    let keys = vec![
        "example:counter:1".to_string(), // Already exists from Example 1
        "example:counter:4".to_string(), // New key
    ];

    let values = vec![Some("999".to_string()), Some("400".to_string())];

    let result = write_strings(&url, keys, values, None, WriteMode::Fail)?;

    println!("Keys written: {}", result.keys_written);
    println!("Keys skipped (already existed): {}", result.keys_skipped);
    println!();

    // =========================================================================
    // Example 4: Handling null values
    // =========================================================================
    println!("=== Example 4: Handling Null Values ===\n");

    let keys = vec![
        "example:value:1".to_string(),
        "example:value:2".to_string(),
        "example:value:3".to_string(),
    ];

    // Some values are None - these keys will not be written
    let values = vec![
        Some("present".to_string()),
        None, // This key will be skipped
        Some("also present".to_string()),
    ];

    let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!("(null values are skipped)");
    println!();

    // =========================================================================
    // Example 5: Writing various data types as strings
    // =========================================================================
    println!("=== Example 5: Various Data Types as Strings ===\n");

    let keys = vec![
        "example:int".to_string(),
        "example:float".to_string(),
        "example:bool".to_string(),
        "example:json".to_string(),
    ];

    let values = vec![
        Some("42".to_string()),
        Some("3.14159".to_string()),
        Some("true".to_string()),
        Some(r#"{"nested": "data"}"#.to_string()),
    ];

    let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;

    println!("Keys written: {}", result.keys_written);
    println!("(all values stored as strings, can be parsed on read)");
    println!();

    // =========================================================================
    // Example 6: Batch writing
    // =========================================================================
    println!("=== Example 6: Batch Writing ===\n");

    let count = 100;
    let keys: Vec<String> = (0..count).map(|i| format!("example:batch:{}", i)).collect();
    let values: Vec<Option<String>> = (0..count).map(|i| Some(format!("value_{}", i))).collect();

    let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;

    println!("Batch size: {}", count);
    println!("Keys written: {}", result.keys_written);
    println!("(writes are automatically pipelined for performance)");
    println!();

    // Cleanup
    println!("=== Cleanup ===\n");
    cleanup_example_keys(&url)?;
    println!("Example keys deleted");

    println!("\n=== All examples complete ===");
    Ok(())
}

fn cleanup_example_keys(url: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open(url)?;
    let mut conn = client.get_connection()?;

    let keys: Vec<String> = redis::cmd("KEYS")
        .arg("example:*")
        .query(&mut conn)
        .unwrap_or_default();

    if !keys.is_empty() {
        redis::cmd("DEL").arg(&keys).query::<()>(&mut conn).ok();
    }

    Ok(())
}