Incompatible API changes
Version 2.0.0 — Async runtime, unified query, new batch and error APIs
Version 2.0.0 is a major rewrite of the Rust client. All database operations are now async, the scan and query APIs are unified, batch operations support writes and deletes, and the error model has been replaced with a flat enum.
Cargo.toml dependency changes
Depending on how your application is configured, v2.0.0 uses either the tokio or the async-std runtime crate for asynchronous operation, and the futures crate for stream iteration.
Old:

    [dependencies]
    aerospike = "1.3"

New:

    [dependencies]
    aerospike = "2.0.0"
    tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
    futures = "0.3"

📖 API Reference:
Client
All Client methods are now async
When using Tokio, every Client method that performs I/O is now an async fn and must be awaited with .await inside a Tokio runtime. Standalone programs typically annotate main with #[tokio::main].
Old:

    fn main() {
        let client = Client::new(&ClientPolicy::default(), &hosts)
            .expect("Failed to connect");
        client.put(&wpolicy, &key, &bins).unwrap();
        let record = client.get(&rpolicy, &key, Bins::All).unwrap();
        client.close().unwrap();
    }

New:

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = Client::new(&ClientPolicy::default(), &hosts).await?;
        client.put(&wpolicy, &key, &bins).await?;
        let record = client.get(&rpolicy, &key, Bins::All).await?;
        client.close().await?;
        Ok(())
    }

📖 API Reference:
Client::new()|Client::close()
With async-std, the usage is:

    use async_std::task;

    pub async fn my_async_main() -> Result<X, Y> {
        // ...
    }

    pub fn main() -> Result<X, Y> {
        // Drive the async entry point to completion on the async-std runtime.
        task::block_on(my_async_main())
    }

See the async-std documentation for more help.
Macro imports
The #[macro_use] extern crate aerospike pattern is no longer needed.
Import macros explicitly.
Old:

    #[macro_use]
    extern crate aerospike;

    use aerospike::*;

New:

    use aerospike::{as_key, as_bin, as_val, Client, ClientPolicy, ReadPolicy, WritePolicy};

Error model: error-chain replaced by Error enum
The v1.3.0 error-chain pattern Error(ErrorKind::..., _) is replaced by a flat
aerospike::Error enum with named variants.
Old:

    use aerospike::errors::{Error, ErrorKind};

    match client.get(&rpolicy, &key, Bins::All) {
        Ok(record) => println!("{}", record),
        Err(Error(ErrorKind::ServerError(ResultCode::KeyNotFoundError), _)) => {
            println!("Not found");
        }
        Err(err) => println!("Error: {}", err),
    }

New:

    use aerospike::{Error, ResultCode};

    match client.get(&rpolicy, &key, Bins::All).await {
        Ok(record) => println!("{}", record),
        Err(Error::ServerError(ResultCode::KeyNotFoundError, ..)) => {
            println!("Not found");
        }
        Err(err) => println!("Error: {}", err),
    }

The following Error variants are available in v2.0.0:
| Variant | Description |
|---|---|
| ServerError(ResultCode, bool, String) | Server returned an error result code |
| Connection(String) | Cluster connection failure |
| Timeout(String) | Client-side timeout |
| InvalidArgument(String) | Invalid method argument |
| InvalidNode(String) | Node not found or unavailable |
| NoMoreConnections | Connection pool exhausted |
| ClientError(String) | General client-side error |
| Io(std::io::Error) | Underlying I/O error |
📖 API Reference:
Error|ResultCode
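Because the error is now a flat enum, application code can classify failures with a single match. The following is a minimal sketch under stated assumptions: Error and ResultCode are simplified stand-ins declared locally to mirror the variant list above, not the aerospike crate's definitions, and the Action enum, the classify function, and the choice of which variants count as retryable are hypothetical.

```rust
use std::io;

// Stand-in types mirroring the variant table above. They are NOT the
// aerospike crate's definitions; they exist so the sketch compiles alone.
#[derive(Debug, PartialEq)]
pub enum ResultCode {
    KeyNotFoundError,
    Other(u8), // hypothetical catch-all used only in this sketch
}

#[derive(Debug)]
pub enum Error {
    ServerError(ResultCode, bool, String),
    Connection(String),
    Timeout(String),
    InvalidArgument(String),
    InvalidNode(String),
    NoMoreConnections,
    ClientError(String),
    Io(io::Error),
}

// Hypothetical application-level decision, not part of the client.
#[derive(Debug, PartialEq)]
pub enum Action {
    TreatAsMissing,
    Retry,
    Fail,
}

/// Map an error to an action. The retry policy here is an assumption
/// chosen for illustration; adjust it to your application's needs.
pub fn classify(err: &Error) -> Action {
    match err {
        // `..` skips the remaining tuple fields, as in the migration snippet.
        Error::ServerError(ResultCode::KeyNotFoundError, ..) => Action::TreatAsMissing,
        Error::Connection(_) | Error::Timeout(_) | Error::NoMoreConnections => Action::Retry,
        _ => Action::Fail,
    }
}
```

Matching on named variants replaces the nested Error(ErrorKind::..., _) destructuring, so adding a new arm does not require touching the error-chain wrapper.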
Scan and query unified into client.query()
client.scan() and ScanPolicy have been removed. Use client.query() with
a Statement that has no secondary index filter and PartitionFilter::all().
Old:

    use aerospike::ScanPolicy;

    let policy = ScanPolicy::default();
    match client.scan(&policy, "test", "demo", Bins::All) {
        Ok(records) => {
            for record in &*records {
                match record {
                    Ok(rec) => println!("{}", rec),
                    Err(err) => eprintln!("Error: {}", err),
                }
            }
        }
        Err(err) => eprintln!("Scan failed: {}", err),
    }

New:

    use aerospike::{QueryPolicy, Statement, Bins, PartitionFilter};
    use futures::stream::StreamExt;

    let policy = QueryPolicy::default();
    let stmt = Statement::new("test", "demo", Bins::All);

    let rs = client.query(&policy, PartitionFilter::all(), stmt).await?;
    let mut stream = rs.into_stream();

    while let Some(result) = stream.next().await {
        match result {
            Ok(record) => println!("{}", record),
            Err(err) => eprintln!("Error: {}", err),
        }
    }

The client.scan_node() and client.query_node() methods have also been removed. Use PartitionFilter to target specific partitions instead.
📖 API Reference:
Client::query()|Statement::new()|PartitionFilter
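One way to replace node-targeted scans is to divide the partition space among workers. An Aerospike namespace has 4096 partitions. The sketch below only computes contiguous (begin, count) ranges. It deliberately calls no PartitionFilter constructor, because this guide shows only PartitionFilter::all(); check the API reference for the range-based constructors. The name split_partitions is hypothetical.

```rust
/// Number of partitions in an Aerospike namespace.
pub const PARTITIONS: usize = 4096;

/// Split the partition space into contiguous (begin, count) ranges, one per
/// worker, whose sizes differ by at most one. Returns an empty Vec when
/// `workers` is zero, and caps the worker count at PARTITIONS.
pub fn split_partitions(workers: usize) -> Vec<(usize, usize)> {
    if workers == 0 {
        return Vec::new();
    }
    let workers = workers.min(PARTITIONS);
    let base = PARTITIONS / workers;
    let extra = PARTITIONS % workers;

    let mut ranges = Vec::with_capacity(workers);
    let mut begin = 0;
    for w in 0..workers {
        // The first `extra` workers take one additional partition each.
        let count = base + usize::from(w < extra);
        ranges.push((begin, count));
        begin += count;
    }
    ranges
}
```

Each resulting range could then back one query, with the workers running concurrently.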
Query signature change: PartitionFilter parameter added
client.query() now requires a PartitionFilter argument between the policy
and the statement. This enables partition-based pagination.
Old:

    let rs = client.query(&QueryPolicy::default(), stmt)?;

New:

    let rs = client.query(&QueryPolicy::default(), PartitionFilter::all(), stmt).await?;

📖 API Reference:
Client::query()|PartitionFilter::all()
Recordset iteration: for loop replaced by async stream
v1.3.0 returned Arc<Recordset> which implemented IntoIterator for synchronous
for loops. v2.0.0 requires converting the Recordset into an async
RecordStream and consuming it with StreamExt::next().
Old:

    let records = client.query(&policy, stmt)?;
    for record in &*records {
        match record {
            Ok(rec) => println!("{}", rec),
            Err(err) => eprintln!("Error: {}", err),
        }
    }

New:

    use futures::stream::StreamExt;

    let rs = client.query(&policy, PartitionFilter::all(), stmt).await?;
    let mut stream = rs.into_stream();

    while let Some(result) = stream.next().await {
        match result {
            Ok(record) => println!("{}", record),
            Err(err) => eprintln!("Error: {}", err),
        }
    }

📖 API Reference:
Recordset::into_stream()
Batch API: batch_get() replaced by batch()
The v1.3.0 read-only client.batch_get(Vec<BatchRead>) is replaced by the v2.0.0
client.batch(&BatchPolicy, &[BatchOperation]), which supports reads,
writes, deletes, and UDF calls in a single batch.
Old:

    use aerospike::BatchRead;

    let bins = Bins::from(["name", "age"]);
    let mut batch_reads = vec![];
    for i in 0..10 {
        let key = as_key!("test", "test", i);
        batch_reads.push(BatchRead::new(key, &bins));
    }

    match client.batch_get(&BatchPolicy::default(), batch_reads) {
        Ok(results) => {
            for result in &results {
                match &result.record {
                    Some(record) => println!("{:?} => {:?}", result.key, record.bins),
                    None => println!("Not found: {:?}", result.key),
                }
            }
        }
        Err(err) => eprintln!("Batch error: {}", err),
    }

New:

    use aerospike::{BatchPolicy, BatchOperation, BatchReadPolicy, Bins};

    let bins = Bins::from(["name", "age"]);
    let bpr = BatchReadPolicy::default();

    let ops: Vec<BatchOperation> = (0..10)
        .map(|i| BatchOperation::read(&bpr, as_key!("test", "test", i), bins.clone()))
        .collect();

    match client.batch(&BatchPolicy::default(), &ops).await {
        Ok(results) => {
            for result in &results {
                match &result.record {
                    Some(record) => println!("{:?} => {:?}", result.key, record.bins),
                    None => println!("Not found: {:?}", result.key),
                }
            }
        }
        Err(err) => eprintln!("Batch error: {}", err),
    }

New batch operation types:
| v2.0.0 method | Description |
|---|---|
| BatchOperation::read() | Read bins from a record |
| BatchOperation::read_ops() | Read with specific operations |
| BatchOperation::write() | Write operations to a record |
| BatchOperation::delete() | Delete a record |
| BatchOperation::udf() | Execute a UDF on a record |
Each operation type has its own policy:
BatchReadPolicy, BatchWritePolicy, BatchDeletePolicy, BatchUDFPolicy.
📖 API Reference:
Client::batch()|BatchOperation|BatchRecord
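In the snippets above, a key that is missing from the batch shows up as a result whose record is None rather than as an error for the whole call. The sketch below separates found records from missing keys. Record and BatchRecord are simplified local stand-ins with the same field names used above (key, record, bins), not the aerospike crate's types, and split_results is a hypothetical helper.

```rust
use std::collections::HashMap;

// Stand-ins shaped like the fields used in the migration snippet
// (result.key, result.record, record.bins). Not the real crate types.
#[derive(Debug, Clone)]
pub struct Record {
    pub bins: HashMap<String, i64>,
}

#[derive(Debug, Clone)]
pub struct BatchRecord {
    pub key: String,
    pub record: Option<Record>,
}

/// Partition batch results into (key, record) pairs that were found
/// and the keys for which no record came back.
pub fn split_results(results: &[BatchRecord]) -> (Vec<(&str, &Record)>, Vec<&str>) {
    let mut found = Vec::new();
    let mut missing = Vec::new();
    for r in results {
        match &r.record {
            Some(rec) => found.push((r.key.as_str(), rec)),
            None => missing.push(r.key.as_str()),
        }
    }
    (found, missing)
}
```

Keeping the missing keys separate makes it straightforward to issue a follow-up batch of writes for just those keys.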
Bin passing: reference wrappers removed from put(), add(), append(), prepend()
v1.3.0 accepted bins by reference wrapper (&[A] where A: AsRef<Bin<'b>>). v2.0.0
accepts &[Bin] directly, so bins no longer need to be wrapped in an extra &.
Old:

    let bin = as_bin!("name", "Alice");
    client.put(&wpolicy, &key, &vec![&bin])?;

New:

    let bin = as_bin!("name", "Alice");
    client.put(&wpolicy, &key, &[bin]).await?;

📖 API Reference:
Client::put()
exists() now takes ReadPolicy instead of WritePolicy
Old:

    let exists = client.exists(&WritePolicy::default(), &key)?;

New:

    let exists = client.exists(&ReadPolicy::default(), &key).await?;

📖 API Reference:
Client::exists()
Index creation: create_index() replaced by create_index_on_bin()
The v1.3.0 methods create_index() and create_complex_index() are replaced by
create_index_on_bin(), which combines both and adds support for CDT context.
Old:

    let task = client.create_index(
        &WritePolicy::default(),
        "test",
        "demo",
        "age",
        "idx_demo_age",
        IndexType::Numeric,
    )?;

New:

    use aerospike::{AdminPolicy, IndexType, CollectionIndexType};

    let task = client.create_index_on_bin(
        &AdminPolicy::default(),
        "test",
        "demo",
        "age",
        "idx_demo_age",
        IndexType::Numeric,
        CollectionIndexType::Default,
        None,
    ).await?;
    task.wait_till_complete(None).await?;

v2.0.0 also adds create_index_using_expression() for expression-based secondary indices.
📖 API Reference:
Client::create_index_on_bin()|Client::create_index_using_expression()
drop_index() now returns DropIndexTask
Old:

    client.drop_index(&WritePolicy::default(), "test", "demo", "idx_demo_age")?;

New:

    let task = client.drop_index(
        &AdminPolicy::default(),
        "test",
        "demo",
        "idx_demo_age",
    ).await?;

📖 API Reference:
Client::drop_index()|DropIndexTask
Admin operations now take AdminPolicy
register_udf(), remove_udf(), truncate(), create_index_on_bin(), and
drop_index() now accept &AdminPolicy instead of &WritePolicy.
Old:

    let task = client.register_udf(
        &WritePolicy::default(),
        code.as_bytes(),
        "example.lua",
        UDFLang::Lua,
    )?;

New:

    let task = client.register_udf(
        &AdminPolicy::default(),
        code.as_bytes(),
        "example.lua",
        UDFLang::Lua,
    ).await?;
    task.wait_till_complete(None).await?;

📖 API Reference:
Client::register_udf()|AdminPolicy
remove_udf() simplified and returns UdfRemoveTask
The language parameter has been removed, and the method now returns a task
that can be awaited.
Old:

    client.remove_udf(&WritePolicy::default(), "example.lua", UDFLang::Lua)?;

New:

    let task = client.remove_udf(&AdminPolicy::default(), "example.lua").await?;

📖 API Reference:
Client::remove_udf()|UdfRemoveTask
Thread-based concurrency replaced by async tasks
v1.3.0 used std::thread with Arc<Client>. v2.0.0 uses tokio::spawn with
Arc<Client>, since Client implements Send + Sync.
Old:

    use std::sync::Arc;
    use std::thread;

    let client = Arc::new(Client::new(&cpolicy, &hosts)?);

    let mut threads = vec![];
    for i in 0..4 {
        let client = client.clone();
        threads.push(thread::spawn(move || {
            client.put(&wpolicy, &key, &bins).unwrap();
        }));
    }
    for t in threads {
        t.join().unwrap();
    }

New:

    use std::sync::Arc;

    let client = Arc::new(Client::new(&cpolicy, &hosts).await?);

    let mut handles = vec![];
    for i in 0..4 {
        let client = client.clone();
        handles.push(tokio::spawn(async move {
            client.put(&wpolicy, &key, &bins).await.unwrap();
        }));
    }
    futures::future::join_all(handles).await;

📖 API Reference:
Client
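The sharing pattern is the same under both models: clone the Arc and move the clone into each unit of work. To stay dependency-free, the sketch below uses std::thread and a hypothetical FakeClient stand-in rather than the real Client. With Tokio, thread::spawn becomes tokio::spawn and the closure becomes an async move block. The assert_send_sync helper is also hypothetical; it shows how the Send + Sync requirement can be checked at compile time.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a client type that is Send + Sync.
#[derive(Default)]
pub struct FakeClient {
    writes: AtomicUsize,
}

impl FakeClient {
    /// Pretend to write a record; only counts the call.
    pub fn put(&self, _key: usize) {
        self.writes.fetch_add(1, Ordering::SeqCst);
    }

    pub fn writes(&self) -> usize {
        self.writes.load(Ordering::SeqCst)
    }
}

/// Compiles only if T can be shared across threads (or spawned tasks).
pub fn assert_send_sync<T: Send + Sync + 'static>() {}

/// Spawn `n` workers that share one client through Arc, wait for all of
/// them, and return the number of writes the client observed.
pub fn run_workers(n: usize) -> usize {
    assert_send_sync::<FakeClient>();
    let client = Arc::new(FakeClient::default());

    let handles: Vec<_> = (0..n)
        .map(|i| {
            // Each worker owns its own Arc handle to the same client.
            let client = Arc::clone(&client);
            thread::spawn(move || client.put(i))
        })
        .collect();

    for handle in handles {
        handle.join().expect("worker panicked");
    }
    client.writes()
}
```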
TLS support added via rustls
v2.0.0 adds optional TLS support through the tls Cargo feature. Configure TLS by setting ClientPolicy.tls_config to a rustls::ClientConfig.

    [dependencies]
    aerospike = { version = "2.0.0", features = ["tls"] }
    rustls = "0.23"

    let mut policy = ClientPolicy::default();
    policy.tls_config = Some(rustls_client_config);

    let client = Client::new(&policy, &"tls-host.example.com:4333").await?;

📖 API Reference:
ClientPolicy
Summary of removed and renamed types
| v1.3.0 | v2.0.0 | Notes |
|---|---|---|
| ScanPolicy | QueryPolicy | Scans use client.query() with no filter |
| client.scan() | client.query() | Pass PartitionFilter::all() for full-set scan |
| client.scan_node() | Removed | Use PartitionFilter to scope by partition |
| client.query_node() | Removed | Use PartitionFilter to scope by partition |
| client.batch_get() | client.batch() | Now accepts &[BatchOperation] |
| BatchRead | BatchRecord (result) / BatchOperation (input) | Supports read, write, delete, UDF |
| client.create_index() | client.create_index_on_bin() | Adds CollectionIndexType and ctx params |
| client.create_complex_index() | client.create_index_on_bin() | Merged into single method |
| errors::Error (struct) | Error (enum) | Flat enum, no error-chain |
| errors::ErrorKind | Removed | Variants moved to Error enum directly |
| for record in &*records | rs.into_stream() + stream.next().await | Requires futures::stream::StreamExt |
Summary of new types in v2.0.0
| Type | Description |
|---|---|
| PartitionFilter | Partition-based cursor for query/scan pagination |
| BatchOperation | Batch operation enum (read, write, delete, UDF) |
| BatchReadPolicy | Per-operation policy for batch reads |
| BatchWritePolicy | Per-operation policy for batch writes |
| BatchDeletePolicy | Per-operation policy for batch deletes |
| BatchUDFPolicy | Per-operation policy for batch UDF calls |
| BatchRecord | Result record from a batch operation |
| DropIndexTask | Async task for tracking index deletion |
| UdfRemoveTask | Async task for tracking UDF removal |
| QueryDuration | Hint for expected query duration |
| ReadTouchTTL | Controls read-based TTL refresh behavior |
| Recordset::into_stream() | Converts recordset to async RecordStream |
Version 1.0.0
Replace predicate expressions with Aerospike Expressions
The old predicate expression functionality has been replaced by Aerospike Expressions. Aerospike Expressions include all previous predicate expression functionality (with a different, improved syntax) plus support for record metadata expressions and list/map/bit/hyperloglog expression methods analogous to those used in operations.
This version of the client requires server version 5.2.0.4+. Code using old PredExp must be converted to the new syntax. Here is an example:
OLD:

    let mut statement = Statement::new(namespace, &set_name, Bins::All);
    statement.add_predicate(as_predexp_integer_bin!("age".to_string()));
    statement.add_predicate(as_predexp_integer_value!(35));
    statement.add_predicate(as_predexp_integer_lesseq!());

    let qpolicy = QueryPolicy::default();
    let results = client.query(&qpolicy, statement)?;

📖 API Reference:
Statement::new|QueryPolicy::default|Client::query
NEW:

    use aerospike::expressions as exp;

    let mut qpolicy = QueryPolicy::default();
    qpolicy.filter_expression = Some(
        exp::le(
            exp::int_bin("age".to_string()),
            exp::int_val(35)
        )
    );

    let statement = Statement::new(namespace, &set_name, Bins::All);
    let results = client.query(&qpolicy, PartitionFilter::all(), statement).await?;

📖 API Reference:
Statement::new|QueryPolicy::default|Client::query
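The old predicate API was assembled in postfix order (operands first, operator last), while expressions nest like ordinary function calls. The toy evaluator below is a sketch to illustrate that both forms encode the same filter, age <= 35. Every type and function in it (PredExp, Exp, BinMap, eval_postfix, eval_exp) is a stand-in written for this example and is not part of the aerospike crate.

```rust
use std::collections::HashMap;

// Postfix tokens, in the order the old macros pushed them.
pub enum PredExp {
    IntBin(String),
    IntValue(i64),
    LessEq,
}

// Nested form, shaped like exp::le(exp::int_bin(..), exp::int_val(..)).
pub enum Exp {
    IntBin(String),
    IntVal(i64),
    Le(Box<Exp>, Box<Exp>),
}

// Toy record: bin name -> integer value.
pub type BinMap = HashMap<String, i64>;

/// Evaluate a postfix token list with an explicit operand stack.
pub fn eval_postfix(tokens: &[PredExp], bins: &BinMap) -> bool {
    let mut ints: Vec<i64> = Vec::new();
    let mut result = false;
    for token in tokens {
        match token {
            PredExp::IntBin(name) => match bins.get(name) {
                Some(v) => ints.push(*v),
                None => return false, // missing bin: filter does not match
            },
            PredExp::IntValue(v) => ints.push(*v),
            // The top of the stack is the right operand, the next is the left.
            PredExp::LessEq => match (ints.pop(), ints.pop()) {
                (Some(right), Some(left)) => result = left <= right,
                _ => return false,
            },
        }
    }
    result
}

fn eval_int(e: &Exp, bins: &BinMap) -> Option<i64> {
    match e {
        Exp::IntBin(name) => bins.get(name).copied(),
        Exp::IntVal(v) => Some(*v),
        Exp::Le(..) => None, // a comparison is not an integer operand
    }
}

/// Evaluate the nested form by recursing into the operands.
pub fn eval_exp(e: &Exp, bins: &BinMap) -> bool {
    match e {
        Exp::Le(l, r) => match (eval_int(l, bins), eval_int(r, bins)) {
            (Some(a), Some(b)) => a <= b,
            _ => false,
        },
        _ => false,
    }
}
```

In the nested form the operator names its operands directly, so no stack discipline has to be kept in mind when reading or composing a filter.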