Secondary index
The Aerospike Rust client allows you to create and delete secondary indexes from the database.
Creating a secondary index
To create a secondary index, invoke either Client::create_index_on_bin() or Client::create_index_using_expression().
Secondary indexes are created asynchronously, so each method returns an IndexTask before the
index propagates to the cluster. Call IndexTask::wait_till_complete() to block until the index
is fully built.
The following example creates a numeric index idx_test_bar_baz in
namespace test within set bar and bin baz, then waits for it to be ready:
use std::time::Duration;use aerospike::Task;
match client .create_index_on_bin( &AdminPolicy::default(), "test", "bar", "baz", "idx_test_bar_baz", IndexType::Numeric, CollectionIndexType::Default, None, ) .await{ Ok(index_task) => { index_task .wait_till_complete(Some(Duration::from_secs(30))) .await?; println!("Index created."); } Err(err) => println!("Failed to create index: {}", err),}📖 API Reference:
Client::create_index_on_bin|IndexTask::wait_till_complete
Secondary indexes can only be created once on the server as a combination of
namespace, set, and bin name with either integer or string data types. For
example, if you define a secondary index to index bin x that contains integer
values, then only records containing bins named x with integer values are
indexed. Other records with a bin named x that contain non-integer values are
not indexed.
When an index management call is made to any node in the Aerospike Server cluster, the information automatically propagates to the remaining nodes.
Removing a secondary index
To remove a secondary index using Client::drop_index():
match client.drop_index(&AdminPolicy::default(), "test", "bar", "idx_test_bar_baz").await { Err(err) => println!("Couldn't drop index: {}", err), _ => {}}📖 API Reference:
Client::drop_index
Defining the query
Use query::Statement to define a query.
First, create an instance of Statement by specifying the namespace test and
set bar to query. Optionally, you can specify a list of bin names to
restrict which record bins are returned by the query. If you wish to return all
bins, use Bins::All.
use aerospike::Statement;
let mut stmt = Statement::new("test", "bar", Bins::Some(vec!["name".into(), "age".into()]));📖 API Reference:
Statement::new
Applying filters
To query a secondary index, specify a filter on the query. It appears that multiple filters are allowed, but the server currently restricts queries to a single filter.
Filters are creating using the macros defined in the aerospike.query module.
The following filters are currently supported:
as_eq!— Create equality filter for queries; supports integer and string values.as_range!— Create range filter for queries; supports integer values.as_contains!— Create contains number filter for queries on a collection index.as_contains_range!— Create contains range filter for queries on a collection index.as_within_region!— Create geospatial “points within region” filter for queries.as_within_radius!— Create geospatial “points within radius” filter for queries.as_regions_containing_point!— Create geospatial “regions containing point” filter for queries.
This example uses a numeric index on bin baz in namespace test within set bar to find all records using a filter with the range 0 to 100 inclusive:
stmt.add_filter(as_range!("baz", 0, 100));📖 API Reference:
Statement::add_filter
Executing the query
To execute the query, invoke Client::query:
pub async fn query( &self, policy: &QueryPolicy, partition_filter: PartitionFilter, statement: Statement, ) -> Result<Arc<Recordset>>📖 API Reference:
Client::query
Where,
policy— Query behavior definition.partition_filter— Query partition filter, which acts something like a cursor. For the purposes of this tutorial, we will set this toPartitionFilter::all().statement— Query to execute.
RecordSet allows you to iterate over the results of the query.
To execute the query and iterate over the results:
let qpolicy = QueryPolicy::default();let pf = PartitionFilter::all();match client.query(&qpolicy, pf, stmt).await { Ok(rs) => { let mut rs = rs.into_stream(); while let Some(res) = rs.next().await { match res { Ok(rec) => { // .. process record }
Err(err) => println!("Error fetching record: {}", err), } } }
Err(err) => println!("Error with query: {}", err),}📖 API Reference:
QueryPolicy::default|PartitionFilter::all|Client::query
Cursor-based pagination
// Paginate query results in pages of 100 records.let mut pf = PartitionFilter::all();let page_size: u64 = 100;
loop { let mut stmt = Statement::new("test", "bar", Bins::All); stmt.add_filter(as_range!("baz", 0, 100));
let mut qpolicy = QueryPolicy::default(); qpolicy.max_records = page_size;
let rs = client.query(&qpolicy, pf, stmt).await?; let mut stream = rs.into_stream(); let mut count: u64 = 0;
while let Some(result) = stream.next().await { match result { Ok(record) => count += 1, Err(err) => println!("Error: {err}"), } }
pf = stream.partition_filter().await .expect("partition filter should be available after query");
if pf.done() || count < page_size { break; }}Complete example
The following complete program connects to an Aerospike instance on localhost, writes sample records, creates a secondary index on bin baz, runs a query using that index, and then closes the client connection.
use std::time::Duration;
use aerospike::{ as_bin, as_key, as_range, as_val, AdminPolicy, Bins, Client, ClientPolicy, CollectionIndexType, IndexType, PartitionFilter, QueryPolicy, Statement, Task, WritePolicy,};use futures::stream::StreamExt;
#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { // Connect to localhost Aerospike. let client = Client::new(&ClientPolicy::default(), &"127.0.0.1:3000").await?; println!("Connected to Aerospike.");
// Write a few records so the query has data to return. for (id, baz_value) in [(1, 10), (2, 55), (3, 120)] { let key = as_key!("test", "bar", id); let bins = vec![ as_bin!("name", format!("item-{id}")), as_bin!("baz", baz_value), ]; client.put(&WritePolicy::default(), &key, &bins).await?; }
// Create a secondary index on test.bar.baz (ignore if it already exists). let index_name = "idx_test_bar_baz_complete_example"; match client .create_index_on_bin( &AdminPolicy::default(), "test", "bar", "baz", index_name, IndexType::Numeric, CollectionIndexType::Default, None, ) .await { Ok(index_task) => { index_task .wait_till_complete(Some(Duration::from_secs(30))) .await?; println!("Secondary index created: {index_name}"); } Err(e) => { println!("Index already exists or creation skipped: {e}"); } }
// Query the secondary index. let mut stmt = Statement::new("test", "bar", Bins::All); stmt.add_filter(as_range!("baz", 0, 100));
let rs = client .query(&QueryPolicy::default(), PartitionFilter::all(), stmt) .await?; let mut stream = rs.into_stream();
println!("Query results (baz between 0 and 100):"); while let Some(result) = stream.next().await { match result { Ok(record) => println!("{:?}", record.bins), Err(err) => eprintln!("Error fetching record: {err}"), } }
// Close the connection. client.close().await?; println!("Connection closed.");
Ok(())}📖 API Reference:
Client::new|Client::put|Client::create_index_on_bin|IndexTask::wait_till_complete|Statement::new|Statement::add_filter|PartitionFilter::all|Client::query|Client::close
Expected results
When run against a local Aerospike instance with namespace test and set bar, output should be similar to:
Connected to Aerospike.Secondary index created: idx_test_bar_baz_complete_exampleQuery results (baz between 0 and 100):{"name": String("item-1"), "baz": Int(10)}{"name": String("item-2"), "baz": Int(55)}Connection closed.