Secondary index

The Aerospike Rust client allows you to create and delete secondary indexes from the database.

Creating a secondary index

To create a secondary index, invoke either Client::create_index_on_bin() or Client::create_index_using_expression(). Secondary indexes are created asynchronously, so each method returns an IndexTask before the index propagates to the cluster. Call IndexTask::wait_till_complete() to block until the index is fully built.

The following example creates a numeric index named idx_test_bar_baz on bin baz in set bar of namespace test, then waits for it to be ready:

use std::time::Duration;
use aerospike::{AdminPolicy, CollectionIndexType, IndexType, Task};
match client
    .create_index_on_bin(
        &AdminPolicy::default(),
        "test",
        "bar",
        "baz",
        "idx_test_bar_baz",
        IndexType::Numeric,
        CollectionIndexType::Default,
        None,
    )
    .await
{
    Ok(index_task) => {
        // Wait up to 30 seconds for the index build to finish.
        index_task
            .wait_till_complete(Some(Duration::from_secs(30)))
            .await?;
        println!("Index created.");
    }
    Err(err) => println!("Failed to create index: {}", err),
}

📖 API Reference: Client::create_index_on_bin | IndexTask::wait_till_complete
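
Waiting on an index task with a timeout follows a familiar pattern: poll for readiness until it succeeds or a deadline passes. The sketch below illustrates that pattern with the standard library only. It is not the client's implementation, and wait_until, is_ready, and poll_interval are hypothetical names chosen for illustration.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Polls `is_ready` until it returns true or `timeout` elapses.
/// Returns true if the condition became ready, false on timeout.
/// A `None` timeout waits indefinitely.
/// (Hypothetical helper; not part of the Aerospike client.)
fn wait_until<F: FnMut() -> bool>(
    mut is_ready: F,
    timeout: Option<Duration>,
    poll_interval: Duration,
) -> bool {
    // Convert the relative timeout into an absolute deadline once.
    let deadline = timeout.map(|t| Instant::now() + t);
    loop {
        if is_ready() {
            return true;
        }
        if let Some(d) = deadline {
            if Instant::now() >= d {
                return false;
            }
        }
        sleep(poll_interval);
    }
}
```

Passing a bounded timeout, as the example above does with 30 seconds, keeps a caller from waiting forever if the index build stalls.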

A secondary index is defined on the server by a combination of namespace, set, and bin name, together with a data type of either integer or string, and each such combination can be created only once. For example, if you define a secondary index on bin x with an integer data type, only records that contain a bin named x with an integer value are indexed. Records whose bin x holds a non-integer value are not indexed.
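
The type rule can be pictured with a small in-memory model. This is a sketch only: BinValue and build_numeric_index are hypothetical names, and the code does not reflect how the server stores its indexes.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a bin value; not the client's value type.
#[derive(Debug, Clone, PartialEq)]
enum BinValue {
    Int(i64),
    Str(String),
}

/// Builds a toy numeric index over one bin: each integer value maps to the
/// ids of the records that hold it. Records whose bin is missing or holds
/// a non-integer value are skipped, mirroring the rule described above.
fn build_numeric_index(records: &[(u32, Option<BinValue>)]) -> BTreeMap<i64, Vec<u32>> {
    let mut index: BTreeMap<i64, Vec<u32>> = BTreeMap::new();
    for (id, bin) in records {
        if let Some(BinValue::Int(v)) = bin {
            index.entry(*v).or_default().push(*id);
        }
    }
    index
}
```

In this model, a record whose bin holds the string "seven" never appears in the index, even though another record in the same set holds the integer 7 in that bin.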

When an index management call is made to any node in the Aerospike Server cluster, the information automatically propagates to the remaining nodes.

Removing a secondary index

To remove a secondary index, use Client::drop_index():

if let Err(err) = client
    .drop_index(&AdminPolicy::default(), "test", "bar", "idx_test_bar_baz")
    .await
{
    println!("Couldn't drop index: {}", err);
}

📖 API Reference: Client::drop_index

Defining the query

Use query::Statement to define a query.

First, create an instance of Statement by specifying the namespace test and set bar to query. Optionally, you can specify a list of bin names to restrict which record bins are returned by the query. If you wish to return all bins, use Bins::All.

use aerospike::{Bins, Statement};
let mut stmt = Statement::new("test", "bar", Bins::Some(vec!["name".into(), "age".into()]));

📖 API Reference: Statement::new
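
To show what restricting the returned bins means, here is a minimal standard-library model of bin projection. BinSelection and project are hypothetical names that only mirror the idea of choosing between all bins and a named subset. They are not the client's types.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a bin selector: all bins, or a named subset.
enum BinSelection {
    All,
    Named(Vec<String>),
}

/// Returns the bins of `record` allowed by `selection`.
/// Requested names that the record does not contain are ignored.
fn project(
    record: &BTreeMap<String, String>,
    selection: &BinSelection,
) -> BTreeMap<String, String> {
    match selection {
        BinSelection::All => record.clone(),
        BinSelection::Named(names) => record
            .iter()
            .filter(|(name, _)| names.contains(*name))
            .map(|(name, value)| (name.clone(), value.clone()))
            .collect(),
    }
}
```

With a record holding bins name, age, and baz, selecting name and age yields a two-bin result without baz.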

Applying filters

To query a secondary index, add a filter to the statement. Although the API lets you add more than one filter, the server currently restricts queries to a single filter.

Filters are created using the macros defined in the aerospike::query module. The following filters are currently supported:

  • as_eq! — Create equality filter for queries; supports integer and string values.
  • as_range! — Create range filter for queries; supports integer values.
  • as_contains! — Create contains number filter for queries on a collection index.
  • as_contains_range! — Create contains range filter for queries on a collection index.
  • as_within_region! — Create geospatial “points within region” filter for queries.
  • as_within_radius! — Create geospatial “points within radius” filter for queries.
  • as_regions_containing_point! — Create geospatial “regions containing point” filter for queries.

This example uses a numeric index on bin baz in namespace test within set bar to find all records using a filter with the range 0 to 100 inclusive:

stmt.add_filter(as_range!("baz", 0, 100));

📖 API Reference: Statement::add_filter
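
Because the range in the example is inclusive, both endpoints match. The following sketch models that behavior over a toy ordered index using only the standard library. range_lookup and the index layout (value mapped to record ids) are assumptions for illustration, not the server's implementation.

```rust
use std::collections::BTreeMap;

/// Returns the ids of all records whose indexed value lies in
/// `begin..=end` (both ends inclusive), in ascending value order.
/// (Hypothetical helper modeling an inclusive range filter.)
fn range_lookup(index: &BTreeMap<i64, Vec<u32>>, begin: i64, end: i64) -> Vec<u32> {
    if begin > end {
        // BTreeMap::range panics on an inverted range, so return early.
        return Vec::new();
    }
    index
        .range(begin..=end)
        .flat_map(|(_, ids)| ids.iter().copied())
        .collect()
}
```

In this model, with indexed values 10, 55, and 120, a lookup over 0 to 100 returns the records holding 10 and 55 and skips the one holding 120.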

Executing the query

To execute the query, invoke Client::query:

pub async fn query(
    &self,
    policy: &QueryPolicy,
    partition_filter: PartitionFilter,
    statement: Statement,
) -> Result<Arc<Recordset>>

📖 API Reference: Client::query

Where:

  • policy — Query behavior definition.
  • partition_filter — Query partition filter, which acts something like a cursor. For the purposes of this tutorial, we will set this to PartitionFilter::all().
  • statement — Query to execute.

Recordset allows you to iterate over the results of the query.

To execute the query and iterate over the results:

use futures::stream::StreamExt; // provides next() on the record stream
let qpolicy = QueryPolicy::default();
let pf = PartitionFilter::all();
match client.query(&qpolicy, pf, stmt).await {
    Ok(rs) => {
        let mut rs = rs.into_stream();
        while let Some(res) = rs.next().await {
            match res {
                Ok(rec) => {
                    // .. process record
                }
                Err(err) => println!("Error fetching record: {}", err),
            }
        }
    }
    Err(err) => println!("Error with query: {}", err),
}

📖 API Reference: QueryPolicy::default | PartitionFilter::all | Client::query

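Cursor-based pagination

To page through query results, cap each query with max_records on the query policy and pass the partition filter returned by the previous page into the next query, where it acts as a cursor: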

// Paginate query results in pages of 100 records.
let mut pf = PartitionFilter::all();
let page_size: u64 = 100;
loop {
    let mut stmt = Statement::new("test", "bar", Bins::All);
    stmt.add_filter(as_range!("baz", 0, 100));
    let mut qpolicy = QueryPolicy::default();
    qpolicy.max_records = page_size;
    let rs = client.query(&qpolicy, pf, stmt).await?;
    let mut stream = rs.into_stream();
    let mut count: u64 = 0;
    while let Some(result) = stream.next().await {
        match result {
            // Only the number of records in the page is tracked here.
            Ok(_record) => count += 1,
            Err(err) => println!("Error: {err}"),
        }
    }
    // The returned partition filter is the cursor for the next page.
    pf = stream.partition_filter().await
        .expect("partition filter should be available after query");
    // Stop when the cursor is exhausted or the page came back short.
    if pf.done() || count < page_size {
        break;
    }
}
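
The control flow of the loop above can be studied without a server. The sketch below models it over an in-memory slice: Cursor, fetch_page, and page_sizes are hypothetical names, and the cursor here is a plain offset rather than a real partition filter.

```rust
/// Hypothetical cursor: remembers how far the previous page got.
#[derive(Debug, Clone, Copy, Default)]
struct Cursor {
    offset: usize,
    done: bool,
}

/// Returns up to `page_size` items starting at the cursor,
/// together with the advanced cursor.
fn fetch_page(data: &[i64], cursor: Cursor, page_size: usize) -> (Vec<i64>, Cursor) {
    let start = cursor.offset.min(data.len());
    let end = start.saturating_add(page_size).min(data.len());
    let page = data[start..end].to_vec();
    (page, Cursor { offset: end, done: end == data.len() })
}

/// Drives the page loop and records the size of each page.
/// Stops when the cursor reports done or a page comes back short.
fn page_sizes(data: &[i64], page_size: usize) -> Vec<usize> {
    if page_size == 0 {
        // A zero page size would never make progress.
        return Vec::new();
    }
    let mut cursor = Cursor::default();
    let mut sizes = Vec::new();
    loop {
        let (page, next) = fetch_page(data, cursor, page_size);
        sizes.push(page.len());
        cursor = next;
        if cursor.done || page.len() < page_size {
            break;
        }
    }
    sizes
}
```

In this model, 250 items with a page size of 100 produce pages of 100, 100, and 50 items. Checking both conditions means the loop ends either on an exhausted cursor or on a short page, whichever is seen first.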

Complete example

The following complete program connects to an Aerospike instance on localhost, writes sample records, creates a secondary index on bin baz, runs a query using that index, and then closes the client connection.

use std::time::Duration;
use aerospike::{
    as_bin, as_key, as_range, as_val,
    AdminPolicy, Bins, Client, ClientPolicy,
    CollectionIndexType, IndexType,
    PartitionFilter, QueryPolicy, Statement,
    Task, WritePolicy,
};
use futures::stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to localhost Aerospike.
    let client = Client::new(&ClientPolicy::default(), &"127.0.0.1:3000").await?;
    println!("Connected to Aerospike.");
    // Write a few records so the query has data to return.
    for (id, baz_value) in [(1, 10), (2, 55), (3, 120)] {
        let key = as_key!("test", "bar", id);
        let bins = vec![
            as_bin!("name", format!("item-{id}")),
            as_bin!("baz", baz_value),
        ];
        client.put(&WritePolicy::default(), &key, &bins).await?;
    }
    // Create a secondary index on test.bar.baz (ignore if it already exists).
    let index_name = "idx_test_bar_baz_complete_example";
    match client
        .create_index_on_bin(
            &AdminPolicy::default(),
            "test",
            "bar",
            "baz",
            index_name,
            IndexType::Numeric,
            CollectionIndexType::Default,
            None,
        )
        .await
    {
        Ok(index_task) => {
            index_task
                .wait_till_complete(Some(Duration::from_secs(30)))
                .await?;
            println!("Secondary index created: {index_name}");
        }
        Err(e) => {
            println!("Index already exists or creation skipped: {e}");
        }
    }
    // Query the secondary index.
    let mut stmt = Statement::new("test", "bar", Bins::All);
    stmt.add_filter(as_range!("baz", 0, 100));
    let rs = client
        .query(&QueryPolicy::default(), PartitionFilter::all(), stmt)
        .await?;
    let mut stream = rs.into_stream();
    println!("Query results (baz between 0 and 100):");
    while let Some(result) = stream.next().await {
        match result {
            Ok(record) => println!("{:?}", record.bins),
            Err(err) => eprintln!("Error fetching record: {err}"),
        }
    }
    // Close the connection.
    client.close().await?;
    println!("Connection closed.");
    Ok(())
}

📖 API Reference: Client::new | Client::put | Client::create_index_on_bin | IndexTask::wait_till_complete | Statement::new | Statement::add_filter | PartitionFilter::all | Client::query | Client::close

Expected results

When run against a local Aerospike instance with namespace test and set bar, output should be similar to:

Connected to Aerospike.
Secondary index created: idx_test_bar_baz_complete_example
Query results (baz between 0 and 100):
{"name": String("item-1"), "baz": Int(10)}
{"name": String("item-2"), "baz": Int(55)}
Connection closed.