Rust Examples¶
Example code is in examples/rust/.
Run examples with:
# Scan examples
cargo run --example scan_hashes
cargo run --example scan_json
cargo run --example scan_strings
cargo run --example schema_inference
# Search examples (requires Redis Stack)
cargo run --example search_example
# Write examples
cargo run --example write_hashes
cargo run --example write_json
cargo run --example write_strings
RediSearch: Server-Side Filtering¶
This example demonstrates RediSearch integration for server-side filtering with HashSearchIterator:
//! Example: RediSearch integration with polars-redis.
//!
//! This example demonstrates how to use the Rust API for server-side
//! filtering and aggregation with RediSearch.
//!
//! Run with: cargo run --example search_example
//!
//! Prerequisites:
//! - Redis Stack running on localhost:6379
//! - RediSearch module loaded (comes with Redis Stack)
use polars_redis::{HashSchema, HashSearchIterator, RedisType, SearchBatchConfig};
use redis::Commands;
use std::env;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("Connecting to: {}", url);
println!();
// Set up sample data and index
setup_sample_data(&url)?;
// =========================================================================
// Example 1: Basic Search with Query String
// =========================================================================
println!("=== Example 1: Basic Search (age > 30) ===\n");
let schema = HashSchema::new(vec![
("name".to_string(), RedisType::Utf8),
("age".to_string(), RedisType::Int64),
("department".to_string(), RedisType::Utf8),
("salary".to_string(), RedisType::Float64),
])
.with_key(true)
.with_key_column_name("_key");
// Use RediSearch query syntax
let config = SearchBatchConfig::new("employees_idx", "@age:[30 +inf]").with_batch_size(100);
let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;
let mut total_rows = 0;
while let Some(batch) = iterator.next_batch()? {
total_rows += batch.num_rows();
println!("Batch: {} rows", batch.num_rows());
// Print column names
if total_rows == batch.num_rows() {
println!(
"Columns: {:?}",
batch
.schema()
.fields()
.iter()
.map(|f| f.name())
.collect::<Vec<_>>()
);
}
}
println!("Total matching: {} employees over 30\n", total_rows);
// =========================================================================
// Example 2: Tag Query (department filter)
// =========================================================================
println!("=== Example 2: Tag Query (engineering department) ===\n");
let config =
SearchBatchConfig::new("employees_idx", "@department:{engineering}").with_batch_size(100);
let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;
let mut total_rows = 0;
while let Some(batch) = iterator.next_batch()? {
total_rows += batch.num_rows();
}
println!("Found {} engineers\n", total_rows);
// =========================================================================
// Example 3: Combined Query (age AND department)
// =========================================================================
println!("=== Example 3: Combined Query (age > 30 AND active) ===\n");
// Combine conditions: age > 30 AND status is active
let config = SearchBatchConfig::new("employees_idx", "@age:[30 +inf] @status:{active}")
.with_batch_size(100);
let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;
let mut total_rows = 0;
while let Some(batch) = iterator.next_batch()? {
total_rows += batch.num_rows();
}
println!("Found {} active employees over 30\n", total_rows);
// =========================================================================
// Example 4: Sorted Search
// =========================================================================
println!("=== Example 4: Sorted Search (by salary descending) ===\n");
let config = SearchBatchConfig::new("employees_idx", "@status:{active}")
.with_batch_size(100)
.with_sort_by("salary", false); // false = descending
let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;
let mut total_rows = 0;
while let Some(batch) = iterator.next_batch()? {
total_rows += batch.num_rows();
println!("Batch: {} rows (sorted by salary DESC)", batch.num_rows());
}
println!("Total: {} active employees\n", total_rows);
// =========================================================================
// Example 5: Projection Pushdown (select specific fields)
// =========================================================================
println!("=== Example 5: Projection (name and salary only) ===\n");
let config = SearchBatchConfig::new("employees_idx", "*").with_batch_size(100);
// Only request specific fields
let projection = Some(vec!["name".to_string(), "salary".to_string()]);
let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, projection)?;
if let Some(batch) = iterator.next_batch()? {
println!(
"Returned columns: {:?}",
batch
.schema()
.fields()
.iter()
.map(|f| f.name())
.collect::<Vec<_>>()
);
println!("Rows: {}\n", batch.num_rows());
}
// =========================================================================
// Example 6: Negation Query
// =========================================================================
println!("=== Example 6: Negation (NOT inactive) ===\n");
// Find employees who are NOT inactive
let config =
SearchBatchConfig::new("employees_idx", "-@status:{inactive}").with_batch_size(100);
let mut iterator = HashSearchIterator::new(&url, schema.clone(), config, None)?;
let mut total_rows = 0;
while let Some(batch) = iterator.next_batch()? {
total_rows += batch.num_rows();
}
println!("Found {} non-inactive employees\n", total_rows);
// =========================================================================
// Example 7: OR Query
// =========================================================================
println!("=== Example 7: OR Query (engineering OR product) ===\n");
// Find employees in engineering OR product departments
let config = SearchBatchConfig::new("employees_idx", "@department:{engineering|product}")
.with_batch_size(100);
let mut iterator = HashSearchIterator::new(&url, schema, config, None)?;
let mut total_rows = 0;
while let Some(batch) = iterator.next_batch()? {
total_rows += batch.num_rows();
}
println!("Found {} employees in engineering or product\n", total_rows);
println!("=== All search examples completed! ===");
Ok(())
}
/// Set up sample data and RediSearch index for examples.
fn setup_sample_data(url: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = redis::Client::open(url)?;
let mut conn = client.get_connection()?;
// Clear existing data
let keys: Vec<String> = redis::cmd("KEYS").arg("employee:*").query(&mut conn)?;
for key in keys {
let _: () = conn.del(&key)?;
}
// Create sample employees
let employees = vec![
("1", "Alice", "32", "engineering", "120000", "active"),
("2", "Bob", "28", "engineering", "95000", "active"),
("3", "Carol", "45", "product", "140000", "active"),
("4", "Dave", "35", "product", "110000", "inactive"),
("5", "Eve", "29", "marketing", "85000", "active"),
("6", "Frank", "52", "engineering", "150000", "active"),
("7", "Grace", "38", "marketing", "95000", "active"),
("8", "Henry", "41", "engineering", "130000", "inactive"),
];
for (id, name, age, dept, salary, status) in employees {
let _: () = redis::cmd("HSET")
.arg(format!("employee:{}", id))
.arg("name")
.arg(name)
.arg("age")
.arg(age)
.arg("department")
.arg(dept)
.arg("salary")
.arg(salary)
.arg("status")
.arg(status)
.query(&mut conn)?;
}
// Drop existing index if it exists
let _: Result<(), redis::RedisError> = redis::cmd("FT.DROPINDEX")
.arg("employees_idx")
.query(&mut conn);
// Create RediSearch index
redis::cmd("FT.CREATE")
.arg("employees_idx")
.arg("ON")
.arg("HASH")
.arg("PREFIX")
.arg("1")
.arg("employee:")
.arg("SCHEMA")
.arg("name")
.arg("TEXT")
.arg("SORTABLE")
.arg("age")
.arg("NUMERIC")
.arg("SORTABLE")
.arg("department")
.arg("TAG")
.arg("salary")
.arg("NUMERIC")
.arg("SORTABLE")
.arg("status")
.arg("TAG")
.query(&mut conn)?;
// Wait briefly for index to be ready
std::thread::sleep(std::time::Duration::from_millis(100));
println!("Created 8 employees and RediSearch index\n");
Ok(())
}
Scan Hashes¶
Comprehensive example covering projection, batching, and row limits:
//! Example: Scanning Redis hashes with polars-redis.
//!
//! This example demonstrates how to use the Rust API directly to scan
//! Redis hashes and convert them to Arrow RecordBatches.
//!
//! Run with: cargo run --example scan_hashes
//!
//! Prerequisites:
//! - Redis running on localhost:6379
//! - Sample data loaded (run setup_sample_data.py first)
use polars_redis::{BatchConfig, HashBatchIterator, HashSchema, RedisType};
use std::env;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Allow override via environment variable
let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("Connecting to: {}", url);
println!();
// =========================================================================
// Example 1: Basic scanning with full schema
// =========================================================================
println!("=== Example 1: Basic Hash Scanning ===\n");
// Define the schema for user hashes
// This tells the iterator what fields to expect and how to convert them
let schema = HashSchema::new(vec![
("name".to_string(), RedisType::Utf8),
("email".to_string(), RedisType::Utf8),
("age".to_string(), RedisType::Int64),
("score".to_string(), RedisType::Float64),
("active".to_string(), RedisType::Boolean),
])
.with_key(true) // Include the Redis key as a column
.with_key_column_name("_key"); // Name of the key column
// Configure the batch iterator
let config = BatchConfig::new("user:*") // Pattern to match keys
.with_batch_size(50) // Rows per Arrow batch
.with_count_hint(100); // SCAN COUNT hint to Redis
// Create the iterator
let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;
let mut total_rows = 0;
let mut batch_count = 0;
// Iterate over batches
while let Some(batch) = iterator.next_batch()? {
batch_count += 1;
let num_rows = batch.num_rows();
total_rows += num_rows;
println!("Batch {}: {} rows", batch_count, num_rows);
// Print schema on first batch
if batch_count == 1 {
println!("\nSchema:");
for field in batch.schema().fields() {
println!(" {:<12} : {:?}", field.name(), field.data_type());
}
println!();
}
}
println!("Total: {} rows in {} batches\n", total_rows, batch_count);
// =========================================================================
// Example 2: Projection pushdown - only fetch specific fields
// =========================================================================
println!("=== Example 2: Projection Pushdown (name, age only) ===\n");
// Even though we define the full schema, we only request specific columns.
// The iterator will use HMGET instead of HGETALL, reducing data transfer.
let schema = HashSchema::new(vec![
("name".to_string(), RedisType::Utf8),
("email".to_string(), RedisType::Utf8),
("age".to_string(), RedisType::Int64),
("score".to_string(), RedisType::Float64),
("active".to_string(), RedisType::Boolean),
])
.with_key(false); // Don't include key this time
let config = BatchConfig::new("user:*").with_batch_size(100);
// Request only these columns - this triggers HMGET optimization
let projection = Some(vec!["name".to_string(), "age".to_string()]);
let mut iterator = HashBatchIterator::new(&url, schema, config, projection)?;
if let Some(batch) = iterator.next_batch()? {
println!("Projected columns (only these were fetched from Redis):");
for field in batch.schema().fields() {
println!(" {}", field.name());
}
println!("\nFirst batch: {} rows", batch.num_rows());
}
println!();
// =========================================================================
// Example 3: Row limit - stop early
// =========================================================================
println!("=== Example 3: Row Limit (max 10 rows) ===\n");
let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]).with_key(true);
let config = BatchConfig::new("user:*")
.with_batch_size(100)
.with_max_rows(10); // Stop after 10 rows total
let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;
let mut total = 0;
while let Some(batch) = iterator.next_batch()? {
total += batch.num_rows();
println!("Batch: {} rows (cumulative: {})", batch.num_rows(), total);
}
println!("\nTotal rows with limit: {} (requested max: 10)\n", total);
// =========================================================================
// Example 4: Different key patterns
// =========================================================================
println!("=== Example 4: Different Key Patterns ===\n");
let patterns = vec![
"user:*", // All users
"user:1*", // Users starting with 1
"session:*", // All sessions (may not exist)
];
for pattern in patterns {
let schema = HashSchema::new(vec![("name".to_string(), RedisType::Utf8)]);
let config = BatchConfig::new(pattern).with_batch_size(1000);
let mut iterator = HashBatchIterator::new(&url, schema.clone(), config, None)?;
let mut count = 0;
while let Some(batch) = iterator.next_batch()? {
count += batch.num_rows();
}
println!("Pattern '{}': {} keys found", pattern, count);
}
println!();
// =========================================================================
// Example 5: Serialize batch to Arrow IPC (for Python interop)
// =========================================================================
println!("=== Example 5: Arrow IPC Serialization ===\n");
let schema = HashSchema::new(vec![
("name".to_string(), RedisType::Utf8),
("age".to_string(), RedisType::Int64),
])
.with_key(true);
let config = BatchConfig::new("user:*")
.with_batch_size(10)
.with_max_rows(10);
let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;
if let Some(batch) = iterator.next_batch()? {
// Serialize to Arrow IPC format (what we'd send to Python)
let ipc_bytes = polars_redis::batch_to_ipc(&batch)?;
println!(
"Serialized {} rows to {} bytes of Arrow IPC",
batch.num_rows(),
ipc_bytes.len()
);
// This is what Python would do:
// df = pl.read_ipc(io.BytesIO(ipc_bytes))
}
println!("\n=== All examples complete ===");
Ok(())
}
Scan JSON¶
Working with RedisJSON documents:
//! Example: Scanning Redis JSON documents with polars-redis.
//!
//! This example demonstrates how to use the Rust API directly to scan
//! RedisJSON documents and convert them to Arrow RecordBatches.
//!
//! Run with: cargo run --example scan_json
//!
//! Prerequisites:
//! - Redis 8+ running on localhost:6379 (has native JSON support)
//! - Sample data loaded (run setup_sample_data.py first)
use arrow::datatypes::DataType;
use polars_redis::{BatchConfig, JsonBatchIterator, JsonSchema};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = "redis://localhost:6379";
// Define the schema for product JSON documents
let schema = JsonSchema::new(vec![
("name".to_string(), DataType::Utf8),
("category".to_string(), DataType::Utf8),
("price".to_string(), DataType::Float64),
("quantity".to_string(), DataType::Int64),
("in_stock".to_string(), DataType::Boolean),
])
.with_key(true)
.with_key_column_name("_key");
// Configure the batch iterator
let config = BatchConfig::new("product:*")
.with_batch_size(10)
.with_count_hint(50);
// Create the iterator
let mut iterator = JsonBatchIterator::new(url, schema, config, None)?;
println!("=== Scanning Redis JSON Documents ===\n");
let mut total_rows = 0;
let mut batch_count = 0;
// Iterate over batches
while let Some(batch) = iterator.next_batch()? {
batch_count += 1;
let num_rows = batch.num_rows();
total_rows += num_rows;
println!("Batch {}: {} rows", batch_count, num_rows);
// Print schema on first batch
if batch_count == 1 {
println!("\nSchema:");
for field in batch.schema().fields() {
println!(" {} : {:?}", field.name(), field.data_type());
}
println!();
}
}
println!("\nTotal: {} rows in {} batches", total_rows, batch_count);
// Example with projection
println!("\n=== With Projection (name, price only) ===\n");
let schema = JsonSchema::new(vec![
("name".to_string(), DataType::Utf8),
("category".to_string(), DataType::Utf8),
("price".to_string(), DataType::Float64),
("quantity".to_string(), DataType::Int64),
("in_stock".to_string(), DataType::Boolean),
])
.with_key(false);
let config = BatchConfig::new("product:*").with_batch_size(50);
let projection = Some(vec!["name".to_string(), "price".to_string()]);
let mut iterator = JsonBatchIterator::new(url, schema, config, projection)?;
if let Some(batch) = iterator.next_batch()? {
println!("Projected columns:");
for field in batch.schema().fields() {
println!(" {}", field.name());
}
println!("\nFirst batch: {} rows", batch.num_rows());
}
// Example: Access column data
println!("\n=== Accessing Column Data ===\n");
let schema = JsonSchema::new(vec![
("name".to_string(), DataType::Utf8),
("price".to_string(), DataType::Float64),
])
.with_key(true);
let config = BatchConfig::new("product:*").with_batch_size(100);
let mut iterator = JsonBatchIterator::new(url, schema, config, None)?;
if let Some(batch) = iterator.next_batch()? {
// Access the price column
let price_col = batch
.column_by_name("price")
.expect("price column should exist");
let prices = price_col
.as_any()
.downcast_ref::<arrow::array::Float64Array>()
.expect("should be Float64Array");
let total: f64 = prices.iter().flatten().sum();
let count = prices.len();
println!("Products: {}", count);
println!("Total price sum: ${:.2}", total);
println!("Average price: ${:.2}", total / count as f64);
}
Ok(())
}
Scan Strings¶
Redis strings with different value types:
//! Example: Scanning Redis strings with polars-redis.
//!
//! This example demonstrates how to use the Rust API to scan
//! Redis string values and convert them to Arrow RecordBatches.
//!
//! Run with: cargo run --example scan_strings
//!
//! Prerequisites:
//! - Redis running on localhost:6379
//! - Sample data loaded (run setup_sample_data.py first)
use polars_redis::{BatchConfig, RedisType, StringBatchIterator, StringSchema};
use std::env;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("Connecting to: {}", url);
println!();
// =========================================================================
// Example 1: Scan string values as UTF-8
// =========================================================================
println!("=== Example 1: Scan Strings as UTF-8 ===\n");
let schema = StringSchema::new(RedisType::Utf8)
.with_key(true)
.with_key_column_name("_key")
.with_value_column_name("value");
let config = BatchConfig::new("cache:*").with_batch_size(100);
let mut iterator = StringBatchIterator::new(&url, schema, config)?;
let mut total_rows = 0;
while let Some(batch) = iterator.next_batch()? {
total_rows += batch.num_rows();
if total_rows <= batch.num_rows() {
println!("Schema:");
for field in batch.schema().fields() {
println!(" {:<12} : {:?}", field.name(), field.data_type());
}
println!();
}
}
println!("Total cache entries: {}\n", total_rows);
// =========================================================================
// Example 2: Scan counters as integers
// =========================================================================
println!("=== Example 2: Scan Counters as Int64 ===\n");
let schema = StringSchema::new(RedisType::Int64)
.with_key(true)
.with_value_column_name("count");
let config = BatchConfig::new("counter:*").with_batch_size(100);
let mut iterator = StringBatchIterator::new(&url, schema, config)?;
let mut total: i64 = 0;
let mut count = 0;
while let Some(batch) = iterator.next_batch()? {
count += batch.num_rows();
// Sum the counter values
let value_col = batch
.column_by_name("count")
.expect("count column should exist");
let values = value_col
.as_any()
.downcast_ref::<arrow::array::Int64Array>()
.expect("should be Int64Array");
total += values.iter().flatten().sum::<i64>();
}
println!("Counters found: {}", count);
println!("Sum of all counters: {}\n", total);
// =========================================================================
// Example 3: Scan float values
// =========================================================================
println!("=== Example 3: Scan Float Values ===\n");
let schema = StringSchema::new(RedisType::Float64)
.with_key(true)
.with_value_column_name("score");
let config = BatchConfig::new("score:*").with_batch_size(100);
let mut iterator = StringBatchIterator::new(&url, schema, config)?;
let mut sum: f64 = 0.0;
let mut count = 0;
while let Some(batch) = iterator.next_batch()? {
count += batch.num_rows();
let value_col = batch
.column_by_name("score")
.expect("score column should exist");
let values = value_col
.as_any()
.downcast_ref::<arrow::array::Float64Array>()
.expect("should be Float64Array");
sum += values.iter().flatten().sum::<f64>();
}
if count > 0 {
println!("Scores found: {}", count);
println!("Average score: {:.2}\n", sum / count as f64);
} else {
println!("No score keys found\n");
}
// =========================================================================
// Example 4: Without key column
// =========================================================================
println!("=== Example 4: Values Only (no key column) ===\n");
let schema = StringSchema::new(RedisType::Utf8)
.with_key(false)
.with_value_column_name("data");
let config = BatchConfig::new("cache:*")
.with_batch_size(10)
.with_max_rows(5);
let mut iterator = StringBatchIterator::new(&url, schema, config)?;
if let Some(batch) = iterator.next_batch()? {
println!("Columns (key excluded):");
for field in batch.schema().fields() {
println!(" {}", field.name());
}
println!("\nRows: {}", batch.num_rows());
}
println!("\n=== All examples complete ===");
Ok(())
}
Schema Inference¶
Automatic schema detection from existing data:
//! Example: Schema inference with polars-redis.
//!
//! This example demonstrates how to use schema inference to automatically
//! detect field names and types from existing Redis data.
//!
//! Run with: cargo run --example schema_inference
//!
//! Prerequisites:
//! - Redis running on localhost:6379
//! - Sample data loaded (run setup_sample_data.py first)
use polars_redis::io::infer::{infer_hash_schema, infer_json_schema};
use std::env;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("Connecting to: {}", url);
println!();
// =========================================================================
// Example 1: Infer schema from Redis hashes
// =========================================================================
println!("=== Example 1: Infer Hash Schema ===\n");
let (fields, keys_sampled) = infer_hash_schema(&url, "user:*", 100, true)?;
println!("Sampled {} keys", keys_sampled);
println!("Inferred fields:");
for (name, type_str) in &fields {
println!(" {:<15} : {}", name, type_str);
}
println!();
// =========================================================================
// Example 2: Infer schema without type inference (all Utf8)
// =========================================================================
println!("=== Example 2: Infer Hash Schema (no type inference) ===\n");
let (fields, _) = infer_hash_schema(&url, "user:*", 50, false)?;
println!("Fields (all Utf8):");
for (name, type_str) in &fields {
println!(" {:<15} : {}", name, type_str);
}
println!();
// =========================================================================
// Example 3: Infer schema from RedisJSON documents
// =========================================================================
println!("=== Example 3: Infer JSON Schema ===\n");
let (fields, keys_sampled) = infer_json_schema(&url, "product:*", 100)?;
println!("Sampled {} keys", keys_sampled);
println!("Inferred fields:");
for (name, type_str) in &fields {
println!(" {:<15} : {}", name, type_str);
}
println!();
// =========================================================================
// Example 4: Use inferred schema to scan data
// =========================================================================
println!("=== Example 4: Scan Using Inferred Schema ===\n");
use polars_redis::{BatchConfig, HashBatchIterator, HashSchema, RedisType};
// First infer
let (fields, _) = infer_hash_schema(&url, "user:*", 50, true)?;
// Convert to HashSchema
let schema_fields: Vec<(String, RedisType)> = fields
.into_iter()
.map(|(name, type_str)| {
let redis_type = match type_str.as_str() {
"int64" => RedisType::Int64,
"float64" => RedisType::Float64,
"bool" => RedisType::Boolean,
"date" => RedisType::Date,
"datetime" => RedisType::Datetime,
_ => RedisType::Utf8,
};
(name, redis_type)
})
.collect();
let schema = HashSchema::new(schema_fields).with_key(true);
let config = BatchConfig::new("user:*")
.with_batch_size(100)
.with_max_rows(5);
let mut iterator = HashBatchIterator::new(&url, schema, config, None)?;
if let Some(batch) = iterator.next_batch()? {
println!("Scanned {} rows with inferred schema", batch.num_rows());
println!("Columns:");
for field in batch.schema().fields() {
println!(" {:<15} : {:?}", field.name(), field.data_type());
}
}
println!("\n=== All examples complete ===");
Ok(())
}
Write Hashes¶
Writing data to Redis as hashes:
//! Example: Writing hashes to Redis with polars-redis.
//!
//! This example demonstrates how to use the Rust API to write
//! data to Redis as hashes.
//!
//! Run with: cargo run --example write_hashes
//!
//! Prerequisites:
//! - Redis running on localhost:6379
use polars_redis::io::write::{WriteMode, write_hashes};
use std::env;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("Connecting to: {}", url);
println!();
// =========================================================================
// Example 1: Basic hash writing
// =========================================================================
println!("=== Example 1: Basic Hash Writing ===\n");
let keys = vec![
"example:user:1".to_string(),
"example:user:2".to_string(),
"example:user:3".to_string(),
];
let fields = vec!["name".to_string(), "age".to_string(), "city".to_string()];
let values = vec![
vec![
Some("Alice".to_string()),
Some("30".to_string()),
Some("NYC".to_string()),
],
vec![
Some("Bob".to_string()),
Some("25".to_string()),
Some("LA".to_string()),
],
vec![
Some("Carol".to_string()),
Some("35".to_string()),
Some("Chicago".to_string()),
],
];
let result = write_hashes(&url, keys, fields, values, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!("Keys failed: {}", result.keys_failed);
println!("Keys skipped: {}", result.keys_skipped);
println!();
// =========================================================================
// Example 2: Writing with TTL
// =========================================================================
println!("=== Example 2: Writing with TTL ===\n");
let keys = vec![
"example:session:1".to_string(),
"example:session:2".to_string(),
];
let fields = vec!["user_id".to_string(), "token".to_string()];
let values = vec![
vec![Some("user:1".to_string()), Some("abc123".to_string())],
vec![Some("user:2".to_string()), Some("def456".to_string())],
];
// TTL of 60 seconds
let result = write_hashes(&url, keys, fields, values, Some(60), WriteMode::Replace)?;
println!("Keys written with 60s TTL: {}", result.keys_written);
println!();
// =========================================================================
// Example 3: Fail mode (skip existing keys)
// =========================================================================
println!("=== Example 3: Fail Mode (Skip Existing) ===\n");
let keys = vec![
"example:user:1".to_string(), // Already exists from Example 1
"example:user:4".to_string(), // New key
];
let fields = vec!["name".to_string(), "age".to_string()];
let values = vec![
vec![Some("Alice Updated".to_string()), Some("31".to_string())],
vec![Some("Dave".to_string()), Some("28".to_string())],
];
let result = write_hashes(&url, keys, fields, values, None, WriteMode::Fail)?;
println!("Keys written: {}", result.keys_written);
println!("Keys skipped (already existed): {}", result.keys_skipped);
println!();
// =========================================================================
// Example 4: Append mode (merge fields)
// =========================================================================
println!("=== Example 4: Append Mode (Merge Fields) ===\n");
let keys = vec!["example:user:1".to_string()];
// Add new field to existing hash
let fields = vec!["email".to_string()];
let values = vec![vec![Some("alice@example.com".to_string())]];
let result = write_hashes(&url, keys, fields, values, None, WriteMode::Append)?;
println!("Keys updated with new field: {}", result.keys_written);
println!();
// =========================================================================
// Example 5: Handling null values
// =========================================================================
println!("=== Example 5: Handling Null Values ===\n");
let keys = vec!["example:partial:1".to_string()];
let fields = vec![
"required".to_string(),
"optional".to_string(),
"another".to_string(),
];
// Some values are None - these fields will not be written
let values = vec![vec![
Some("present".to_string()),
None, // This field will be skipped
Some("also present".to_string()),
]];
let result = write_hashes(&url, keys, fields, values, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!("(null values are skipped, only non-null fields written)");
println!();
// Cleanup
println!("=== Cleanup ===\n");
cleanup_example_keys(&url)?;
println!("Example keys deleted");
println!("\n=== All examples complete ===");
Ok(())
}
fn cleanup_example_keys(url: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = redis::Client::open(url)?;
let mut conn = client.get_connection()?;
let keys: Vec<String> = redis::cmd("KEYS")
.arg("example:*")
.query(&mut conn)
.unwrap_or_default();
if !keys.is_empty() {
redis::cmd("DEL").arg(&keys).query::<()>(&mut conn).ok();
}
Ok(())
}
Write JSON¶
Writing data to Redis as JSON documents:
//! Example: Writing JSON documents to Redis with polars-redis.
//!
//! This example demonstrates how to use the Rust API to write
//! data to Redis as JSON documents.
//!
//! Run with: cargo run --example write_json
//!
//! Prerequisites:
//! - Redis running on localhost:6379 with RedisJSON module
use polars_redis::io::write::{WriteMode, write_json};
use std::env;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("Connecting to: {}", url);
println!();
// =========================================================================
// Example 1: Basic JSON writing
// =========================================================================
println!("=== Example 1: Basic JSON Writing ===\n");
let keys = vec![
"example:doc:1".to_string(),
"example:doc:2".to_string(),
"example:doc:3".to_string(),
];
let json_strings = vec![
r#"{"title": "Hello World", "views": 100, "published": true}"#.to_string(),
r#"{"title": "Getting Started", "views": 250, "published": true}"#.to_string(),
r#"{"title": "Draft Post", "views": 0, "published": false}"#.to_string(),
];
let result = write_json(&url, keys, json_strings, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!("Keys failed: {}", result.keys_failed);
println!("Keys skipped: {}", result.keys_skipped);
println!();
// =========================================================================
// Example 2: Writing with TTL
// =========================================================================
println!("=== Example 2: Writing with TTL ===\n");
let keys = vec!["example:cache:1".to_string(), "example:cache:2".to_string()];
let json_strings = vec![
r#"{"data": "cached value 1", "timestamp": 1704067200}"#.to_string(),
r#"{"data": "cached value 2", "timestamp": 1704067200}"#.to_string(),
];
// TTL of 300 seconds (5 minutes)
let result = write_json(&url, keys, json_strings, Some(300), WriteMode::Replace)?;
println!("Keys written with 5min TTL: {}", result.keys_written);
println!();
// =========================================================================
// Example 3: Fail mode (skip existing keys)
// =========================================================================
println!("=== Example 3: Fail Mode (Skip Existing) ===\n");
let keys = vec![
"example:doc:1".to_string(), // Already exists from Example 1
"example:doc:4".to_string(), // New key
];
let json_strings = vec![
r#"{"title": "Updated Title", "views": 999}"#.to_string(),
r#"{"title": "New Document", "views": 50}"#.to_string(),
];
let result = write_json(&url, keys, json_strings, None, WriteMode::Fail)?;
println!("Keys written: {}", result.keys_written);
println!("Keys skipped (already existed): {}", result.keys_skipped);
println!();
// =========================================================================
// Example 4: Nested JSON structures
// =========================================================================
println!("=== Example 4: Nested JSON Structures ===\n");
let keys = vec!["example:complex:1".to_string()];
let json_strings = vec![
r#"{
"user": {
"name": "Alice",
"email": "alice@example.com"
},
"tags": ["rust", "redis", "polars"],
"metadata": {
"created": "2024-01-15",
"version": 1
}
}"#
.to_string(),
];
let result = write_json(&url, keys, json_strings, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!("(nested structures are stored as-is)");
println!();
// =========================================================================
// Example 5: Programmatic JSON construction
// =========================================================================
println!("=== Example 5: Programmatic JSON Construction ===\n");
use std::collections::HashMap;
let mut data: Vec<(String, String)> = Vec::new();
for i in 1..=5 {
let key = format!("example:product:{}", i);
let mut product: HashMap<&str, serde_json::Value> = HashMap::new();
product.insert("id", serde_json::json!(i));
product.insert("name", serde_json::json!(format!("Product {}", i)));
product.insert("price", serde_json::json!(i as f64 * 9.99));
product.insert("in_stock", serde_json::json!(i % 2 == 0));
let json_str = serde_json::to_string(&product)?;
data.push((key, json_str));
}
let (keys, json_strings): (Vec<_>, Vec<_>) = data.into_iter().unzip();
let result = write_json(&url, keys, json_strings, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!();
// Cleanup
println!("=== Cleanup ===\n");
cleanup_example_keys(&url)?;
println!("Example keys deleted");
println!("\n=== All examples complete ===");
Ok(())
}
fn cleanup_example_keys(url: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = redis::Client::open(url)?;
let mut conn = client.get_connection()?;
let keys: Vec<String> = redis::cmd("KEYS")
.arg("example:*")
.query(&mut conn)
.unwrap_or_default();
if !keys.is_empty() {
redis::cmd("DEL").arg(&keys).query::<()>(&mut conn).ok();
}
Ok(())
}
Write Strings¶
Writing data to Redis as strings:
//! Example: Writing strings to Redis with polars-redis.
//!
//! This example demonstrates how to use the Rust API to write
//! data to Redis as string values.
//!
//! Run with: cargo run --example write_strings
//!
//! Prerequisites:
//! - Redis running on localhost:6379
use polars_redis::io::write::{WriteMode, write_strings};
use std::env;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("Connecting to: {}", url);
println!();
// =========================================================================
// Example 1: Basic string writing
// =========================================================================
println!("=== Example 1: Basic String Writing ===\n");
let keys = vec![
"example:counter:1".to_string(),
"example:counter:2".to_string(),
"example:counter:3".to_string(),
];
let values = vec![
Some("100".to_string()),
Some("200".to_string()),
Some("300".to_string()),
];
let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!("Keys failed: {}", result.keys_failed);
println!("Keys skipped: {}", result.keys_skipped);
println!();
// =========================================================================
// Example 2: Writing with TTL
// =========================================================================
println!("=== Example 2: Writing with TTL ===\n");
let keys = vec!["example:temp:1".to_string(), "example:temp:2".to_string()];
let values = vec![
Some("temporary value 1".to_string()),
Some("temporary value 2".to_string()),
];
// TTL of 120 seconds (2 minutes)
let result = write_strings(&url, keys, values, Some(120), WriteMode::Replace)?;
println!("Keys written with 2min TTL: {}", result.keys_written);
println!();
// =========================================================================
// Example 3: Fail mode (skip existing keys)
// =========================================================================
println!("=== Example 3: Fail Mode (Skip Existing) ===\n");
let keys = vec![
"example:counter:1".to_string(), // Already exists from Example 1
"example:counter:4".to_string(), // New key
];
let values = vec![Some("999".to_string()), Some("400".to_string())];
let result = write_strings(&url, keys, values, None, WriteMode::Fail)?;
println!("Keys written: {}", result.keys_written);
println!("Keys skipped (already existed): {}", result.keys_skipped);
println!();
// =========================================================================
// Example 4: Handling null values
// =========================================================================
println!("=== Example 4: Handling Null Values ===\n");
let keys = vec![
"example:value:1".to_string(),
"example:value:2".to_string(),
"example:value:3".to_string(),
];
// Some values are None - these keys will not be written
let values = vec![
Some("present".to_string()),
None, // This key will be skipped
Some("also present".to_string()),
];
let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!("(null values are skipped)");
println!();
// =========================================================================
// Example 5: Writing various data types as strings
// =========================================================================
println!("=== Example 5: Various Data Types as Strings ===\n");
let keys = vec![
"example:int".to_string(),
"example:float".to_string(),
"example:bool".to_string(),
"example:json".to_string(),
];
let values = vec![
Some("42".to_string()),
Some("3.14159".to_string()),
Some("true".to_string()),
Some(r#"{"nested": "data"}"#.to_string()),
];
let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;
println!("Keys written: {}", result.keys_written);
println!("(all values stored as strings, can be parsed on read)");
println!();
// =========================================================================
// Example 6: Batch writing
// =========================================================================
println!("=== Example 6: Batch Writing ===\n");
let count = 100;
let keys: Vec<String> = (0..count).map(|i| format!("example:batch:{}", i)).collect();
let values: Vec<Option<String>> = (0..count).map(|i| Some(format!("value_{}", i))).collect();
let result = write_strings(&url, keys, values, None, WriteMode::Replace)?;
println!("Batch size: {}", count);
println!("Keys written: {}", result.keys_written);
println!("(writes are automatically pipelined for performance)");
println!();
// Cleanup
println!("=== Cleanup ===\n");
cleanup_example_keys(&url)?;
println!("Example keys deleted");
println!("\n=== All examples complete ===");
Ok(())
}
fn cleanup_example_keys(url: &str) -> Result<(), Box<dyn std::error::Error>> {
let client = redis::Client::open(url)?;
let mut conn = client.get_connection()?;
let keys: Vec<String> = redis::cmd("KEYS")
.arg("example:*")
.query(&mut conn)
.unwrap_or_default();
if !keys.is_empty() {
redis::cmd("DEL").arg(&keys).query::<()>(&mut conn).ok();
}
Ok(())
}