diff --git a/Cargo.toml b/Cargo.toml index c2989f8..6621912 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmqtt-storage" -version = "0.6.0" +version = "0.7.3" authors = ["rmqtt "] edition = "2021" license = "MIT OR Apache-2.0" @@ -8,23 +8,29 @@ repository = "https://github.com/rmqtt/rmqtt-storage" homepage = "https://github.com/rmqtt/rmqtt-storage" description = "rmqtt-storage - Is a simple wrapper around some key-value storages" keywords = ["storage", "async"] -categories = ["Database interfaces"] +categories = ["database"] exclude = ["examples", ".gitignore", ".cargo/config"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[package.metadata.docs.rs] +all-features = true + [features] default = [] ttl = [] map_len = [] len = [] +sled = ["dep:sled"] +redis = ["dep:redis"] +redis-cluster = ["redis", "redis/cluster", "redis/cluster-async"] [dependencies] -sled = "0.34" -tokio = { version = "1", features = ["sync", "rt"] } -redis = { version = "0.27", features = [ "tokio-comp", "connection-manager", "cluster", "cluster-async" ] } +sled = { version = "0.34", optional = true } +redis = { version = "0.32", features = [ "tokio-comp", "safe_iterators", "connection-manager"], optional = true } +tokio = { version = "1", features = ["sync", "rt"] } futures = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -39,7 +45,7 @@ convert = { package = "box-convert", version = "0.1", features = ["bytesize"] } [dev-dependencies] tokio = { version = "1", features = ["sync", "time", "macros", "rt", "rt-multi-thread"] } - +cfg-if = "1" diff --git a/README.md b/README.md index 143a807..7f2e218 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -rmqtt-storage = "0.6" +rmqtt-storage = "0.7" ``` ## Features diff --git a/src/lib.rs b/src/lib.rs index 5c9d6b0..28e6832 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,79 +1,114 @@ -#[macro_use] -extern crate serde; +//! Provides a unified storage abstraction with multiple backend implementations (sled, redis, redis-cluster). +//! +//! This module defines generic storage interfaces (`StorageDB`, `Map`, `List`) and implements them +//! for different storage backends. It includes configuration handling, initialization functions, +//! and common storage operations with support for expiration and batch operations. -use anyhow::Error; -use serde::de; +#![deny(unsafe_code)] +#[allow(unused_imports)] +use serde::{de, Deserialize, Serialize}; + +// Conditionally include storage modules based on enabled features +#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))] mod storage; +#[cfg(feature = "redis")] mod storage_redis; +#[cfg(feature = "redis-cluster")] mod storage_redis_cluster; +#[cfg(feature = "sled")] mod storage_sled; +// Re-export public storage interfaces and implementations +#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))] pub use storage::{DefaultStorageDB, List, Map, StorageDB, StorageList, StorageMap}; +#[cfg(feature = "redis")] pub use storage_redis::{RedisConfig, RedisStorageDB}; +#[cfg(feature = "redis-cluster")] pub use storage_redis_cluster::{ RedisConfig as RedisClusterConfig, RedisStorageDB as RedisClusterStorageDB, }; +#[cfg(feature = "sled")] pub use storage_sled::{SledConfig, SledStorageDB}; +/// Custom result type for storage operations pub type Result = anyhow::Result; +/// Initializes the database based on provided configuration +/// +/// # Arguments +/// * `cfg` - Storage configuration specifying backend type and parameters +/// +/// # Returns +/// Instance of `DefaultStorageDB` configured with the selected backend +#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))] pub async fn init_db(cfg: &Config) -> Result { match cfg.typ { + #[cfg(feature = "sled")] StorageType::Sled => { let db = SledStorageDB::new(cfg.sled.clone()).await?; Ok(DefaultStorageDB::Sled(db)) } + #[cfg(feature = "redis")] StorageType::Redis => { let db = RedisStorageDB::new(cfg.redis.clone()).await?; Ok(DefaultStorageDB::Redis(db)) } + #[cfg(feature = "redis-cluster")] StorageType::RedisCluster => { let db = RedisClusterStorageDB::new(cfg.redis_cluster.clone()).await?; Ok(DefaultStorageDB::RedisCluster(db)) } } } + +/// Configuration structure for storage system +/// +/// Contains backend-specific configurations and is conditionally compiled +/// based on enabled storage features. #[derive(Debug, Clone, Deserialize, Serialize)] +#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))] pub struct Config { - #[serde(default = "Config::storage_type_default")] + /// Storage backend type (Sled, Redis, or RedisCluster) + // #[serde(default = "Config::storage_type_default")] #[serde(alias = "type")] pub typ: StorageType, + + /// Configuration for Sled backend (feature-gated) #[serde(default)] + #[cfg(feature = "sled")] pub sled: SledConfig, + + /// Configuration for Redis backend (feature-gated) #[serde(default)] + #[cfg(feature = "redis")] pub redis: RedisConfig, + + /// Configuration for Redis Cluster backend (feature-gated) #[serde(default, rename = "redis-cluster")] + #[cfg(feature = "redis-cluster")] pub redis_cluster: RedisClusterConfig, } -impl Default for Config { - fn default() -> Self { - Config { - typ: Config::storage_type_default(), - sled: SledConfig::default(), - redis: RedisConfig::default(), - redis_cluster: RedisClusterConfig::default(), - } - } -} - -impl Config { - fn storage_type_default() -> StorageType { - StorageType::Sled - } -} - +/// Enum representing available storage backend types +/// +/// Variants are conditionally included based on enabled features #[derive(Debug, Clone, Serialize)] +#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))] pub enum StorageType { - //sled: high-performance embedded database with BTreeMap-like API for stateful systems. + /// Embedded database with BTreeMap-like API + #[cfg(feature = "sled")] Sled, - //redis: + /// Single-node Redis storage + #[cfg(feature = "redis")] Redis, - //redis cluster: + /// Redis Cluster distributed storage + #[cfg(feature = "redis-cluster")] RedisCluster, } +/// Deserialization implementation for StorageType +#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))] impl<'de> de::Deserialize<'de> for StorageType { #[inline] fn deserialize(deserializer: D) -> core::result::Result @@ -84,18 +119,29 @@ impl<'de> de::Deserialize<'de> for StorageType { .to_ascii_lowercase() .as_str() { + #[cfg(feature = "sled")] "sled" => StorageType::Sled, + #[cfg(feature = "redis")] "redis" => StorageType::Redis, + #[cfg(feature = "redis-cluster")] "redis-cluster" => StorageType::RedisCluster, - _ => StorageType::Sled, + _ => { + return Err(de::Error::custom( + "invalid storage type, expected one of: 'sled', 'redis', 'redis-cluster'", + )) + } }; Ok(t) } } +/// Timestamp type in milliseconds #[allow(dead_code)] pub(crate) type TimestampMillis = i64; +/// Gets current timestamp in milliseconds +/// +/// Uses system time if available, falls back to chrono if system time is before UNIX_EPOCH #[allow(dead_code)] #[inline] pub(crate) fn timestamp_millis() -> TimestampMillis { @@ -110,6 +156,7 @@ pub(crate) fn timestamp_millis() -> TimestampMillis { } #[cfg(test)] +#[cfg(any(feature = "redis", feature = "redis-cluster", feature = "sled"))] mod tests { use super::storage::*; use super::*; @@ -119,16 +166,31 @@ mod tests { fn get_cfg(name: &str) -> Config { let cfg = Config { - typ: StorageType::RedisCluster, + typ: { + cfg_if::cfg_if! { + if #[cfg(feature = "sled")] { + StorageType::Sled + } else if #[cfg(feature = "redis-cluster")] { + StorageType::RedisCluster + } else if #[cfg(feature = "redis")] { + StorageType::Redis + } else { + compile_error!("No storage backend feature enabled!"); + } + } + }, + #[cfg(feature = "sled")] sled: SledConfig { path: format!("./.catch/{}", name), cleanup_f: |_db| {}, ..Default::default() }, + #[cfg(feature = "redis")] redis: RedisConfig { url: "redis://127.0.0.1:6379/".into(), prefix: name.to_owned(), }, + #[cfg(feature = "redis-cluster")] redis_cluster: RedisClusterConfig { urls: [ "redis://127.0.0.1:6380/".into(), @@ -1662,6 +1724,7 @@ mod tests { #[tokio::main] #[allow(dead_code)] + #[cfg(feature = "sled")] // #[test] async fn test_map_expire_list() { use super::{SledStorageDB, StorageDB}; @@ -1707,10 +1770,12 @@ mod tests { }, ..Default::default() }, + #[cfg(feature = "redis")] redis: RedisConfig { url: "redis://127.0.0.1:6379/".into(), prefix: "map_expire_list".to_owned(), }, + #[cfg(feature = "redis-cluster")] redis_cluster: RedisClusterConfig { urls: [ "redis://127.0.0.1:6380/".into(), diff --git a/src/storage.rs b/src/storage.rs index 4ddc232..d535a9c 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,35 +1,67 @@ +//! Abstract storage layer with support for multiple backends (sled, Redis, Redis Cluster) +//! +//! Defines core storage traits and unified interfaces for: +//! - Key-value storage (StorageDB) +//! - Map structures (Map) +//! - List structures (List) +//! +//! Provides backend-agnostic enums (DefaultStorageDB, StorageMap, StorageList) +//! that dispatch operations to concrete implementations based on enabled features. + use core::fmt; use async_trait::async_trait; use serde::de::DeserializeOwned; use serde::Serialize; +#[cfg(feature = "redis")] use crate::storage_redis::{RedisStorageDB, RedisStorageList, RedisStorageMap}; +#[cfg(feature = "redis-cluster")] use crate::storage_redis_cluster::{ RedisStorageDB as RedisClusterStorageDB, RedisStorageList as RedisClusterStorageList, RedisStorageMap as RedisClusterStorageMap, }; -use crate::storage_sled::SledStorageDB; -use crate::storage_sled::{SledStorageList, SledStorageMap}; +#[cfg(feature = "sled")] +use crate::storage_sled::{SledStorageDB, SledStorageList, SledStorageMap}; use crate::Result; #[allow(unused_imports)] use crate::TimestampMillis; +#[allow(unused)] +pub(crate) const SEPARATOR: &[u8] = b"@"; +#[allow(unused)] +pub(crate) const KEY_PREFIX: &[u8] = b"__rmqtt@"; +#[allow(unused)] +pub(crate) const KEY_PREFIX_LEN: &[u8] = b"__rmqtt_len@"; +#[allow(unused)] +pub(crate) const MAP_NAME_PREFIX: &[u8] = b"__rmqtt_map@"; +#[allow(unused)] +pub(crate) const LIST_NAME_PREFIX: &[u8] = b"__rmqtt_list@"; + +/// Type alias for storage keys pub type Key = Vec; +/// Result type for iteration items (key-value pair) pub type IterItem = Result<(Key, V)>; +/// Asynchronous iterator trait for storage operations #[async_trait] pub trait AsyncIterator { type Item; + + /// Fetches the next item from the iterator async fn next(&mut self) -> Option; } +/// Trait for splitting byte slices (used in sled backend) +#[cfg(feature = "sled")] pub trait SplitSubslice { + /// Splits slice at the first occurrence of given subslice fn split_subslice(&self, subslice: &[u8]) -> Option<(&[u8], &[u8])>; } +#[cfg(feature = "sled")] impl SplitSubslice for [u8] { fn split_subslice(&self, subslice: &[u8]) -> Option<(&[u8], &[u8])> { self.windows(subslice.len()) @@ -38,103 +70,130 @@ impl SplitSubslice for [u8] { } } +/// Core storage database operations #[async_trait] #[allow(clippy::len_without_is_empty)] pub trait StorageDB: Send + Sync { + /// Concrete Map type for this storage type MapType: Map; + + /// Concrete List type for this storage type ListType: List; + /// Creates or accesses a named map async fn map + Sync + Send>( &self, name: N, expire: Option, ) -> Result; + /// Removes an entire map async fn map_remove(&self, name: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Checks if a map exists async fn map_contains_key + Sync + Send>(&self, key: K) -> Result; + /// Creates or accesses a named list async fn list + Sync + Send>( &self, name: V, expire: Option, ) -> Result; + /// Removes an entire list async fn list_remove(&self, name: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Checks if a list exists async fn list_contains_key + Sync + Send>(&self, key: K) -> Result; + /// Inserts a key-value pair async fn insert(&self, key: K, val: &V) -> Result<()> where K: AsRef<[u8]> + Sync + Send, V: serde::ser::Serialize + Sync + Send; + /// Retrieves a value by key async fn get(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send, V: DeserializeOwned + Sync + Send; + /// Removes a key-value pair async fn remove(&self, key: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Batch insert of multiple key-value pairs async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where V: serde::ser::Serialize + Sync + Send; + /// Batch removal of keys async fn batch_remove(&self, keys: Vec) -> Result<()>; + /// Increments a counter value async fn counter_incr(&self, key: K, increment: isize) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Decrements a counter value async fn counter_decr(&self, key: K, increment: isize) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Gets current counter value async fn counter_get(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send; + /// Sets counter to specific value async fn counter_set(&self, key: K, val: isize) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Checks if key exists async fn contains_key + Sync + Send>(&self, key: K) -> Result; + /// Gets number of items in storage (requires "len" feature) #[cfg(feature = "len")] async fn len(&self) -> Result; + /// Gets total storage size in bytes async fn db_size(&self) -> Result; + /// Sets expiration timestamp for a key (requires "ttl" feature) #[cfg(feature = "ttl")] async fn expire_at(&self, key: K, at: TimestampMillis) -> Result where K: AsRef<[u8]> + Sync + Send; + /// Sets expiration duration for a key (requires "ttl" feature) #[cfg(feature = "ttl")] async fn expire(&self, key: K, dur: TimestampMillis) -> Result where K: AsRef<[u8]> + Sync + Send; + /// Gets remaining time-to-live for a key (requires "ttl" feature) #[cfg(feature = "ttl")] async fn ttl(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send; + /// Iterates over all maps in storage async fn map_iter<'a>( &'a mut self, ) -> Result> + Send + 'a>>; + /// Iterates over all lists in storage async fn list_iter<'a>( &'a mut self, ) -> Result> + Send + 'a>>; - //pattern - * or ? + /// Scans keys matching pattern (supports * and ? wildcards) async fn scan<'a, P>( &'a mut self, pattern: P, @@ -142,61 +201,78 @@ pub trait StorageDB: Send + Sync { where P: AsRef<[u8]> + Send + Sync; + /// Gets storage backend information async fn info(&self) -> Result; } +/// Map (dictionary) storage operations #[async_trait] pub trait Map: Sync + Send { + /// Gets the name of this map fn name(&self) -> &[u8]; + /// Inserts a key-value pair into the map async fn insert(&self, key: K, val: &V) -> Result<()> where K: AsRef<[u8]> + Sync + Send, V: serde::ser::Serialize + Sync + Send + ?Sized; + /// Retrieves a value from the map async fn get(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send, V: DeserializeOwned + Sync + Send; + /// Removes a key from the map async fn remove(&self, key: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Checks if key exists in the map async fn contains_key + Sync + Send>(&self, key: K) -> Result; + /// Gets number of items in map (requires "map_len" feature) #[cfg(feature = "map_len")] async fn len(&self) -> Result; + /// Checks if map is empty async fn is_empty(&self) -> Result; + /// Clears all entries in the map async fn clear(&self) -> Result<()>; + /// Removes a key and returns its value async fn remove_and_fetch(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send, V: DeserializeOwned + Sync + Send; + /// Removes all keys with given prefix async fn remove_with_prefix(&self, prefix: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send; + /// Batch insert of key-value pairs async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where V: serde::ser::Serialize + Sync + Send; + /// Batch removal of keys async fn batch_remove(&self, keys: Vec) -> Result<()>; + /// Iterates over all key-value pairs async fn iter<'a, V>( &'a mut self, ) -> Result> + Send + 'a>> where V: DeserializeOwned + Sync + Send + 'a + 'static; + /// Iterates over all keys async fn key_iter<'a>( &'a mut self, ) -> Result> + Send + 'a>>; + /// Iterates over key-value pairs with given prefix async fn prefix_iter<'a, P, V>( &'a mut self, prefix: P, @@ -205,28 +281,36 @@ pub trait Map: Sync + Send { P: AsRef<[u8]> + Send + Sync, V: DeserializeOwned + Sync + Send + 'a + 'static; + /// Sets expiration timestamp for the entire map (requires "ttl" feature) #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result; + /// Sets expiration duration for the entire map (requires "ttl" feature) #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result; + /// Gets remaining time-to-live for the map (requires "ttl" feature) #[cfg(feature = "ttl")] async fn ttl(&self) -> Result>; } +/// List storage operations #[async_trait] pub trait List: Sync + Send { + /// Gets the name of this list fn name(&self) -> &[u8]; + /// Appends a value to the end of the list async fn push(&self, val: &V) -> Result<()> where V: serde::ser::Serialize + Sync + Send; + /// Appends multiple values to the list async fn pushs(&self, vals: Vec) -> Result<()> where V: serde::ser::Serialize + Sync + Send; + /// Pushes with size limit and optional pop-front behavior async fn push_limit( &self, val: &V, @@ -237,48 +321,66 @@ pub trait List: Sync + Send { V: serde::ser::Serialize + Sync + Send, V: DeserializeOwned; + /// Removes and returns the first value in the list async fn pop(&self) -> Result> where V: DeserializeOwned + Sync + Send; + /// Retrieves all values in the list async fn all(&self) -> Result> where V: DeserializeOwned + Sync + Send; + /// Gets value by index async fn get_index(&self, idx: usize) -> Result> where V: DeserializeOwned + Sync + Send; + /// Gets number of items in the list async fn len(&self) -> Result; + /// Checks if list is empty async fn is_empty(&self) -> Result; + /// Clears all items from the list async fn clear(&self) -> Result<()>; + /// Iterates over all values async fn iter<'a, V>( &'a mut self, ) -> Result> + Send + 'a>> where V: DeserializeOwned + Sync + Send + 'a + 'static; + /// Sets expiration timestamp for the entire list (requires "ttl" feature) #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result; + /// Sets expiration duration for the entire list (requires "ttl" feature) #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result; + /// Gets remaining time-to-live for the list (requires "ttl" feature) #[cfg(feature = "ttl")] async fn ttl(&self) -> Result>; } +/// Unified storage backend enum (dispatches to concrete implementations) #[derive(Clone)] pub enum DefaultStorageDB { + #[cfg(feature = "sled")] + /// Sled database backend Sled(SledStorageDB), + #[cfg(feature = "redis")] + /// Redis backend Redis(RedisStorageDB), + #[cfg(feature = "redis-cluster")] + /// Redis Cluster backend RedisCluster(RedisClusterStorageDB), } impl DefaultStorageDB { + /// Accesses a named map #[inline] pub async fn map + Sync + Send>( &self, @@ -286,35 +388,47 @@ impl DefaultStorageDB { expire: Option, ) -> Result { Ok(match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => StorageMap::Sled(db.map(name, expire).await?), + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => StorageMap::Redis(db.map(name, expire).await?), + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => { StorageMap::RedisCluster(db.map(name, expire).await?) } }) } + /// Removes a named map #[inline] pub async fn map_remove(&self, name: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.map_remove(name).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.map_remove(name).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.map_remove(name).await, } } + /// Checks if map exists #[inline] pub async fn map_contains_key + Sync + Send>(&self, key: K) -> Result { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.map_contains_key(key).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.map_contains_key(key).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.map_contains_key(key).await, } } + /// Accesses a named list #[inline] pub async fn list + Sync + Send>( &self, @@ -322,35 +436,47 @@ impl DefaultStorageDB { expire: Option, ) -> Result { Ok(match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => StorageList::Sled(db.list(name, expire).await?), + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => StorageList::Redis(db.list(name, expire).await?), + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => { StorageList::RedisCluster(db.list(name, expire).await?) } }) } + /// Removes a named list #[inline] pub async fn list_remove(&self, name: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.list_remove(name).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.list_remove(name).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.list_remove(name).await, } } + /// Checks if list exists #[inline] pub async fn list_contains_key + Sync + Send>(&self, key: K) -> Result { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.list_contains_key(key).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.list_contains_key(key).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.list_contains_key(key).await, } } + /// Inserts a key-value pair #[inline] pub async fn insert(&self, key: K, val: &V) -> Result<()> where @@ -358,12 +484,16 @@ impl DefaultStorageDB { V: Serialize + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.insert(key, val).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.insert(key, val).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.insert(key, val).await, } } + /// Retrieves a value by key #[inline] pub async fn get(&self, key: K) -> Result> where @@ -371,121 +501,165 @@ impl DefaultStorageDB { V: DeserializeOwned + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.get(key).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.get(key).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.get(key).await, } } + /// Removes a key-value pair #[inline] pub async fn remove(&self, key: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.remove(key).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.remove(key).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.remove(key).await, } } + /// Batch insert of key-value pairs #[inline] pub async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where V: serde::ser::Serialize + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.batch_insert(key_vals).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.batch_insert(key_vals).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.batch_insert(key_vals).await, } } + /// Batch removal of keys #[inline] pub async fn batch_remove(&self, keys: Vec) -> Result<()> { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.batch_remove(keys).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.batch_remove(keys).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.batch_remove(keys).await, } } + /// Increments a counter #[inline] pub async fn counter_incr(&self, key: K, increment: isize) -> Result<()> where K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.counter_incr(key, increment).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.counter_incr(key, increment).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.counter_incr(key, increment).await, } } + /// Decrements a counter #[inline] pub async fn counter_decr(&self, key: K, decrement: isize) -> Result<()> where K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.counter_decr(key, decrement).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.counter_decr(key, decrement).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.counter_decr(key, decrement).await, } } + /// Gets counter value #[inline] pub async fn counter_get(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.counter_get(key).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.counter_get(key).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.counter_get(key).await, } } + /// Sets counter value #[inline] pub async fn counter_set(&self, key: K, val: isize) -> Result<()> where K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.counter_set(key, val).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.counter_set(key, val).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.counter_set(key, val).await, } } + /// Gets number of items (requires "len" feature) #[inline] #[cfg(feature = "len")] pub async fn len(&self) -> Result { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.len().await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.len().await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.len().await, } } + /// Gets total storage size in bytes #[inline] pub async fn db_size(&self) -> Result { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.db_size().await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.db_size().await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.db_size().await, } } + /// Checks if key exists #[inline] pub async fn contains_key + Sync + Send>(&self, key: K) -> Result { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.contains_key(key).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.contains_key(key).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.contains_key(key).await, } } + /// Sets expiration timestamp (requires "ttl" feature) #[inline] #[cfg(feature = "ttl")] pub async fn expire_at(&self, key: K, at: TimestampMillis) -> Result @@ -493,12 +667,16 @@ impl DefaultStorageDB { K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.expire_at(key, at).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.expire_at(key, at).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.expire_at(key, at).await, } } + /// Sets expiration duration (requires "ttl" feature) #[inline] #[cfg(feature = "ttl")] pub async fn expire(&self, key: K, dur: TimestampMillis) -> Result @@ -506,12 +684,16 @@ impl DefaultStorageDB { K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.expire(key, dur).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.expire(key, dur).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.expire(key, dur).await, } } + /// Gets time-to-live (requires "ttl" feature) #[inline] #[cfg(feature = "ttl")] pub async fn ttl(&self, key: K) -> Result> @@ -519,34 +701,46 @@ impl DefaultStorageDB { K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.ttl(key).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.ttl(key).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.ttl(key).await, } } + /// Iterates over maps #[inline] pub async fn map_iter<'a>( &'a mut self, ) -> Result> + Send + 'a>> { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.map_iter().await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.map_iter().await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.map_iter().await, } } + /// Iterates over lists #[inline] pub async fn list_iter<'a>( &'a mut self, ) -> Result> + Send + 'a>> { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.list_iter().await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.list_iter().await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.list_iter().await, } } + /// Scans keys matching pattern #[inline] pub async fn scan<'a, P>( &'a mut self, @@ -556,26 +750,40 @@ impl DefaultStorageDB { P: AsRef<[u8]> + Send + Sync, { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.scan(pattern).await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.scan(pattern).await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.scan(pattern).await, } } + /// Gets storage information #[inline] pub async fn info(&self) -> Result { match self { + #[cfg(feature = "sled")] DefaultStorageDB::Sled(db) => db.info().await, + #[cfg(feature = "redis")] DefaultStorageDB::Redis(db) => db.info().await, + #[cfg(feature = "redis-cluster")] DefaultStorageDB::RedisCluster(db) => db.info().await, } } } +/// Unified map implementation enum #[derive(Clone)] pub enum StorageMap { + #[cfg(feature = "sled")] + /// Sled map implementation Sled(SledStorageMap), + #[cfg(feature = "redis")] + /// Redis map implementation Redis(RedisStorageMap), + #[cfg(feature = "redis-cluster")] + /// Redis Cluster map implementation RedisCluster(RedisClusterStorageMap), } @@ -583,8 +791,11 @@ pub enum StorageMap { impl Map for StorageMap { fn name(&self) -> &[u8] { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.name(), + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.name(), + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.name(), } } @@ -595,8 +806,11 @@ impl Map for StorageMap { V: Serialize + Sync + Send + ?Sized, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.insert(key, val).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.insert(key, val).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.insert(key, val).await, } } @@ -607,8 +821,11 @@ impl Map for StorageMap { V: DeserializeOwned + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.get(key).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.get(key).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.get(key).await, } } @@ -618,16 +835,22 @@ impl Map for StorageMap { K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.remove(key).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.remove(key).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.remove(key).await, } } async fn contains_key + Sync + Send>(&self, key: K) -> Result { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.contains_key(key).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.contains_key(key).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.contains_key(key).await, } } @@ -635,24 +858,33 @@ impl Map for StorageMap { #[cfg(feature = "map_len")] async fn len(&self) -> Result { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.len().await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.len().await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.len().await, } } async fn is_empty(&self) -> Result { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.is_empty().await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.is_empty().await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.is_empty().await, } } async fn clear(&self) -> Result<()> { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.clear().await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.clear().await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.clear().await, } } @@ -663,8 +895,11 @@ impl Map for StorageMap { V: DeserializeOwned + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.remove_and_fetch(key).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.remove_and_fetch(key).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.remove_and_fetch(key).await, } } @@ -674,8 +909,11 @@ impl Map for StorageMap { K: AsRef<[u8]> + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.remove_with_prefix(prefix).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.remove_with_prefix(prefix).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.remove_with_prefix(prefix).await, } } @@ -685,16 +923,22 @@ impl Map for StorageMap { V: Serialize + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.batch_insert(key_vals).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.batch_insert(key_vals).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.batch_insert(key_vals).await, } } async fn batch_remove(&self, keys: Vec) -> Result<()> { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.batch_remove(keys).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.batch_remove(keys).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.batch_remove(keys).await, } } @@ -706,8 +950,11 @@ impl Map for StorageMap { V: DeserializeOwned + Sync + Send + 'a + 'static, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.iter().await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.iter().await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.iter().await, } } @@ -716,8 +963,11 @@ impl Map for StorageMap { &'a mut self, ) -> Result> + Send + 'a>> { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.key_iter().await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.key_iter().await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.key_iter().await, } } @@ -731,8 +981,11 @@ impl Map for StorageMap { V: DeserializeOwned + Sync + Send + 'a + 'static, { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.prefix_iter(prefix).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.prefix_iter(prefix).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.prefix_iter(prefix).await, } } @@ -740,8 +993,11 @@ impl Map for StorageMap { #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.expire_at(at).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.expire_at(at).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.expire_at(at).await, } } @@ -749,8 +1005,11 @@ impl Map for StorageMap { #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.expire(dur).await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.expire(dur).await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.expire(dur).await, } } @@ -758,25 +1017,38 @@ impl Map for StorageMap { #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { match self { + #[cfg(feature = "sled")] StorageMap::Sled(m) => m.ttl().await, + #[cfg(feature = "redis")] StorageMap::Redis(m) => m.ttl().await, + #[cfg(feature = "redis-cluster")] StorageMap::RedisCluster(m) => m.ttl().await, } } } +/// Unified list implementation enum #[derive(Clone)] pub enum StorageList { + #[cfg(feature = "sled")] + /// Sled list implementation Sled(SledStorageList), + #[cfg(feature = "redis")] + /// Redis list implementation Redis(RedisStorageList), + #[cfg(feature = "redis-cluster")] + /// Redis Cluster list implementation RedisCluster(RedisClusterStorageList), } impl fmt::Debug for StorageList { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let name = match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.name(), + #[cfg(feature = "redis")] StorageList::Redis(list) => list.name(), + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.name(), }; @@ -789,8 +1061,11 @@ impl fmt::Debug for StorageList { impl List for StorageList { fn name(&self) -> &[u8] { match self { + #[cfg(feature = "sled")] StorageList::Sled(m) => m.name(), + #[cfg(feature = "redis")] StorageList::Redis(m) => m.name(), + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(m) => m.name(), } } @@ -800,8 +1075,11 @@ impl List for StorageList { V: Serialize + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.push(val).await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.push(val).await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.push(val).await, } } @@ -811,8 +1089,11 @@ impl List for StorageList { V: serde::ser::Serialize + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.pushs(vals).await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.pushs(vals).await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.pushs(vals).await, } } @@ -828,8 +1109,11 @@ impl List for StorageList { V: DeserializeOwned, { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.push_limit(val, limit, pop_front_if_limited).await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.push_limit(val, limit, pop_front_if_limited).await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => { list.push_limit(val, limit, pop_front_if_limited).await } @@ -841,8 +1125,11 @@ impl List for StorageList { V: DeserializeOwned + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.pop().await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.pop().await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.pop().await, } } @@ -852,8 +1139,11 @@ impl List for StorageList { V: DeserializeOwned + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.all().await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.all().await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.all().await, } } @@ -863,32 +1153,44 @@ impl List for StorageList { V: DeserializeOwned + Sync + Send, { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.get_index(idx).await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.get_index(idx).await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.get_index(idx).await, } } async fn len(&self) -> Result { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.len().await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.len().await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.len().await, } } async fn is_empty(&self) -> Result { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.is_empty().await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.is_empty().await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.is_empty().await, } } async fn clear(&self) -> Result<()> { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.clear().await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.clear().await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.clear().await, } } @@ -900,8 +1202,11 @@ impl List for StorageList { V: DeserializeOwned + Sync + Send + 'a + 'static, { match self { + #[cfg(feature = "sled")] StorageList::Sled(list) => list.iter().await, + #[cfg(feature = "redis")] StorageList::Redis(list) => list.iter().await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(list) => list.iter().await, } } @@ -909,8 +1214,11 @@ impl List for StorageList { #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { match self { + #[cfg(feature = "sled")] StorageList::Sled(l) => l.expire_at(at).await, + #[cfg(feature = "redis")] StorageList::Redis(l) => l.expire_at(at).await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(l) => l.expire_at(at).await, } } @@ -918,8 +1226,11 @@ impl List for StorageList { #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { match self { + #[cfg(feature = "sled")] StorageList::Sled(l) => l.expire(dur).await, + #[cfg(feature = "redis")] StorageList::Redis(l) => l.expire(dur).await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(l) => l.expire(dur).await, } } @@ -927,8 +1238,11 @@ impl List for StorageList { #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { match self { + #[cfg(feature = "sled")] StorageList::Sled(l) => l.ttl().await, + #[cfg(feature = "redis")] StorageList::Redis(l) => l.ttl().await, + #[cfg(feature = "redis-cluster")] StorageList::RedisCluster(l) => l.ttl().await, } } diff --git a/src/storage_redis.rs b/src/storage_redis.rs index e5ab050..002a294 100644 --- a/src/storage_redis.rs +++ b/src/storage_redis.rs @@ -1,13 +1,25 @@ +//! Redis storage implementation for key-value, map, and list data structures +//! +//! This module provides a Redis-backed storage system with support for: +//! - Key-value storage with expiration +//! - Map (hash) data structures +//! - List data structures +//! - Counters with atomic operations +//! - Iteration and scanning capabilities +//! - Standalone Redis connection support + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + use anyhow::anyhow; use async_trait::async_trait; use redis::aio::{ConnectionManager, ConnectionManagerConfig}; use redis::{pipe, AsyncCommands}; use serde::de::DeserializeOwned; +use serde::Deserialize; use serde::Serialize; use serde_json::Value; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::Duration; use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB}; use crate::{Result, StorageList, StorageMap}; @@ -15,17 +27,17 @@ use crate::{Result, StorageList, StorageMap}; #[allow(unused_imports)] use crate::{timestamp_millis, TimestampMillis}; -pub(crate) const SEPARATOR: &[u8] = b"@"; -pub(crate) const KEY_PREFIX: &[u8] = b"__rmqtt@"; -pub(crate) const KEY_PREFIX_LEN: &[u8] = b"__rmqtt_len@"; -pub(crate) const MAP_NAME_PREFIX: &[u8] = b"__rmqtt_map@"; -pub(crate) const LIST_NAME_PREFIX: &[u8] = b"__rmqtt_list@"; +use crate::storage::{KEY_PREFIX, KEY_PREFIX_LEN, LIST_NAME_PREFIX, MAP_NAME_PREFIX, SEPARATOR}; +/// Type alias for Redis connection manager type RedisConnection = ConnectionManager; +/// Configuration for Redis storage #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RedisConfig { + /// Redis server URL pub url: String, + /// Key prefix for all storage operations pub prefix: String, } @@ -38,13 +50,17 @@ impl Default for RedisConfig { } } +/// Redis storage database implementation #[derive(Clone)] pub struct RedisStorageDB { + /// Prefix for all keys prefix: Key, + /// Asynchronous connection manager async_conn: RedisConnection, } impl RedisStorageDB { + /// Creates a new Redis storage instance #[inline] pub(crate) async fn new(cfg: RedisConfig) -> Result { let prefix = [cfg.prefix.as_bytes(), SEPARATOR].concat(); @@ -55,12 +71,16 @@ impl RedisStorageDB { return Err(anyhow!(e)); } }; + + // Configure connection manager with retry settings let mgr_cfg = ConnectionManagerConfig::default() .set_exponent_base(100) .set_factor(2) .set_number_of_retries(2) .set_connection_timeout(Duration::from_secs(15)) .set_response_timeout(Duration::from_secs(10)); + + // Create connection manager let async_conn = match client.get_connection_manager_with_config(mgr_cfg).await { Ok(conn) => conn, Err(e) => { @@ -68,10 +88,13 @@ impl RedisStorageDB { return Err(anyhow!(e)); } }; + + // Create database instance and start cleanup task let db = Self { prefix, async_conn }.cleanup(); Ok(db) } + /// Starts background cleanup task fn cleanup(self) -> Self { let db = self.clone(); tokio::spawn(async move { @@ -90,22 +113,26 @@ impl RedisStorageDB { self } + /// Gets a clone of the async connection #[inline] fn async_conn(&self) -> RedisConnection { self.async_conn.clone() } + /// Gets a mutable reference to the async connection #[inline] fn async_conn_mut(&mut self) -> &mut RedisConnection { &mut self.async_conn } + /// Creates key for length tracking sorted set #[inline] #[allow(dead_code)] fn make_len_sortedset_key(&self) -> Key { [KEY_PREFIX_LEN, self.prefix.as_slice()].concat() } + /// Creates full key with prefix #[inline] fn make_full_key(&self, key: K) -> Key where @@ -114,11 +141,13 @@ impl RedisStorageDB { [KEY_PREFIX, self.prefix.as_slice(), key.as_ref()].concat() } + /// Creates scan pattern with prefix #[inline] fn make_scan_pattern_match>(&self, pattern: P) -> Key { [KEY_PREFIX, self.prefix.as_slice(), pattern.as_ref()].concat() } + /// Creates full map name with prefix #[inline] fn make_map_full_name(&self, name: K) -> Key where @@ -127,6 +156,7 @@ impl RedisStorageDB { [MAP_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat() } + /// Creates full list name with prefix #[inline] fn make_list_full_name(&self, name: K) -> Key where @@ -135,26 +165,31 @@ impl RedisStorageDB { [LIST_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat() } + /// Creates map prefix pattern for scanning #[inline] fn make_map_prefix_match(&self) -> Key { [MAP_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat() } + /// Creates list prefix pattern for scanning #[inline] fn make_list_prefix_match(&self) -> Key { [LIST_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat() } + /// Extracts map key from full name #[inline] fn map_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] { full_name[MAP_NAME_PREFIX.len() + self.prefix.len()..].as_ref() } + /// Extracts list key from full name #[inline] fn list_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] { full_name[LIST_NAME_PREFIX.len() + self.prefix.len()..].as_ref() } + /// Gets full key name for a given key #[inline] async fn _get_full_name(&self, key: &[u8]) -> Result { let map_full_name = self.make_map_full_name(key); @@ -172,6 +207,7 @@ impl RedisStorageDB { Ok(full_name) } + /// Internal method to insert a key-value pair #[inline] async fn _insert( &self, @@ -227,12 +263,12 @@ impl RedisStorageDB { Ok(()) } + /// Internal method for batch insertion #[inline] async fn _batch_insert( &self, key_val_expires: Vec<(Key, Vec, Option)>, ) -> Result<()> { - // let full_key = self.make_full_key(k); #[cfg(not(feature = "len"))] { let keys_vals: Vec<(Key, &Vec)> = key_val_expires @@ -284,6 +320,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method for batch removal #[inline] async fn _batch_remove(&self, keys: Vec) -> Result<()> { let full_keys = keys @@ -308,6 +345,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to increment a counter #[inline] async fn _counter_incr( &self, @@ -357,6 +395,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to decrement a counter #[inline] async fn _counter_decr( &self, @@ -407,6 +446,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to set a counter value #[inline] async fn _counter_set( &self, @@ -457,6 +497,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to remove a key #[inline] async fn _remove(&self, key: K) -> Result<()> where @@ -488,6 +529,7 @@ impl StorageDB for RedisStorageDB { type MapType = RedisStorageMap; type ListType = RedisStorageList; + /// Creates a new map with optional expiration #[inline] async fn map + Sync + Send>( &self, @@ -501,6 +543,7 @@ impl StorageDB for RedisStorageDB { ) } + /// Removes a map #[inline] async fn map_remove(&self, name: K) -> Result<()> where @@ -511,12 +554,14 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Checks if a map exists #[inline] async fn map_contains_key + Sync + Send>(&self, key: K) -> Result { let map_full_name = self.make_map_full_name(key.as_ref()); Ok(self.async_conn().exists(map_full_name).await?) } + /// Creates a new list with optional expiration #[inline] async fn list + Sync + Send>( &self, @@ -530,6 +575,7 @@ impl StorageDB for RedisStorageDB { ) } + /// Removes a list #[inline] async fn list_remove(&self, name: K) -> Result<()> where @@ -540,12 +586,14 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Checks if a list exists #[inline] async fn list_contains_key + Sync + Send>(&self, key: K) -> Result { let list_full_name = self.make_list_full_name(key.as_ref()); Ok(self.async_conn().exists(list_full_name).await?) } + /// Inserts a key-value pair #[inline] async fn insert(&self, key: K, val: &V) -> Result<()> where @@ -555,6 +603,7 @@ impl StorageDB for RedisStorageDB { self._insert(key, val, None).await } + /// Gets a value by key #[inline] async fn get(&self, key: K) -> Result> where @@ -573,6 +622,7 @@ impl StorageDB for RedisStorageDB { } } + /// Removes a key #[inline] async fn remove(&self, key: K) -> Result<()> where @@ -581,6 +631,7 @@ impl StorageDB for RedisStorageDB { self._remove(key).await } + /// Batch insertion of key-value pairs #[inline] async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where @@ -600,6 +651,7 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Batch removal of keys #[inline] async fn batch_remove(&self, keys: Vec) -> Result<()> { if !keys.is_empty() { @@ -608,6 +660,7 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Increments a counter #[inline] async fn counter_incr(&self, key: K, increment: isize) -> Result<()> where @@ -616,6 +669,7 @@ impl StorageDB for RedisStorageDB { self._counter_incr(key, increment, None).await } + /// Decrements a counter #[inline] async fn counter_decr(&self, key: K, decrement: isize) -> Result<()> where @@ -624,6 +678,7 @@ impl StorageDB for RedisStorageDB { self._counter_decr(key, decrement, None).await } + /// Gets a counter value #[inline] async fn counter_get(&self, key: K) -> Result> where @@ -633,6 +688,7 @@ impl StorageDB for RedisStorageDB { Ok(self.async_conn().get::<_, Option>(full_key).await?) } + /// Sets a counter value #[inline] async fn counter_set(&self, key: K, val: isize) -> Result<()> where @@ -641,13 +697,14 @@ impl StorageDB for RedisStorageDB { self._counter_set(key, val, None).await } + /// Checks if a key exists #[inline] async fn contains_key + Sync + Send>(&self, key: K) -> Result { - //HEXISTS key field let full_key = self.make_full_key(key.as_ref()); Ok(self.async_conn().exists(full_key).await?) } + /// Gets the number of keys in the database #[inline] #[cfg(feature = "len")] async fn len(&self) -> Result { @@ -661,6 +718,7 @@ impl StorageDB for RedisStorageDB { Ok(count) } + /// Gets the total database size #[inline] async fn db_size(&self) -> Result { let mut async_conn = self.async_conn(); @@ -681,6 +739,7 @@ impl StorageDB for RedisStorageDB { Ok(dbsize.unwrap_or(0) as usize) } + /// Sets expiration time for a key #[inline] #[cfg(feature = "ttl")] async fn expire_at(&self, key: K, at: TimestampMillis) -> Result @@ -710,6 +769,7 @@ impl StorageDB for RedisStorageDB { } } + /// Sets expiration duration for a key #[inline] #[cfg(feature = "ttl")] async fn expire(&self, key: K, dur: TimestampMillis) -> Result @@ -737,6 +797,7 @@ impl StorageDB for RedisStorageDB { } } + /// Gets time-to-live for a key #[inline] #[cfg(feature = "ttl")] async fn ttl(&self, key: K) -> Result> @@ -753,6 +814,7 @@ impl StorageDB for RedisStorageDB { } } + /// Creates an iterator for all maps #[inline] async fn map_iter<'a>( &'a mut self, @@ -765,6 +827,7 @@ impl StorageDB for RedisStorageDB { Ok(Box::new(iter)) } + /// Creates an iterator for all lists #[inline] async fn list_iter<'a>( &'a mut self, @@ -777,6 +840,7 @@ impl StorageDB for RedisStorageDB { Ok(Box::new(iter)) } + /// Creates an iterator for keys matching a pattern async fn scan<'a, P>( &'a mut self, pattern: P, @@ -796,6 +860,7 @@ impl StorageDB for RedisStorageDB { Ok(Box::new(iter)) } + /// Gets database information #[inline] async fn info(&self) -> Result { let mut conn = self.async_conn(); @@ -819,17 +884,24 @@ impl StorageDB for RedisStorageDB { } } +/// Redis-backed map storage implementation #[derive(Clone)] pub struct RedisStorageMap { + /// Name of the map name: Key, + /// Full key name with prefix full_name: Key, + /// Optional expiration time in milliseconds #[allow(dead_code)] expire: Option, + /// Flag indicating if the map is empty empty: Arc, + /// Reference to the parent database pub(crate) db: RedisStorageDB, } impl RedisStorageMap { + /// Creates a new map without expiration #[inline] pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self { Self { @@ -841,6 +913,7 @@ impl RedisStorageMap { } } + /// Creates a new map with expiration #[inline] pub(crate) async fn new_expire( name: Key, @@ -863,19 +936,21 @@ impl RedisStorageMap { }) } + /// Gets a clone of the async connection #[inline] fn async_conn(&self) -> RedisConnection { self.db.async_conn() } + /// Gets a mutable reference to the async connection #[inline] fn async_conn_mut(&mut self) -> &mut RedisConnection { self.db.async_conn_mut() } + /// Checks if the map is empty #[inline] async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result { - //HSCAN key cursor [MATCH pattern] [COUNT count] let res = async_conn .hscan::<_, Vec>(full_name) .await? @@ -885,6 +960,7 @@ impl RedisStorageMap { Ok(res) } + /// Internal method to insert with expiration handling #[inline] async fn _insert_expire(&self, key: &[u8], val: Vec) -> Result<()> { let mut async_conn = self.async_conn(); @@ -893,8 +969,6 @@ impl RedisStorageMap { #[cfg(feature = "ttl")] if self.empty.load(Ordering::SeqCst) { if let Some(expire) = self.expire.as_ref() { - //HSET key field value - //PEXPIRE key ms let _: () = redis::pipe() .atomic() .hset(name, key, val) @@ -906,11 +980,11 @@ impl RedisStorageMap { } } - //HSET key field value let _: () = async_conn.hset(name, key.as_ref(), val).await?; Ok(()) } + /// Internal method for batch insertion with expiration #[inline] async fn _batch_insert_expire(&self, key_vals: Vec<(Key, Vec)>) -> Result<()> { let mut async_conn = self.async_conn(); @@ -919,8 +993,6 @@ impl RedisStorageMap { #[cfg(feature = "ttl")] if self.empty.load(Ordering::SeqCst) { if let Some(expire) = self.expire.as_ref() { - //HMSET key field value - //PEXPIRE key ms let _: () = redis::pipe() .atomic() .hset_multiple(name, key_vals.as_slice()) @@ -933,7 +1005,6 @@ impl RedisStorageMap { } } - //HSET key field value let _: () = async_conn.hset_multiple(name, key_vals.as_slice()).await?; Ok(()) } @@ -941,11 +1012,13 @@ impl RedisStorageMap { #[async_trait] impl Map for RedisStorageMap { + /// Gets the map name #[inline] fn name(&self) -> &[u8] { self.name.as_slice() } + /// Inserts a key-value pair into the map #[inline] async fn insert(&self, key: K, val: &V) -> Result<()> where @@ -956,13 +1029,13 @@ impl Map for RedisStorageMap { .await } + /// Gets a value from the map #[inline] async fn get(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send, V: DeserializeOwned + Sync + Send, { - //HSET key field value let res: Option> = self .async_conn() .hget(self.full_name.as_slice(), key.as_ref()) @@ -974,12 +1047,12 @@ impl Map for RedisStorageMap { } } + /// Removes a key from the map #[inline] async fn remove(&self, key: K) -> Result<()> where K: AsRef<[u8]> + Sync + Send, { - //HDEL key field [field ...] let _: () = self .async_conn() .hdel(self.full_name.as_slice(), key.as_ref()) @@ -987,9 +1060,9 @@ impl Map for RedisStorageMap { Ok(()) } + /// Checks if a key exists in the map #[inline] async fn contains_key + Sync + Send>(&self, key: K) -> Result { - //HEXISTS key field let res = self .async_conn() .hexists(self.full_name.as_slice(), key.as_ref()) @@ -997,16 +1070,16 @@ impl Map for RedisStorageMap { Ok(res) } + /// Gets the number of elements in the map #[cfg(feature = "map_len")] #[inline] async fn len(&self) -> Result { - //HLEN key Ok(self.async_conn().hlen(self.full_name.as_slice()).await?) } + /// Checks if the map is empty #[inline] async fn is_empty(&self) -> Result { - //HSCAN key cursor [MATCH pattern] [COUNT count] let res = self .async_conn() .hscan::<_, Vec>(self.full_name.as_slice()) @@ -1017,22 +1090,21 @@ impl Map for RedisStorageMap { Ok(res) } + /// Clears all elements from the map #[inline] async fn clear(&self) -> Result<()> { - //DEL key [key ...] let _: () = self.async_conn().del(self.full_name.as_slice()).await?; self.empty.store(true, Ordering::SeqCst); Ok(()) } + /// Removes and returns a value from the map #[inline] async fn remove_and_fetch(&self, key: K) -> Result> where K: AsRef<[u8]> + Sync + Send, V: DeserializeOwned + Sync + Send, { - //HSET key field value - //HDEL key field [field ...] let name = self.full_name.as_slice(); let mut conn = self.async_conn(); let (res, _): (Option>, isize) = redis::pipe() @@ -1049,6 +1121,7 @@ impl Map for RedisStorageMap { } } + /// Removes all keys with a given prefix #[inline] async fn remove_with_prefix(&self, prefix: K) -> Result<()> where @@ -1066,7 +1139,7 @@ impl Map for RedisStorageMap { .next_item() .await { - removeds.push(key); + removeds.push(key?); if removeds.len() > 20 { let _: () = conn2.hdel(name, removeds.as_slice()).await?; removeds.clear(); @@ -1078,6 +1151,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Batch insertion of key-value pairs #[inline] async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where @@ -1098,6 +1172,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Batch removal of keys #[inline] async fn batch_remove(&self, keys: Vec) -> Result<()> { if !keys.is_empty() { @@ -1109,6 +1184,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Creates an iterator over key-value pairs #[inline] async fn iter<'a, V>( &'a mut self, @@ -1127,6 +1203,7 @@ impl Map for RedisStorageMap { Ok(Box::new(iter)) } + /// Creates an iterator over keys #[inline] async fn key_iter<'a>( &'a mut self, @@ -1141,6 +1218,7 @@ impl Map for RedisStorageMap { Ok(Box::new(iter)) } + /// Creates an iterator over key-value pairs with a prefix #[inline] async fn prefix_iter<'a, P, V>( &'a mut self, @@ -1163,6 +1241,7 @@ impl Map for RedisStorageMap { Ok(Box::new(iter)) } + /// Sets expiration time for the map #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { let res = self @@ -1172,6 +1251,7 @@ impl Map for RedisStorageMap { Ok(res) } + /// Sets expiration duration for the map #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { let res = self @@ -1181,6 +1261,7 @@ impl Map for RedisStorageMap { Ok(res) } + /// Gets time-to-live for the map #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { let mut async_conn = self.async_conn(); @@ -1195,17 +1276,24 @@ impl Map for RedisStorageMap { } } +/// Redis-backed list storage implementation #[derive(Clone)] pub struct RedisStorageList { + /// Name of the list name: Key, + /// Full key name with prefix full_name: Key, + /// Optional expiration time in milliseconds #[allow(dead_code)] expire: Option, + /// Flag indicating if the list is empty empty: Arc, + /// Reference to the parent database pub(crate) db: RedisStorageDB, } impl RedisStorageList { + /// Creates a new list without expiration #[inline] pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self { Self { @@ -1217,6 +1305,7 @@ impl RedisStorageList { } } + /// Creates a new list with expiration #[inline] pub(crate) async fn new_expire( name: Key, @@ -1239,16 +1328,19 @@ impl RedisStorageList { }) } + /// Gets a clone of the async connection #[inline] pub(crate) fn async_conn(&self) -> RedisConnection { self.db.async_conn() } + /// Checks if the list is empty #[inline] async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result { Ok(async_conn.llen::<_, usize>(full_name).await? == 0) } + /// Internal method to push with expiration handling #[inline] async fn _push_expire(&self, val: Vec) -> Result<()> { let mut async_conn = self.async_conn(); @@ -1257,8 +1349,6 @@ impl RedisStorageList { #[cfg(feature = "ttl")] if self.empty.load(Ordering::SeqCst) { if let Some(expire) = self.expire.as_ref() { - //RPUSH key value [value ...] - //PEXPIRE key ms let _: () = redis::pipe() .atomic() .rpush(name, val) @@ -1270,11 +1360,11 @@ impl RedisStorageList { } } - //RPUSH key value [value ...] let _: () = async_conn.rpush(name, val).await?; Ok(()) } + /// Internal method for batch push with expiration #[inline] async fn _pushs_expire(&self, vals: Vec>) -> Result<()> { let mut async_conn = self.async_conn(); @@ -1283,8 +1373,6 @@ impl RedisStorageList { if self.empty.load(Ordering::SeqCst) { if let Some(expire) = self.expire.as_ref() { let name = self.full_name.as_slice(); - //RPUSH key value [value ...] - //PEXPIRE key ms let _: () = redis::pipe() .atomic() .rpush(name, vals) @@ -1296,11 +1384,11 @@ impl RedisStorageList { } } - //RPUSH key value [value ...] let _: () = async_conn.rpush(self.full_name.as_slice(), vals).await?; Ok(()) } + /// Internal method for push with limit and expiration #[inline] async fn _push_limit_expire( &self, @@ -1345,6 +1433,7 @@ impl RedisStorageList { .await } + /// Internal method for push with limit #[inline] async fn _push_limit( &self, @@ -1375,11 +1464,13 @@ impl RedisStorageList { #[async_trait] impl List for RedisStorageList { + /// Gets the list name #[inline] fn name(&self) -> &[u8] { self.name.as_slice() } + /// Pushes a value to the end of the list #[inline] async fn push(&self, val: &V) -> Result<()> where @@ -1388,12 +1479,12 @@ impl List for RedisStorageList { self._push_expire(bincode::serialize(val)?).await } + /// Pushes multiple values to the end of the list #[inline] async fn pushs(&self, vals: Vec) -> Result<()> where V: Serialize + Sync + Send, { - //RPUSH key value [value ...] let vals = vals .into_iter() .map(|v| bincode::serialize(&v).map_err(|e| anyhow!(e))) @@ -1401,6 +1492,7 @@ impl List for RedisStorageList { self._pushs_expire(vals).await } + /// Pushes a value with size limit handling #[inline] async fn push_limit( &self, @@ -1426,12 +1518,12 @@ impl List for RedisStorageList { } } + /// Pops a value from the front of the list #[inline] async fn pop(&self) -> Result> where V: DeserializeOwned + Sync + Send, { - //LPOP key let removed = self .async_conn() .lpop::<_, Option>>(self.full_name.as_slice(), None) @@ -1446,12 +1538,12 @@ impl List for RedisStorageList { Ok(removed) } + /// Gets all values in the list #[inline] async fn all(&self) -> Result> where V: DeserializeOwned + Sync + Send, { - //LRANGE key 0 -1 let all = self .async_conn() .lrange::<_, Vec>>(self.full_name.as_slice(), 0, -1) @@ -1461,12 +1553,12 @@ impl List for RedisStorageList { .collect::>>() } + /// Gets a value by index #[inline] async fn get_index(&self, idx: usize) -> Result> where V: DeserializeOwned + Sync + Send, { - //LINDEX key index let val = self .async_conn() .lindex::<_, Option>>(self.full_name.as_slice(), idx as isize) @@ -1479,17 +1571,19 @@ impl List for RedisStorageList { }) } + /// Gets the length of the list #[inline] async fn len(&self) -> Result { - //LLEN key Ok(self.async_conn().llen(self.full_name.as_slice()).await?) } + /// Checks if the list is empty #[inline] async fn is_empty(&self) -> Result { Ok(self.len().await? == 0) } + /// Clears the list #[inline] async fn clear(&self) -> Result<()> { let _: () = self.async_conn().del(self.full_name.as_slice()).await?; @@ -1497,6 +1591,7 @@ impl List for RedisStorageList { Ok(()) } + /// Creates an iterator over list values #[inline] async fn iter<'a, V>( &'a mut self, @@ -1510,6 +1605,7 @@ impl List for RedisStorageList { ))) } + /// Sets expiration time for the list #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { let res = self @@ -1519,6 +1615,7 @@ impl List for RedisStorageList { Ok(res) } + /// Sets expiration duration for the list #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { let res = self @@ -1528,6 +1625,7 @@ impl List for RedisStorageList { Ok(res) } + /// Gets time-to-live for the list #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { let mut async_conn = self.async_conn(); @@ -1542,6 +1640,7 @@ impl List for RedisStorageList { } } +/// Iterator for list values pub struct AsyncListValIter<'a, V> { name: &'a [u8], conn: RedisConnection, @@ -1552,6 +1651,7 @@ pub struct AsyncListValIter<'a, V> { } impl<'a, V> AsyncListValIter<'a, V> { + /// Creates a new list value iterator fn new(name: &'a [u8], conn: RedisConnection) -> Self { let start = 0; let limit = 20; @@ -1567,7 +1667,7 @@ impl<'a, V> AsyncListValIter<'a, V> { } #[async_trait] -impl<'a, V> AsyncIterator for AsyncListValIter<'a, V> +impl AsyncIterator for AsyncListValIter<'_, V> where V: DeserializeOwned + Sync + Send + 'static, { @@ -1601,6 +1701,7 @@ where } } +/// Iterator for map entries pub struct AsyncIter<'a, V> { iter: redis::AsyncIter<'a, (Key, Vec)>, _m: std::marker::PhantomData, @@ -1614,82 +1715,96 @@ where type Item = IterItem; async fn next(&mut self) -> Option { - let item = self.iter.next_item().await; - item.map(|(key, v)| match bincode::deserialize::(v.as_ref()) { - Ok(v) => Ok((key, v)), - Err(e) => Err(anyhow::Error::new(e)), - }) + match self.iter.next_item().await { + None => None, + Some(Err(e)) => Some(Err(anyhow::Error::new(e))), + Some(Ok((key, v))) => match bincode::deserialize::(v.as_ref()) { + Ok(v) => Some(Ok((key, v))), + Err(e) => Some(Err(anyhow::Error::new(e))), + }, + } } } +/// Iterator for database keys pub struct AsyncDbKeyIter<'a> { prefix_len: usize, iter: redis::AsyncIter<'a, Key>, } #[async_trait] -impl<'a> AsyncIterator for AsyncDbKeyIter<'a> { +impl AsyncIterator for AsyncDbKeyIter<'_> { type Item = Result; async fn next(&mut self) -> Option { - self.iter - .next_item() - .await - .map(|key| Ok(key[self.prefix_len..].to_vec())) + match self.iter.next_item().await { + None => None, + Some(Err(e)) => Some(Err(anyhow::Error::new(e))), + Some(Ok(key)) => Some(Ok(key[self.prefix_len..].to_vec())), + } } } +/// Iterator for map keys pub struct AsyncKeyIter<'a> { iter: redis::AsyncIter<'a, (Key, ())>, } #[async_trait] -impl<'a> AsyncIterator for AsyncKeyIter<'a> { +impl AsyncIterator for AsyncKeyIter<'_> { type Item = Result; async fn next(&mut self) -> Option { - self.iter.next_item().await.map(|(key, _)| Ok(key)) + match self.iter.next_item().await { + None => None, + Some(Err(e)) => Some(Err(anyhow::Error::new(e))), + Some(Ok((key, _))) => Some(Ok(key)), + } } } +/// Iterator for maps pub struct AsyncMapIter<'a> { db: RedisStorageDB, iter: redis::AsyncIter<'a, Key>, } #[async_trait] -impl<'a> AsyncIterator for AsyncMapIter<'a> { +impl AsyncIterator for AsyncMapIter<'_> { type Item = Result; async fn next(&mut self) -> Option { - let full_name = self.iter.next_item().await; - if let Some(full_name) = full_name { - let name = self.db.map_full_name_to_key(full_name.as_slice()).to_vec(); - let m = RedisStorageMap::new(name, full_name, self.db.clone()); - Some(Ok(StorageMap::Redis(m))) - } else { - None - } + let full_name = match self.iter.next_item().await { + None => return None, + Some(Err(e)) => return Some(Err(anyhow::Error::new(e))), + Some(Ok(key)) => key, + }; + + let name = self.db.map_full_name_to_key(full_name.as_slice()).to_vec(); + let m = RedisStorageMap::new(name, full_name, self.db.clone()); + Some(Ok(StorageMap::Redis(m))) } } +/// Iterator for lists pub struct AsyncListIter<'a> { db: RedisStorageDB, iter: redis::AsyncIter<'a, Key>, } #[async_trait] -impl<'a> AsyncIterator for AsyncListIter<'a> { +impl AsyncIterator for AsyncListIter<'_> { type Item = Result; async fn next(&mut self) -> Option { - let full_name = self.iter.next_item().await; - if let Some(full_name) = full_name { - let name = self.db.list_full_name_to_key(full_name.as_slice()).to_vec(); - let l = RedisStorageList::new(name, full_name, self.db.clone()); - Some(Ok(StorageList::Redis(l))) - } else { - None - } + let full_name = match self.iter.next_item().await { + None => return None, + Some(Err(e)) => return Some(Err(anyhow::Error::new(e))), + Some(Ok(key)) => key, + }; + + let name = self.db.list_full_name_to_key(full_name.as_slice()).to_vec(); + let l = RedisStorageList::new(name, full_name, self.db.clone()); + Some(Ok(StorageList::Redis(l))) } } diff --git a/src/storage_redis_cluster.rs b/src/storage_redis_cluster.rs index 25fde88..099889c 100644 --- a/src/storage_redis_cluster.rs +++ b/src/storage_redis_cluster.rs @@ -1,3 +1,13 @@ +//! Redis-based storage implementation for key-value, map, and list data structures. +//! +//! This module provides a Redis-backed storage system with support for: +//! - Key-value storage with expiration +//! - Map (hash) data structures +//! - List data structures +//! - Counters with atomic operations +//! - Iteration and scanning capabilities +//! - Cluster-aware operations + use std::collections::BTreeMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -5,12 +15,15 @@ use std::time::Duration; use anyhow::anyhow; use async_trait::async_trait; -use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig}; -use redis::cluster::ClusterClient; -use redis::cluster_async::ClusterConnection; -use redis::cluster_routing::get_slot; -use redis::{pipe, AsyncCommands, Cmd}; +use redis::{ + aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig}, + cluster::ClusterClient, + cluster_async::ClusterConnection, + cluster_routing::get_slot, + pipe, AsyncCommands, Cmd, +}; use serde::de::DeserializeOwned; +use serde::Deserialize; use serde::Serialize; use serde_json::Value; @@ -20,15 +33,18 @@ use crate::{Result, StorageList, StorageMap}; #[allow(unused_imports)] use crate::{timestamp_millis, TimestampMillis}; -use crate::storage_redis::{ - KEY_PREFIX, KEY_PREFIX_LEN, LIST_NAME_PREFIX, MAP_NAME_PREFIX, SEPARATOR, -}; +use crate::storage::{KEY_PREFIX, KEY_PREFIX_LEN, LIST_NAME_PREFIX, MAP_NAME_PREFIX, SEPARATOR}; +/// Type alias for Redis cluster connection type RedisConnection = ClusterConnection; +/// Configuration for Redis storage #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RedisConfig { + /// Redis server URLs pub urls: Vec, + + /// Key prefix for all storage operations pub prefix: String, } @@ -41,15 +57,21 @@ impl Default for RedisConfig { } } +/// Redis storage database implementation #[derive(Clone)] pub struct RedisStorageDB { + /// Prefix for all keys prefix: Key, + /// Asynchronous connection to Redis cluster async_conn: RedisConnection, + /// Connection managers for cluster nodes nodes: Vec, + /// Last update time for cluster nodes nodes_update_time: TimestampMillis, } impl RedisStorageDB { + /// Creates a new Redis storage instance #[inline] pub(crate) async fn new(cfg: RedisConfig) -> Result { let prefix = [cfg.prefix.as_bytes(), SEPARATOR].concat(); @@ -74,6 +96,7 @@ impl RedisStorageDB { Ok(db) } + /// Starts background cleanup task fn cleanup(self) -> Self { let db = self.clone(); tokio::spawn(async move { @@ -92,16 +115,19 @@ impl RedisStorageDB { self } + /// Gets a clone of the async connection #[inline] fn async_conn(&self) -> RedisConnection { self.async_conn.clone() } + /// Gets a mutable reference to the async connection #[inline] fn async_conn_mut(&mut self) -> &mut RedisConnection { &mut self.async_conn } + /// Refreshes cluster node information #[inline] async fn refresh_cluster_nodes(&mut self) -> Result<()> { let slots = self @@ -166,6 +192,7 @@ impl RedisStorageDB { Ok(()) } + /// Gets mutable reference to cluster nodes with refresh #[inline] async fn nodes_mut(&mut self) -> Result<&mut Vec> { //Refresh after a certain interval. @@ -175,12 +202,14 @@ impl RedisStorageDB { Ok(&mut self.nodes) } + /// Creates key for length tracking sorted set #[inline] #[allow(dead_code)] fn make_len_sortedset_key(&self) -> Key { [KEY_PREFIX_LEN, self.prefix.as_slice()].concat() } + /// Creates full key with prefix #[inline] fn make_full_key(&self, key: K) -> Key where @@ -189,11 +218,13 @@ impl RedisStorageDB { [KEY_PREFIX, self.prefix.as_slice(), key.as_ref()].concat() } + /// Creates scan pattern with prefix #[inline] fn make_scan_pattern_match>(&self, pattern: P) -> Key { [KEY_PREFIX, self.prefix.as_slice(), pattern.as_ref()].concat() } + /// Creates full map name with prefix #[inline] fn make_map_full_name(&self, name: K) -> Key where @@ -202,6 +233,7 @@ impl RedisStorageDB { [MAP_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat() } + /// Creates full list name with prefix #[inline] fn make_list_full_name(&self, name: K) -> Key where @@ -210,26 +242,31 @@ impl RedisStorageDB { [LIST_NAME_PREFIX, self.prefix.as_slice(), name.as_ref()].concat() } + /// Creates map prefix pattern for scanning #[inline] fn make_map_prefix_match(&self) -> Key { [MAP_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat() } + /// Creates list prefix pattern for scanning #[inline] fn make_list_prefix_match(&self) -> Key { [LIST_NAME_PREFIX, self.prefix.as_slice(), b"*"].concat() } + /// Extracts map key from full name #[inline] fn map_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] { full_name[MAP_NAME_PREFIX.len() + self.prefix.len()..].as_ref() } + /// Extracts list key from full name #[inline] fn list_full_name_to_key<'a>(&self, full_name: &'a [u8]) -> &'a [u8] { full_name[LIST_NAME_PREFIX.len() + self.prefix.len()..].as_ref() } + /// Gets full key name for a given key #[inline] async fn _get_full_name(&self, key: &[u8]) -> Result { let map_full_name = self.make_map_full_name(key); @@ -247,6 +284,7 @@ impl RedisStorageDB { Ok(full_name) } + /// Internal method to insert a key-value pair #[inline] async fn _insert( &self, @@ -327,6 +365,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method for batch insertion #[inline] async fn _batch_insert( &self, @@ -372,6 +411,7 @@ impl RedisStorageDB { } } + /// Internal method for batch removal #[inline] async fn _batch_remove(&self, _keys: Vec) -> Result<()> { #[cfg(not(feature = "len"))] @@ -396,6 +436,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to increment a counter #[inline] async fn _counter_incr( &self, @@ -469,6 +510,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to decrement a counter #[inline] async fn _counter_decr( &self, @@ -542,6 +584,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to set a counter value #[inline] async fn _counter_set( &self, @@ -616,6 +659,7 @@ impl RedisStorageDB { Ok(()) } + /// Internal method to remove a key #[inline] async fn _remove(&self, _key: K) -> Result<()> where @@ -659,6 +703,7 @@ impl StorageDB for RedisStorageDB { type MapType = RedisStorageMap; type ListType = RedisStorageList; + /// Creates a new map with optional expiration #[inline] async fn map + Sync + Send>( &self, @@ -672,6 +717,7 @@ impl StorageDB for RedisStorageDB { ) } + /// Removes a map #[inline] async fn map_remove(&self, name: K) -> Result<()> where @@ -682,12 +728,14 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Checks if a map exists #[inline] async fn map_contains_key + Sync + Send>(&self, key: K) -> Result { let map_full_name = self.make_map_full_name(key.as_ref()); Ok(self.async_conn().exists(map_full_name).await?) } + /// Creates a new list with optional expiration #[inline] async fn list + Sync + Send>( &self, @@ -701,6 +749,7 @@ impl StorageDB for RedisStorageDB { ) } + /// Removes a list #[inline] async fn list_remove(&self, name: K) -> Result<()> where @@ -711,12 +760,14 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Checks if a list exists #[inline] async fn list_contains_key + Sync + Send>(&self, key: K) -> Result { let list_full_name = self.make_list_full_name(key.as_ref()); Ok(self.async_conn().exists(list_full_name).await?) } + /// Inserts a key-value pair #[inline] async fn insert(&self, key: K, val: &V) -> Result<()> where @@ -726,6 +777,7 @@ impl StorageDB for RedisStorageDB { self._insert(key, val, None).await } + /// Gets a value by key #[inline] async fn get(&self, key: K) -> Result> where @@ -744,6 +796,7 @@ impl StorageDB for RedisStorageDB { } } + /// Removes a key #[inline] async fn remove(&self, key: K) -> Result<()> where @@ -752,6 +805,7 @@ impl StorageDB for RedisStorageDB { self._remove(key).await } + /// Batch insertion of key-value pairs #[inline] async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where @@ -777,6 +831,7 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Batch removal of keys #[inline] async fn batch_remove(&self, keys: Vec) -> Result<()> { if !keys.is_empty() { @@ -785,6 +840,7 @@ impl StorageDB for RedisStorageDB { Ok(()) } + /// Increments a counter #[inline] async fn counter_incr(&self, key: K, increment: isize) -> Result<()> where @@ -793,6 +849,7 @@ impl StorageDB for RedisStorageDB { self._counter_incr(key, increment, None).await } + /// Decrements a counter #[inline] async fn counter_decr(&self, key: K, decrement: isize) -> Result<()> where @@ -801,6 +858,7 @@ impl StorageDB for RedisStorageDB { self._counter_decr(key, decrement, None).await } + /// Gets a counter value #[inline] async fn counter_get(&self, key: K) -> Result> where @@ -810,6 +868,7 @@ impl StorageDB for RedisStorageDB { Ok(self.async_conn().get::<_, Option>(full_key).await?) } + /// Sets a counter value #[inline] async fn counter_set(&self, key: K, val: isize) -> Result<()> where @@ -818,6 +877,7 @@ impl StorageDB for RedisStorageDB { self._counter_set(key, val, None).await } + /// Checks if a key exists #[inline] async fn contains_key + Sync + Send>(&self, key: K) -> Result { //HEXISTS key field @@ -825,6 +885,7 @@ impl StorageDB for RedisStorageDB { Ok(self.async_conn().exists(full_key).await?) } + /// Gets the number of keys in the database #[inline] #[cfg(feature = "len")] async fn len(&self) -> Result { @@ -843,6 +904,7 @@ impl StorageDB for RedisStorageDB { } } + /// Gets the total database size #[inline] async fn db_size(&self) -> Result { let mut dbsize = 0; @@ -868,6 +930,7 @@ impl StorageDB for RedisStorageDB { Ok(dbsize as usize) } + /// Sets expiration time for a key #[inline] #[cfg(feature = "ttl")] async fn expire_at(&self, _key: K, _at: TimestampMillis) -> Result @@ -909,6 +972,7 @@ impl StorageDB for RedisStorageDB { } } + /// Sets expiration duration for a key #[inline] #[cfg(feature = "ttl")] async fn expire(&self, _key: K, _dur: TimestampMillis) -> Result @@ -954,6 +1018,7 @@ impl StorageDB for RedisStorageDB { } } + /// Gets time-to-live for a key #[inline] #[cfg(feature = "ttl")] async fn ttl(&self, key: K) -> Result> @@ -970,6 +1035,7 @@ impl StorageDB for RedisStorageDB { } } + /// Creates an iterator for all maps #[inline] async fn map_iter<'a>( &'a mut self, @@ -987,6 +1053,7 @@ impl StorageDB for RedisStorageDB { Ok(Box::new(iter)) } + /// Creates an iterator for all lists #[inline] async fn list_iter<'a>( &'a mut self, @@ -1004,6 +1071,7 @@ impl StorageDB for RedisStorageDB { Ok(Box::new(iter)) } + /// Creates an iterator for keys matching a pattern async fn scan<'a, P>( &'a mut self, pattern: P, @@ -1022,6 +1090,7 @@ impl StorageDB for RedisStorageDB { Ok(Box::new(AsyncDbKeyIter { prefix_len, iters })) } + /// Gets database information #[inline] async fn info(&self) -> Result { Ok(serde_json::json!({ @@ -1031,17 +1100,24 @@ impl StorageDB for RedisStorageDB { } } +/// Redis-backed map storage implementation #[derive(Clone)] pub struct RedisStorageMap { + /// Name of the map name: Key, + /// Full key name with prefix full_name: Key, + /// Optional expiration time in milliseconds #[allow(dead_code)] expire: Option, + /// Flag indicating if the map is empty empty: Arc, + /// Reference to the parent database pub(crate) db: RedisStorageDB, } impl RedisStorageMap { + /// Creates a new map without expiration #[inline] pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self { Self { @@ -1053,6 +1129,7 @@ impl RedisStorageMap { } } + /// Creates a new map with expiration #[inline] pub(crate) async fn new_expire( name: Key, @@ -1075,16 +1152,19 @@ impl RedisStorageMap { }) } + /// Gets a clone of the async connection #[inline] fn async_conn(&self) -> RedisConnection { self.db.async_conn() } + /// Gets a mutable reference to the async connection #[inline] fn async_conn_mut(&mut self) -> &mut RedisConnection { self.db.async_conn_mut() } + /// Checks if the map is empty #[inline] async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result { //HSCAN key cursor [MATCH pattern] [COUNT count] @@ -1097,6 +1177,7 @@ impl RedisStorageMap { Ok(res) } + /// Internal method to insert with expiration handling #[inline] async fn _insert_expire(&self, key: &[u8], val: Vec) -> Result<()> { let mut async_conn = self.async_conn(); @@ -1123,6 +1204,7 @@ impl RedisStorageMap { Ok(()) } + /// Internal method for batch insertion with expiration #[inline] async fn _batch_insert_expire(&self, key_vals: Vec<(Key, Vec)>) -> Result<()> { let mut async_conn = self.async_conn(); @@ -1153,11 +1235,13 @@ impl RedisStorageMap { #[async_trait] impl Map for RedisStorageMap { + /// Gets the map name #[inline] fn name(&self) -> &[u8] { self.name.as_slice() } + /// Inserts a key-value pair into the map #[inline] async fn insert(&self, key: K, val: &V) -> Result<()> where @@ -1168,6 +1252,7 @@ impl Map for RedisStorageMap { .await } + /// Gets a value from the map #[inline] async fn get(&self, key: K) -> Result> where @@ -1186,6 +1271,7 @@ impl Map for RedisStorageMap { } } + /// Removes a key from the map #[inline] async fn remove(&self, key: K) -> Result<()> where @@ -1199,6 +1285,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Checks if a key exists in the map #[inline] async fn contains_key + Sync + Send>(&self, key: K) -> Result { //HEXISTS key field @@ -1209,6 +1296,7 @@ impl Map for RedisStorageMap { Ok(res) } + /// Gets the number of elements in the map #[cfg(feature = "map_len")] #[inline] async fn len(&self) -> Result { @@ -1216,6 +1304,7 @@ impl Map for RedisStorageMap { Ok(self.async_conn().hlen(self.full_name.as_slice()).await?) } + /// Checks if the map is empty #[inline] async fn is_empty(&self) -> Result { //HSCAN key cursor [MATCH pattern] [COUNT count] @@ -1229,6 +1318,7 @@ impl Map for RedisStorageMap { Ok(res) } + /// Clears all elements from the map #[inline] async fn clear(&self) -> Result<()> { //DEL key [key ...] @@ -1237,6 +1327,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Removes and returns a value from the map #[inline] async fn remove_and_fetch(&self, key: K) -> Result> where @@ -1261,6 +1352,7 @@ impl Map for RedisStorageMap { } } + /// Removes all keys with a given prefix #[inline] async fn remove_with_prefix(&self, prefix: K) -> Result<()> where @@ -1278,7 +1370,7 @@ impl Map for RedisStorageMap { .next_item() .await { - removeds.push(key); + removeds.push(key?); if removeds.len() > 20 { let _: () = conn2.hdel(name, removeds.as_slice()).await?; removeds.clear(); @@ -1290,6 +1382,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Batch insertion of key-value pairs #[inline] async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where @@ -1310,6 +1403,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Batch removal of keys #[inline] async fn batch_remove(&self, keys: Vec) -> Result<()> { if !keys.is_empty() { @@ -1321,6 +1415,7 @@ impl Map for RedisStorageMap { Ok(()) } + /// Creates an iterator over key-value pairs #[inline] async fn iter<'a, V>( &'a mut self, @@ -1339,6 +1434,7 @@ impl Map for RedisStorageMap { Ok(Box::new(iter)) } + /// Creates an iterator over keys #[inline] async fn key_iter<'a>( &'a mut self, @@ -1353,6 +1449,7 @@ impl Map for RedisStorageMap { Ok(Box::new(iter)) } + /// Creates an iterator over key-value pairs with a prefix #[inline] async fn prefix_iter<'a, P, V>( &'a mut self, @@ -1375,6 +1472,7 @@ impl Map for RedisStorageMap { Ok(Box::new(iter)) } + /// Sets expiration time for the map #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { let res = self @@ -1384,6 +1482,7 @@ impl Map for RedisStorageMap { Ok(res) } + /// Sets expiration duration for the map #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { let res = self @@ -1393,6 +1492,7 @@ impl Map for RedisStorageMap { Ok(res) } + /// Gets time-to-live for the map #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { let mut async_conn = self.async_conn(); @@ -1407,17 +1507,24 @@ impl Map for RedisStorageMap { } } +/// Redis-backed list storage implementation #[derive(Clone)] pub struct RedisStorageList { + /// Name of the list name: Key, + /// Full key name with prefix full_name: Key, + /// Optional expiration time in milliseconds #[allow(dead_code)] expire: Option, + /// Flag indicating if the list is empty empty: Arc, + /// Reference to the parent database pub(crate) db: RedisStorageDB, } impl RedisStorageList { + /// Creates a new list without expiration #[inline] pub(crate) fn new(name: Key, full_name: Key, db: RedisStorageDB) -> Self { Self { @@ -1429,6 +1536,7 @@ impl RedisStorageList { } } + /// Creates a new list with expiration #[inline] pub(crate) async fn new_expire( name: Key, @@ -1451,16 +1559,19 @@ impl RedisStorageList { }) } + /// Gets a clone of the async connection #[inline] pub(crate) fn async_conn(&self) -> RedisConnection { self.db.async_conn() } + /// Checks if the list is empty #[inline] async fn _is_empty(async_conn: &mut RedisConnection, full_name: &[u8]) -> Result { Ok(async_conn.llen::<_, usize>(full_name).await? == 0) } + /// Internal method to push with expiration handling #[inline] async fn _push_expire(&self, val: Vec) -> Result<()> { let mut async_conn = self.async_conn(); @@ -1487,6 +1598,7 @@ impl RedisStorageList { Ok(()) } + /// Internal method for batch push with expiration #[inline] async fn _pushs_expire(&self, vals: Vec>) -> Result<()> { let mut async_conn = self.async_conn(); @@ -1513,6 +1625,7 @@ impl RedisStorageList { Ok(()) } + /// Internal method for push with limit and expiration #[inline] async fn _push_limit_expire( &self, @@ -1557,6 +1670,7 @@ impl RedisStorageList { .await } + /// Internal method for push with limit #[inline] async fn _push_limit( &self, @@ -1587,11 +1701,13 @@ impl RedisStorageList { #[async_trait] impl List for RedisStorageList { + /// Gets the list name #[inline] fn name(&self) -> &[u8] { self.name.as_slice() } + /// Pushes a value to the end of the list #[inline] async fn push(&self, val: &V) -> Result<()> where @@ -1600,6 +1716,7 @@ impl List for RedisStorageList { self._push_expire(bincode::serialize(val)?).await } + /// Pushes multiple values to the end of the list #[inline] async fn pushs(&self, vals: Vec) -> Result<()> where @@ -1613,6 +1730,7 @@ impl List for RedisStorageList { self._pushs_expire(vals).await } + /// Pushes a value with size limit handling #[inline] async fn push_limit( &self, @@ -1638,6 +1756,7 @@ impl List for RedisStorageList { } } + /// Pops a value from the front of the list #[inline] async fn pop(&self) -> Result> where @@ -1658,6 +1777,7 @@ impl List for RedisStorageList { Ok(removed) } + /// Gets all values in the list #[inline] async fn all(&self) -> Result> where @@ -1673,6 +1793,7 @@ impl List for RedisStorageList { .collect::>>() } + /// Gets a value by index #[inline] async fn get_index(&self, idx: usize) -> Result> where @@ -1691,17 +1812,20 @@ impl List for RedisStorageList { }) } + /// Gets the length of the list #[inline] async fn len(&self) -> Result { //LLEN key Ok(self.async_conn().llen(self.full_name.as_slice()).await?) } + /// Checks if the list is empty #[inline] async fn is_empty(&self) -> Result { Ok(self.len().await? == 0) } + /// Clears the list #[inline] async fn clear(&self) -> Result<()> { let _: () = self.async_conn().del(self.full_name.as_slice()).await?; @@ -1709,6 +1833,7 @@ impl List for RedisStorageList { Ok(()) } + /// Creates an iterator over list values #[inline] async fn iter<'a, V>( &'a mut self, @@ -1722,6 +1847,7 @@ impl List for RedisStorageList { ))) } + /// Sets expiration time for the list #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { let res = self @@ -1731,6 +1857,7 @@ impl List for RedisStorageList { Ok(res) } + /// Sets expiration duration for the list #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { let res = self @@ -1740,6 +1867,7 @@ impl List for RedisStorageList { Ok(res) } + /// Gets time-to-live for the list #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { let mut async_conn = self.async_conn(); @@ -1754,6 +1882,7 @@ impl List for RedisStorageList { } } +/// Iterator for list values pub struct AsyncListValIter<'a, V> { name: &'a [u8], conn: RedisConnection, @@ -1779,9 +1908,9 @@ impl<'a, V> AsyncListValIter<'a, V> { } #[async_trait] -impl<'a, V> AsyncIterator for AsyncListValIter<'a, V> +impl AsyncIterator for AsyncListValIter<'_, V> where - V: DeserializeOwned + Sync + Send + 'static, + V: DeserializeOwned + Sync + Send, { type Item = Result; @@ -1813,20 +1942,25 @@ where } } +/// Iterator for map entries pub struct AsyncIter<'a, V> { iter: redis::AsyncIter<'a, (Key, Vec)>, _m: std::marker::PhantomData, } #[async_trait] -impl<'a, V> AsyncIterator for AsyncIter<'a, V> +impl AsyncIterator for AsyncIter<'_, V> where - V: DeserializeOwned + Sync + Send + 'a, + V: DeserializeOwned + Sync + Send, { type Item = IterItem; async fn next(&mut self) -> Option { - let item = self.iter.next_item().await; + let item = match self.iter.next_item().await { + None => None, + Some(Err(e)) => return Some(Err(anyhow::Error::new(e))), + Some(Ok(item)) => Some(item), + }; item.map(|(key, v)| match bincode::deserialize::(v.as_ref()) { Ok(v) => Ok((key, v)), Err(e) => Err(anyhow::Error::new(e)), @@ -1834,20 +1968,23 @@ where } } +/// Iterator for database keys pub struct AsyncDbKeyIter<'a> { prefix_len: usize, iters: Vec>, } #[async_trait] -impl<'a> AsyncIterator for AsyncDbKeyIter<'a> { +impl AsyncIterator for AsyncDbKeyIter<'_> { type Item = Result; async fn next(&mut self) -> Option { while let Some(iter) = self.iters.last_mut() { - let item = iter - .next_item() - .await - .map(|key| Ok(key[self.prefix_len..].to_vec())); + let item = match iter.next_item().await { + None => None, + Some(Err(e)) => Some(Err(anyhow::Error::new(e))), + Some(Ok(key)) => Some(Ok(key[self.prefix_len..].to_vec())), + }; + if item.is_some() { return item; } @@ -1860,31 +1997,42 @@ impl<'a> AsyncIterator for AsyncDbKeyIter<'a> { } } +/// Iterator for map keys pub struct AsyncKeyIter<'a> { iter: redis::AsyncIter<'a, (Key, ())>, } #[async_trait] -impl<'a> AsyncIterator for AsyncKeyIter<'a> { +impl AsyncIterator for AsyncKeyIter<'_> { type Item = Result; async fn next(&mut self) -> Option { - self.iter.next_item().await.map(|(key, _)| Ok(key)) + match self.iter.next_item().await { + None => None, + Some(Err(e)) => Some(Err(anyhow::Error::new(e))), + Some(Ok((key, _))) => Some(Ok(key)), + } } } +/// Iterator for maps pub struct AsyncMapIter<'a> { db: RedisStorageDB, iters: Vec>, } #[async_trait] -impl<'a> AsyncIterator for AsyncMapIter<'a> { +impl AsyncIterator for AsyncMapIter<'_> { type Item = Result; async fn next(&mut self) -> Option { while let Some(iter) = self.iters.last_mut() { - let full_name = iter.next_item().await; + let full_name = match iter.next_item().await { + None => None, + Some(Err(e)) => return Some(Err(anyhow::Error::new(e))), + Some(Ok(key)) => Some(key), + }; + if let Some(full_name) = full_name { let name = self.db.map_full_name_to_key(full_name.as_slice()).to_vec(); let m = RedisStorageMap::new(name, full_name, self.db.clone()); @@ -1899,18 +2047,24 @@ impl<'a> AsyncIterator for AsyncMapIter<'a> { } } +/// Iterator for lists pub struct AsyncListIter<'a> { db: RedisStorageDB, iters: Vec>, } #[async_trait] -impl<'a> AsyncIterator for AsyncListIter<'a> { +impl AsyncIterator for AsyncListIter<'_> { type Item = Result; async fn next(&mut self) -> Option { while let Some(iter) = self.iters.last_mut() { - let full_name = iter.next_item().await; + let full_name = match iter.next_item().await { + None => None, + Some(Err(e)) => return Some(Err(anyhow::Error::new(e))), + Some(Ok(key)) => Some(key), + }; + if let Some(full_name) = full_name { let name = self.db.list_full_name_to_key(full_name.as_slice()).to_vec(); let l = RedisStorageList::new(name, full_name, self.db.clone()); @@ -1925,6 +2079,7 @@ impl<'a> AsyncIterator for AsyncListIter<'a> { } } +/// Groups items by Redis slot #[inline] fn transform_by_slot(input: Vec<(u16, T)>) -> Vec> { let mut grouped_data: BTreeMap> = BTreeMap::new(); diff --git a/src/storage_sled.rs b/src/storage_sled.rs index 8a6f103..73f0b42 100644 --- a/src/storage_sled.rs +++ b/src/storage_sled.rs @@ -1,3 +1,17 @@ +//! Sled-based persistent storage implementation +//! +//! This module provides a persistent storage solution backed by Sled (an embedded database). +//! It implements key-value storage, maps (dictionaries), and lists (queues) with support for: +//! - Atomic operations and transactions +//! - Asynchronous API +//! - TTL/expiration (optional feature) +//! - Counters +//! - Batch operations +//! - Iterators +//! +//! The implementation uses multiple sled trees for different data types and provides +//! a command-based interface with background processing for concurrent operations. + use core::fmt; use std::borrow::Cow; use std::fmt::Debug; @@ -7,10 +21,11 @@ use std::ops::Deref; use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; use std::sync::Arc; -use anyhow::anyhow; +use anyhow::{anyhow, Error}; use async_trait::async_trait; use convert::Bytesize; use serde::de::DeserializeOwned; +use serde::Deserialize; use serde::Serialize; use serde_json::Value; @@ -31,32 +46,49 @@ use tokio::task::spawn_blocking; use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB}; #[allow(unused_imports)] use crate::{timestamp_millis, TimestampMillis}; -use crate::{Error, Result, StorageList, StorageMap}; +use crate::{Result, StorageList, StorageMap}; +/// Byte separator used in composite keys const SEPARATOR: &[u8] = b"@"; +/// Tree name for key-value storage const KV_TREE: &[u8] = b"__kv_tree@"; +/// Tree name for map metadata const MAP_TREE: &[u8] = b"__map_tree@"; +/// Tree name for list metadata const LIST_TREE: &[u8] = b"__list_tree@"; +/// Tree for tracking expiration times (expire_at => key) const EXPIRE_KEYS_TREE: &[u8] = b"__expire_key_tree@"; +/// Tree for tracking key expiration (key => expire_at) const KEY_EXPIRE_TREE: &[u8] = b"__key_expire_tree@"; +/// Prefix for map keys const MAP_NAME_PREFIX: &[u8] = b"__map@"; +/// Separator between map name and item key const MAP_KEY_SEPARATOR: &[u8] = b"@__item@"; #[allow(dead_code)] +/// Suffix for map count keys const MAP_KEY_COUNT_SUFFIX: &[u8] = b"@__count@"; +/// Prefix for list keys const LIST_NAME_PREFIX: &[u8] = b"__list@"; +/// Suffix for list count keys const LIST_KEY_COUNT_SUFFIX: &[u8] = b"@__count@"; +/// Suffix for list content keys const LIST_KEY_CONTENT_SUFFIX: &[u8] = b"@__content@"; +/// Enum representing different key types in storage #[allow(dead_code)] #[derive(Debug, Clone, Copy, Deserialize, Serialize)] enum KeyType { + /// Key-value pair KV, + /// Map structure Map, + /// List structure List, } impl KeyType { + /// Encodes key type to a single byte #[inline] #[allow(dead_code)] fn encode(&self) -> &[u8] { @@ -67,6 +99,7 @@ impl KeyType { } } + /// Decodes key type from byte representation #[inline] #[allow(dead_code)] fn decode(v: &[u8]) -> Result { @@ -83,7 +116,9 @@ impl KeyType { } } +/// Enum representing all possible storage operations enum Command { + // Database operations DBInsert(SledStorageDB, Key, Vec, oneshot::Sender>), DBGet(SledStorageDB, IVec, oneshot::Sender>>), DBRemove(SledStorageDB, IVec, oneshot::Sender>), @@ -130,6 +165,7 @@ enum Command { DBLen(SledStorageDB, oneshot::Sender), DBSize(SledStorageDB, oneshot::Sender), + // Map operations MapInsert(SledStorageMap, IVec, IVec, oneshot::Sender>), MapGet(SledStorageMap, IVec, oneshot::Sender>>), MapRemove(SledStorageMap, IVec, oneshot::Sender>), @@ -160,6 +196,7 @@ enum Command { MapIsExpired(SledStorageMap, oneshot::Sender>), MapPrefixIter(SledStorageMap, Option, oneshot::Sender), + // List operations ListPush(SledStorageList, IVec, oneshot::Sender>), ListPushs(SledStorageList, Vec, oneshot::Sender>), ListPushLimit( @@ -193,6 +230,7 @@ enum Command { ListIsExpired(SledStorageList, oneshot::Sender>), ListPrefixIter(SledStorageList, oneshot::Sender), + // Iterator operation #[allow(clippy::type_complexity)] IterNext( sled::Iter, @@ -200,8 +238,10 @@ enum Command { ), } +/// Type alias for cleanup function signature pub type CleanupFun = fn(&SledStorageDB); +/// Default cleanup function that runs in background thread fn def_cleanup(_db: &SledStorageDB) { #[cfg(feature = "ttl")] { @@ -246,10 +286,14 @@ fn def_cleanup(_db: &SledStorageDB) { } } +/// Configuration for Sled storage backend #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SledConfig { + /// Path to database directory pub path: String, + /// Cache capacity in bytes pub cache_capacity: Bytesize, + /// Cleanup function for expired keys #[serde(skip, default = "SledConfig::cleanup_f_default")] pub cleanup_f: CleanupFun, } @@ -265,6 +309,7 @@ impl Default for SledConfig { } impl SledConfig { + /// Converts to Sled's native configuration #[inline] pub fn to_sled_config(&self) -> Result { if self.path.trim().is_empty() { @@ -277,12 +322,14 @@ impl SledConfig { Ok(sled_cfg) } + /// Returns default cleanup function #[inline] fn cleanup_f_default() -> CleanupFun { def_cleanup } } +/// Increments a counter value stored in bytes fn _increment(old: Option<&[u8]>) -> Option> { let number = match old { Some(bytes) => { @@ -299,6 +346,7 @@ fn _increment(old: Option<&[u8]>) -> Option> { Some(number.to_be_bytes().to_vec()) } +/// Decrements a counter value stored in bytes fn _decrement(old: Option<&[u8]>) -> Option> { let number = match old { Some(bytes) => { @@ -315,6 +363,7 @@ fn _decrement(old: Option<&[u8]>) -> Option> { Some(number.to_be_bytes().to_vec()) } +/// Pattern for matching keys with wildcards #[derive(Clone)] pub struct Pattern(Arc>); @@ -338,14 +387,19 @@ impl From<&[u8]> for Pattern { } } +/// Represents a single character in a pattern #[derive(Clone)] pub enum PatternChar { + /// Literal byte Literal(u8), + /// Wildcard matching zero or more characters Wildcard, + /// Matches any single character AnyChar, } impl Pattern { + /// Parses a byte pattern into PatternChar sequence pub fn parse(pattern: &[u8]) -> Self { let mut parsed_pattern = Vec::new(); let mut chars = pattern.bytes().peekable(); @@ -375,6 +429,7 @@ impl Pattern { } } +/// Checks if text matches the given pattern fn is_match>(pattern: P, text: &[u8]) -> bool { let pattern = pattern.into(); let text_chars = text; @@ -407,7 +462,9 @@ fn is_match>(pattern: P, text: &[u8]) -> bool { dp[pattern_len][text_len] } +/// Trait for byte replacement pub trait BytesReplace { + /// Replaces all occurrences of `from` with `to` in the byte slice fn replace(self, from: &[u8], to: &[u8]) -> Vec; } @@ -429,21 +486,31 @@ impl BytesReplace for &[u8] { } } +/// Main database handle for Sled storage #[derive(Clone)] pub struct SledStorageDB { + /// Underlying sled database pub(crate) db: Arc, + /// Tree for key-value storage pub(crate) kv_tree: sled::Tree, + /// Tree for map metadata pub(crate) map_tree: sled::Tree, + /// Tree for list metadata pub(crate) list_tree: sled::Tree, + /// Tree for tracking expiration times #[allow(dead_code)] - pub(crate) expire_key_tree: sled::Tree, //(key, val) => (expire_at, key) + pub(crate) expire_key_tree: sled::Tree, + /// Tree for tracking key expiration #[allow(dead_code)] - pub(crate) key_expire_tree: sled::Tree, //(key, val) => (key, expire_at) + pub(crate) key_expire_tree: sled::Tree, + /// Channel sender for commands cmd_tx: mpsc::Sender, - active_count: Arc, //Active Command Count + /// Count of active commands + active_count: Arc, } impl SledStorageDB { + /// Creates a new SledStorageDB instance #[inline] pub(crate) async fn new(cfg: SledConfig) -> Result { let sled_cfg = cfg.to_sled_config()?; @@ -661,6 +728,7 @@ impl SledStorageDB { Ok(db) } + /// Cleans up expired keys (TTL feature) #[cfg(feature = "ttl")] #[inline] pub fn cleanup(&self, limit: usize) -> usize { @@ -755,6 +823,7 @@ impl SledStorageDB { count } + /// Cleans up expired key-value pairs (TTL feature) #[cfg(feature = "ttl")] #[inline] pub fn cleanup_kvs(&self, limit: usize) -> usize { @@ -823,6 +892,7 @@ impl SledStorageDB { count } + /// Returns the count of active commands #[inline] pub fn active_count(&self) -> isize { self.active_count.load(Ordering::Relaxed) @@ -838,6 +908,7 @@ impl SledStorageDB { // self.list_tree.len() // } + /// Creates a map prefix name #[inline] fn make_map_prefix_name(name: K) -> Key where @@ -846,6 +917,7 @@ impl SledStorageDB { [MAP_NAME_PREFIX, name.as_ref(), SEPARATOR].concat() } + /// Creates a map item prefix name #[inline] fn make_map_item_prefix_name(name: K) -> Key where @@ -854,6 +926,7 @@ impl SledStorageDB { [MAP_NAME_PREFIX, name.as_ref(), MAP_KEY_SEPARATOR].concat() } + /// Creates a map count key name #[inline] fn make_map_count_key_name(name: K) -> Key where @@ -862,16 +935,19 @@ impl SledStorageDB { [MAP_NAME_PREFIX, name.as_ref(), MAP_KEY_COUNT_SUFFIX].concat() } + /// Extracts map name from count key #[inline] fn map_count_key_to_name(key: &[u8]) -> &[u8] { key[MAP_NAME_PREFIX.len()..key.as_ref().len() - MAP_KEY_COUNT_SUFFIX.len()].as_ref() } + /// Checks if a key is a map count key #[inline] fn is_map_count_key(key: &[u8]) -> bool { key.starts_with(MAP_NAME_PREFIX) && key.ends_with(MAP_KEY_COUNT_SUFFIX) } + /// Extracts map name from item key #[allow(dead_code)] #[inline] fn map_item_key_to_name(key: &[u8]) -> Option<&[u8]> { @@ -887,6 +963,7 @@ impl SledStorageDB { None } + /// Creates a list prefix #[inline] fn make_list_prefix(name: K) -> Key where @@ -895,21 +972,25 @@ impl SledStorageDB { [LIST_NAME_PREFIX, name.as_ref()].concat() } + /// Creates a list count key #[inline] fn make_list_count_key(name: &[u8]) -> Vec { [LIST_NAME_PREFIX, name, LIST_KEY_COUNT_SUFFIX].concat() } + /// Extracts list name from count key #[inline] fn list_count_key_to_name(key: &[u8]) -> &[u8] { key[LIST_NAME_PREFIX.len()..key.as_ref().len() - LIST_KEY_COUNT_SUFFIX.len()].as_ref() } + /// Checks if a key is a list count key #[inline] fn is_list_count_key(key: &[u8]) -> bool { key.starts_with(LIST_NAME_PREFIX) && key.ends_with(LIST_KEY_COUNT_SUFFIX) } + /// Checks if a key exists for a specific key type #[inline] fn _contains_key + Sync + Send>( &self, @@ -923,23 +1004,27 @@ impl SledStorageDB { } } + /// Checks if a key exists in key-value store #[inline] fn _kv_contains_key + Sync + Send>(kv: &Tree, key: K) -> Result { Ok(kv.contains_key(key.as_ref())?) } + /// Checks if a map exists #[inline] fn _map_contains_key + Sync + Send>(tree: &Tree, key: K) -> Result { let count_key = SledStorageDB::make_map_count_key_name(key.as_ref()); Ok(tree.contains_key(count_key)?) } + /// Checks if a list exists #[inline] fn _list_contains_key + Sync + Send>(tree: &Tree, name: K) -> Result { let count_key = SledStorageDB::make_list_count_key(name.as_ref()); Ok(tree.contains_key(count_key)?) } + /// Removes a map #[inline] fn _map_remove(&self, key: K) -> Result<()> where @@ -962,6 +1047,7 @@ impl SledStorageDB { Ok(()) } + /// Removes a list #[inline] fn _list_remove(&self, key: K) -> Result<()> where @@ -988,6 +1074,7 @@ impl SledStorageDB { Ok(()) } + /// Removes a key-value pair #[inline] fn _kv_remove(&self, key: K) -> Result<()> where @@ -1008,6 +1095,7 @@ impl SledStorageDB { Ok(()) } + /// Removes expiration key (TTL feature) #[cfg(feature = "ttl")] #[inline] fn _remove_expire_key(&self, key: &[u8]) -> Result<()> { @@ -1019,6 +1107,7 @@ impl SledStorageDB { Ok(()) } + /// Transactionally removes expiration key (TTL feature) #[cfg(feature = "ttl")] #[inline] fn _tx_remove_expire_key( @@ -1034,6 +1123,7 @@ impl SledStorageDB { Ok(()) } + /// Checks if a key is expired #[inline] fn _is_expired(&self, _key: K, _contains_key_f: F) -> Result where @@ -1052,6 +1142,7 @@ impl SledStorageDB { Ok(false) } + /// Gets time-to-live for a key #[inline] fn _ttl( &self, @@ -1067,6 +1158,7 @@ impl SledStorageDB { .map(|(expire_at, at_bytes)| (expire_at - timestamp_millis(), at_bytes))) } + /// Gets expiration time for a key #[inline] fn _ttl_at( &self, @@ -1100,6 +1192,7 @@ impl SledStorageDB { Ok(ttl_res) } + /// Inserts a key-value pair #[inline] fn _insert(&self, key: &[u8], val: &[u8]) -> Result<()> { #[cfg(not(feature = "ttl"))] @@ -1117,6 +1210,7 @@ impl SledStorageDB { Ok(()) } + /// Gets a value by key #[inline] fn _get(&self, key: &[u8]) -> Result> { let res = if self._is_expired(key.as_ref(), |k| Self::_kv_contains_key(&self.kv_tree, k))? { @@ -1127,28 +1221,42 @@ impl SledStorageDB { Ok(res) } + /// Checks if a map key exists #[inline] fn _self_map_contains_key(&self, key: &[u8]) -> Result { - // let this = self; - if self._is_expired(key, |k| Self::_map_contains_key(&self.map_tree, k))? { - Ok(false) - } else { - //Self::_map_contains_key(&self.map_tree, key) - Ok(true) + #[cfg(feature = "ttl")] + { + if self._is_expired(key, |k| Self::_map_contains_key(&self.map_tree, k))? { + Ok(false) + } else { + //Self::_map_contains_key(&self.map_tree, key) + Ok(true) + } } + + #[cfg(not(feature = "ttl"))] + Self::_map_contains_key(&self.map_tree, key) } + /// Checks if a list key exists #[inline] fn _self_list_contains_key(&self, key: &[u8]) -> Result { - let this = self; - if this._is_expired(key, |k| Self::_list_contains_key(&self.list_tree, k))? { - Ok(false) - } else { - // Self::_list_contains_key(&this.list_tree, key) - Ok(true) + #[cfg(feature = "ttl")] + { + let this = self; + if this._is_expired(key, |k| Self::_list_contains_key(&self.list_tree, k))? { + Ok(false) + } else { + // Self::_list_contains_key(&this.list_tree, key) + Ok(true) + } } + + #[cfg(not(feature = "ttl"))] + Self::_list_contains_key(&self.list_tree, key) } + /// Batch insert key-value pairs #[inline] fn _batch_insert(&self, key_vals: Vec<(Key, IVec)>) -> Result<()> { if key_vals.is_empty() { @@ -1195,6 +1303,7 @@ impl SledStorageDB { Ok(()) } + /// Batch remove keys #[inline] fn _batch_remove(&self, keys: Vec) -> Result<()> { if keys.is_empty() { @@ -1235,6 +1344,7 @@ impl SledStorageDB { Ok(()) } + /// Increments a counter #[inline] fn _counter_incr(&self, key: &[u8], increment: isize) -> Result<()> { self.kv_tree.fetch_and_update(key, |old: Option<&[u8]>| { @@ -1254,6 +1364,7 @@ impl SledStorageDB { Ok(()) } + /// Decrements a counter #[inline] fn _counter_decr(&self, key: &[u8], decrement: isize) -> Result<()> { self.kv_tree.fetch_and_update(key, |old: Option<&[u8]>| { @@ -1273,6 +1384,7 @@ impl SledStorageDB { Ok(()) } + /// Gets counter value #[inline] fn _counter_get(&self, key: &[u8]) -> Result> { let this = self; @@ -1285,6 +1397,7 @@ impl SledStorageDB { } } + /// Sets counter value #[inline] fn _counter_set(&self, key: &[u8], val: isize) -> Result<()> { let val = val.to_be_bytes().to_vec(); @@ -1306,17 +1419,24 @@ impl SledStorageDB { Ok(()) } + /// Checks if a key exists in key-value store #[inline] fn _self_contains_key(&self, key: &[u8]) -> Result { - let this = self; - if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? { - Ok(false) - } else { - // this._contains_key(key, KeyType::KV) - Ok(true) + #[cfg(feature = "ttl")] + { + let this = self; + if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? { + Ok(false) + } else { + // this._contains_key(key, KeyType::KV) + Ok(true) + } } + #[cfg(not(feature = "ttl"))] + Self::_kv_contains_key(&self.kv_tree, key) } + /// Sets expiration time for a key (TTL feature) #[inline] #[cfg(feature = "ttl")] fn _expire_at(&self, key: &[u8], at: TimestampMillis, key_type: KeyType) -> Result { @@ -1332,6 +1452,7 @@ impl SledStorageDB { } } + /// Transactionally sets expiration time (TTL feature) #[inline] #[cfg(feature = "ttl")] fn _tx_expire_at( @@ -1349,6 +1470,7 @@ impl SledStorageDB { Ok(res) } + /// Gets time-to-live for a key (TTL feature) #[inline] #[cfg(feature = "ttl")] fn _self_ttl(&self, key: &[u8]) -> Result> { @@ -1357,16 +1479,19 @@ impl SledStorageDB { .and_then(|(ttl, _)| if ttl > 0 { Some(ttl) } else { None })) } + /// Creates an iterator for map prefixes #[inline] fn _map_scan_prefix(&self) -> sled::Iter { self.map_tree.scan_prefix(MAP_NAME_PREFIX) } + /// Creates an iterator for list prefixes #[inline] fn _list_scan_prefix(&self) -> sled::Iter { self.list_tree.scan_prefix(LIST_NAME_PREFIX) } + /// Creates an iterator for database scan with pattern #[inline] fn _db_scan_prefix(&self, pattern: Vec) -> sled::Iter { let mut last_esc_char = false; @@ -1404,6 +1529,7 @@ impl SledStorageDB { iter } + /// Gets number of key-value pairs #[inline] fn _kv_len(&self) -> usize { #[cfg(feature = "ttl")] @@ -1418,11 +1544,13 @@ impl SledStorageDB { self.kv_tree.len() } + /// Gets total database size #[inline] fn _db_size(&self) -> usize { self.db.len() + self.kv_tree.len() + self.map_tree.len() + self.list_tree.len() } + /// Sends a command to the background processor #[inline] async fn cmd_send(&self, cmd: Command) -> Result<()> { self.active_count.fetch_add(1, Ordering::Relaxed); @@ -1434,11 +1562,13 @@ impl SledStorageDB { } } + /// Gets a map handle #[inline] fn _map>(&self, name: N) -> SledStorageMap { SledStorageMap::_new(name.as_ref().to_vec(), self.clone()) } + /// Gets a list handle #[inline] fn _list>(&self, name: V) -> SledStorageList { SledStorageList::_new(name.as_ref().to_vec(), self.clone()) @@ -1450,6 +1580,7 @@ impl StorageDB for SledStorageDB { type MapType = SledStorageMap; type ListType = SledStorageList; + /// Creates or gets a map with optional expiration #[inline] async fn map + Sync + Send>( &self, @@ -1459,6 +1590,7 @@ impl StorageDB for SledStorageDB { SledStorageMap::new_expire(name.as_ref().to_vec(), expire, self.clone()).await } + /// Removes a map #[inline] async fn map_remove(&self, name: K) -> Result<()> where @@ -1471,6 +1603,7 @@ impl StorageDB for SledStorageDB { Ok(()) } + /// Checks if a map exists #[inline] async fn map_contains_key + Sync + Send>(&self, key: K) -> Result { let (tx, rx) = oneshot::channel(); @@ -1483,6 +1616,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Creates or gets a list with optional expiration #[inline] async fn list + Sync + Send>( &self, @@ -1492,6 +1626,7 @@ impl StorageDB for SledStorageDB { SledStorageList::new_expire(name.as_ref().to_vec(), expire, self.clone()).await } + /// Removes a list #[inline] async fn list_remove(&self, name: K) -> Result<()> where @@ -1508,6 +1643,7 @@ impl StorageDB for SledStorageDB { Ok(()) } + /// Checks if a list exists #[inline] async fn list_contains_key + Sync + Send>(&self, key: K) -> Result { let (tx, rx) = oneshot::channel(); @@ -1520,6 +1656,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Inserts a key-value pair #[inline] async fn insert(&self, key: K, val: &V) -> Result<()> where @@ -1539,6 +1676,7 @@ impl StorageDB for SledStorageDB { Ok(()) } + /// Gets a value by key #[inline] async fn get(&self, key: K) -> Result> where @@ -1554,6 +1692,7 @@ impl StorageDB for SledStorageDB { } } + /// Removes a key-value pair #[inline] async fn remove(&self, key: K) -> Result<()> where @@ -1566,6 +1705,7 @@ impl StorageDB for SledStorageDB { Ok(()) } + /// Batch inserts key-value pairs #[inline] async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where @@ -1590,6 +1730,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Batch removes keys #[inline] async fn batch_remove(&self, keys: Vec) -> Result<()> { if keys.is_empty() { @@ -1602,6 +1743,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Increments a counter #[inline] async fn counter_incr(&self, key: K, increment: isize) -> Result<()> where @@ -1618,6 +1760,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Decrements a counter #[inline] async fn counter_decr(&self, key: K, decrement: isize) -> Result<()> where @@ -1634,6 +1777,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Gets counter value #[inline] async fn counter_get(&self, key: K) -> Result> where @@ -1645,6 +1789,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Sets counter value #[inline] async fn counter_set(&self, key: K, val: isize) -> Result<()> where @@ -1661,6 +1806,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Checks if a key exists #[inline] async fn contains_key + Sync + Send>(&self, key: K) -> Result { let (tx, rx) = oneshot::channel(); @@ -1673,6 +1819,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Gets number of key-value pairs (if enabled) #[inline] #[cfg(feature = "len")] async fn len(&self) -> Result { @@ -1681,6 +1828,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await?) } + /// Gets total database size #[inline] async fn db_size(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -1688,6 +1836,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await?) } + /// Sets expiration time for a key (TTL feature) #[inline] #[cfg(feature = "ttl")] async fn expire_at(&self, key: K, at: TimestampMillis) -> Result @@ -1705,6 +1854,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Sets time-to-live for a key (TTL feature) #[inline] #[cfg(feature = "ttl")] async fn expire(&self, key: K, dur: TimestampMillis) -> Result @@ -1715,6 +1865,7 @@ impl StorageDB for SledStorageDB { self.expire_at(key, at).await } + /// Gets time-to-live for a key (TTL feature) #[inline] #[cfg(feature = "ttl")] async fn ttl(&self, key: K) -> Result> @@ -1727,6 +1878,7 @@ impl StorageDB for SledStorageDB { Ok(rx.await??) } + /// Iterates over all maps #[inline] async fn map_iter<'a>( &'a mut self, @@ -1739,6 +1891,7 @@ impl StorageDB for SledStorageDB { Ok(iter) } + /// Iterates over all lists #[inline] async fn list_iter<'a>( &'a mut self, @@ -1754,6 +1907,7 @@ impl StorageDB for SledStorageDB { Ok(iter) } + /// Scans keys matching pattern async fn scan<'a, P>( &'a mut self, pattern: P, @@ -1775,6 +1929,7 @@ impl StorageDB for SledStorageDB { Ok(iter) } + /// Gets database information #[inline] async fn info(&self) -> Result { let active_count = self.active_count.load(Ordering::Relaxed); @@ -1836,17 +1991,25 @@ impl StorageDB for SledStorageDB { } } +/// Map structure for key-value storage within a namespace #[derive(Clone)] pub struct SledStorageMap { + /// Map name name: Key, + /// Prefix for map keys map_prefix_name: Key, + /// Prefix for map items map_item_prefix_name: Key, + /// Key for map count map_count_key_name: Key, + /// Flag indicating if map is empty empty: Arc, + /// Database handle pub(crate) db: SledStorageDB, } impl SledStorageMap { + /// Creates a new map with optional expiration #[inline] async fn new_expire( name: Key, @@ -1859,6 +2022,7 @@ impl SledStorageMap { rx.await? } + /// Internal method to create map with expiration #[inline] fn _new_expire( name: Key, @@ -1874,6 +2038,7 @@ impl SledStorageMap { Ok(m) } + /// Internal method to create map #[inline] fn _new(name: Key, db: SledStorageDB) -> Self { let map_prefix_name = SledStorageDB::make_map_prefix_name(name.as_slice()); @@ -1889,22 +2054,26 @@ impl SledStorageMap { } } + /// Gets the underlying tree #[inline] fn tree(&self) -> &sled::Tree { &self.db.map_tree } + /// Creates a full item key #[inline] fn make_map_item_key>(&self, key: K) -> Key { [self.map_item_prefix_name.as_ref(), key.as_ref()].concat() } + /// Gets map length (if enabled) #[cfg(feature = "map_len")] #[inline] fn _len_get(&self) -> Result { self._counter_get(self.map_count_key_name.as_slice()) } + /// Transactionally increments a counter #[inline] fn _tx_counter_inc>( tx: &TransactionalTree, @@ -1925,6 +2094,7 @@ impl SledStorageMap { Ok(()) } + /// Transactionally decrements a counter #[inline] fn _tx_counter_dec>( tx: &TransactionalTree, @@ -1949,6 +2119,7 @@ impl SledStorageMap { Ok(()) } + /// Transactionally gets counter value #[inline] fn _tx_counter_get, E>( tx: &TransactionalTree, @@ -1969,6 +2140,7 @@ impl SledStorageMap { } } + /// Transactionally sets counter value #[inline] fn _tx_counter_set, E>( tx: &TransactionalTree, @@ -1979,6 +2151,7 @@ impl SledStorageMap { Ok(()) } + /// Transactionally removes counter #[inline] fn _tx_counter_remove, E>( tx: &TransactionalTree, @@ -1988,6 +2161,7 @@ impl SledStorageMap { Ok(()) } + /// Gets counter value #[inline] fn _counter_get>(&self, key: K) -> Result { if let Some(v) = self.tree().get(key)? { @@ -1997,6 +2171,7 @@ impl SledStorageMap { } } + /// Initializes counter if not present #[inline] fn _counter_init(&self) -> Result<()> { let tree = self.tree(); @@ -2009,6 +2184,7 @@ impl SledStorageMap { Ok(()) } + /// Clears the map #[inline] fn _clear(&self) -> Result<()> { let batch = self._make_clear_batch(); @@ -2018,6 +2194,7 @@ impl SledStorageMap { Ok(()) } + /// Transactionally clears the map #[inline] fn _tx_clear( &self, @@ -2029,6 +2206,7 @@ impl SledStorageMap { Ok(()) } + /// Creates batch for clearing map #[inline] fn _make_clear_batch(&self) -> Batch { let mut batch = Batch::default(); @@ -2046,6 +2224,7 @@ impl SledStorageMap { batch } + /// Inserts a key-value pair into the map #[inline] fn _insert(&self, key: IVec, val: IVec) -> Result<()> { let item_key = self.make_map_item_key(key.as_ref()); @@ -2093,6 +2272,7 @@ impl SledStorageMap { Ok(()) } + /// Gets a value from the map #[inline] fn _get(&self, key: IVec) -> Result> { let this = self; @@ -2107,6 +2287,7 @@ impl SledStorageMap { Ok(res) } + /// Removes a key from the map #[inline] fn _remove(&self, key: IVec) -> Result<()> { let tree = self.tree(); @@ -2132,12 +2313,14 @@ impl SledStorageMap { Ok(()) } + /// Checks if key exists in map #[inline] fn _contains_key(&self, key: IVec) -> Result { let key = self.make_map_item_key(key.as_ref()); Ok(self.tree().contains_key(key)?) } + /// Gets map length (if enabled) #[cfg(feature = "map_len")] #[inline] fn _len(&self) -> Result { @@ -2154,6 +2337,7 @@ impl SledStorageMap { Ok(len as usize) } + /// Checks if map is empty #[inline] fn _is_empty(&self) -> Result { let this = self; @@ -2172,6 +2356,7 @@ impl SledStorageMap { Ok(res) } + /// Removes and returns a value #[inline] fn _remove_and_fetch(&self, key: IVec) -> Result> { let key = self.make_map_item_key(key.as_ref()); @@ -2206,6 +2391,7 @@ impl SledStorageMap { Ok(removed) } + /// Removes keys with prefix #[inline] fn _remove_with_prefix(&self, prefix: IVec) -> Result<()> { let tree = self.tree(); @@ -2256,6 +2442,7 @@ impl SledStorageMap { Ok(()) } + /// Batch inserts key-value pairs #[inline] fn _batch_insert(&self, key_vals: Vec<(IVec, IVec)>) -> Result<()> { for (k, v) in key_vals { @@ -2264,6 +2451,7 @@ impl SledStorageMap { Ok(()) } + /// Batch removes keys #[inline] fn _batch_remove(&self, keys: Vec) -> Result<()> { for k in keys { @@ -2272,12 +2460,14 @@ impl SledStorageMap { Ok(()) } + /// Sets expiration time (TTL feature) #[cfg(feature = "ttl")] #[inline] fn _expire_at(&self, at: TimestampMillis) -> Result { self.db._expire_at(self.name.as_slice(), at, KeyType::Map) } + /// Gets time-to-live (TTL feature) #[cfg(feature = "ttl")] #[inline] fn _ttl(&self) -> Result> { @@ -2290,6 +2480,7 @@ impl SledStorageMap { Ok(res) } + /// Checks if map is expired #[inline] fn _is_expired(&self) -> Result { self.db._is_expired(self.name.as_slice(), |k| { @@ -2297,6 +2488,7 @@ impl SledStorageMap { }) } + /// Checks if map is expired (async) #[inline] async fn call_is_expired(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -2306,6 +2498,7 @@ impl SledStorageMap { rx.await? } + /// Creates prefix iterator #[inline] fn _prefix_iter(&self, prefix: Option) -> sled::Iter { if let Some(prefix) = prefix { @@ -2317,6 +2510,7 @@ impl SledStorageMap { } } + /// Creates prefix iterator (async) #[inline] async fn call_prefix_iter(&self, prefix: Option) -> Result { let (tx, rx) = oneshot::channel(); @@ -2329,11 +2523,13 @@ impl SledStorageMap { #[async_trait] impl Map for SledStorageMap { + /// Gets map name #[inline] fn name(&self) -> &[u8] { self.name.as_slice() } + /// Inserts a key-value pair #[inline] async fn insert(&self, key: K, val: &V) -> Result<()> where @@ -2354,6 +2550,7 @@ impl Map for SledStorageMap { Ok(()) } + /// Gets a value by key #[inline] async fn get(&self, key: K) -> Result> where @@ -2371,6 +2568,7 @@ impl Map for SledStorageMap { } } + /// Removes a key #[inline] async fn remove(&self, key: K) -> Result<()> where @@ -2384,6 +2582,7 @@ impl Map for SledStorageMap { Ok(()) } + /// Checks if key exists #[inline] async fn contains_key + Sync + Send>(&self, key: K) -> Result { let (tx, rx) = oneshot::channel(); @@ -2397,6 +2596,7 @@ impl Map for SledStorageMap { Ok(rx.await??) } + /// Gets map length (if enabled) #[cfg(feature = "map_len")] #[inline] async fn len(&self) -> Result { @@ -2405,6 +2605,7 @@ impl Map for SledStorageMap { Ok(rx.await??) } + /// Checks if map is empty #[inline] async fn is_empty(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -2414,6 +2615,7 @@ impl Map for SledStorageMap { Ok(rx.await??) } + /// Clears the map #[inline] async fn clear(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); @@ -2424,6 +2626,7 @@ impl Map for SledStorageMap { Ok(()) } + /// Removes and returns a value #[inline] async fn remove_and_fetch(&self, key: K) -> Result> where @@ -2445,6 +2648,7 @@ impl Map for SledStorageMap { } } + /// Removes keys with prefix #[inline] async fn remove_with_prefix(&self, prefix: K) -> Result<()> where @@ -2462,6 +2666,7 @@ impl Map for SledStorageMap { Ok(()) } + /// Batch inserts key-value pairs #[inline] async fn batch_insert(&self, key_vals: Vec<(Key, V)>) -> Result<()> where @@ -2484,6 +2689,7 @@ impl Map for SledStorageMap { Ok(()) } + /// Batch removes keys #[inline] async fn batch_remove(&self, keys: Vec) -> Result<()> { let keys = keys.into_iter().map(|k| k.into()).collect::>(); @@ -2496,6 +2702,7 @@ impl Map for SledStorageMap { Ok(()) } + /// Iterates over map items #[inline] async fn iter<'a, V>( &'a mut self, @@ -2526,6 +2733,7 @@ impl Map for SledStorageMap { Ok(res) } + /// Iterates over map keys #[inline] async fn key_iter<'a>( &'a mut self, @@ -2552,6 +2760,7 @@ impl Map for SledStorageMap { Ok(res) } + /// Iterates over items with prefix #[inline] async fn prefix_iter<'a, P, V>( &'a mut self, @@ -2585,6 +2794,7 @@ impl Map for SledStorageMap { Ok(res) } + /// Sets expiration time (TTL feature) #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { let (tx, rx) = oneshot::channel(); @@ -2594,12 +2804,14 @@ impl Map for SledStorageMap { Ok(rx.await??) } + /// Sets time-to-live (TTL feature) #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { let at = timestamp_millis() + dur; self.expire_at(at).await } + /// Gets time-to-live (TTL feature) #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { let (tx, rx) = oneshot::channel(); @@ -2608,14 +2820,19 @@ impl Map for SledStorageMap { } } +/// List structure for queue-like storage within a namespace #[derive(Clone)] pub struct SledStorageList { + /// List name name: Key, + /// Prefix for list keys prefix_name: Key, + /// Database handle pub(crate) db: SledStorageDB, } impl SledStorageList { + /// Creates a new list with optional expiration #[inline] async fn new_expire( name: Key, @@ -2628,6 +2845,7 @@ impl SledStorageList { rx.await? } + /// Internal method to create list with expiration #[inline] fn _new_expire( name: Key, @@ -2642,6 +2860,7 @@ impl SledStorageList { Ok(l) } + /// Internal method to create list #[inline] fn _new(name: Key, db: SledStorageDB) -> Self { let prefix_name = SledStorageDB::make_list_prefix(name.as_slice()); @@ -2652,22 +2871,26 @@ impl SledStorageList { } } + /// Gets list name #[inline] pub(crate) fn name(&self) -> &[u8] { self.name.as_slice() } + /// Gets the underlying tree #[inline] pub(crate) fn tree(&self) -> &sled::Tree { &self.db.list_tree } + /// Creates list count key #[inline] fn make_list_count_key(&self) -> Vec { let list_count_key = [self.prefix_name.as_ref(), LIST_KEY_COUNT_SUFFIX].concat(); list_count_key } + /// Creates list content prefix #[inline] fn make_list_content_prefix(prefix_name: &[u8], idx: Option<&[u8]>) -> Vec { if let Some(idx) = idx { @@ -2677,6 +2900,7 @@ impl SledStorageList { } } + /// Creates list content key #[inline] fn make_list_content_key(&self, idx: usize) -> Vec { Self::make_list_content_prefix( @@ -2685,6 +2909,7 @@ impl SledStorageList { ) } + /// Creates batch of list content keys #[inline] fn make_list_content_keys(&self, start: usize, end: usize) -> Vec> { (start..end) @@ -2692,6 +2917,7 @@ impl SledStorageList { .collect() } + /// Transactionally gets list count #[inline] fn tx_list_count_get( tx: &TransactionalTree, @@ -2713,6 +2939,7 @@ impl SledStorageList { } } + /// Transactionally sets list count #[inline] fn tx_list_count_set( tx: &TransactionalTree, @@ -2733,6 +2960,7 @@ impl SledStorageList { Ok(()) } + /// Transactionally sets list content #[inline] fn tx_list_content_set( tx: &TransactionalTree, @@ -2747,6 +2975,7 @@ impl SledStorageList { Ok(()) } + /// Transactionally sets batch list content #[inline] fn tx_list_content_batch_set( tx: &TransactionalTree, @@ -2764,6 +2993,7 @@ impl SledStorageList { Ok(()) } + /// Clears the list #[inline] fn _clear(&self) -> Result<()> { let mut batch = Batch::default(); @@ -2789,6 +3019,7 @@ impl SledStorageList { Ok(()) } + /// Transactionally clears the list #[inline] fn _tx_clear( list_tree_tx: &TransactionalTree, @@ -2798,6 +3029,7 @@ impl SledStorageList { Ok(()) } + /// Creates batch for clearing list #[inline] fn _make_clear_batch(&self) -> Batch { let mut batch = Batch::default(); @@ -2817,6 +3049,7 @@ impl SledStorageList { batch } + /// Pushes value to list #[inline] fn _push(&self, data: IVec) -> Result<()> { let this = self; @@ -2856,6 +3089,7 @@ impl SledStorageList { Ok(()) } + /// Pushes multiple values to list #[inline] fn _pushs(&self, vals: Vec) -> Result<()> { if vals.is_empty() { @@ -2905,6 +3139,7 @@ impl SledStorageList { Ok(()) } + /// Pushes value with limit #[inline] fn _push_limit( &self, @@ -2974,6 +3209,7 @@ impl SledStorageList { Ok(removed) } + /// Pops value from list #[inline] fn _pop(&self) -> Result> { let this = self; @@ -3004,6 +3240,7 @@ impl SledStorageList { Ok(removed) } + /// Gets all values in list #[inline] fn _all(&self) -> Result> { let this = self; @@ -3025,6 +3262,7 @@ impl SledStorageList { Ok(res) } + /// Gets value by index #[inline] fn _get_index(&self, idx: usize) -> Result> { let this = self; @@ -3056,6 +3294,7 @@ impl SledStorageList { Ok(res) } + /// Gets list length #[inline] fn _len(&self) -> Result { let this = self; @@ -3077,6 +3316,7 @@ impl SledStorageList { Ok(res) } + /// Checks if list is empty #[inline] fn _is_empty(&self) -> Result { let this = self; @@ -3099,12 +3339,14 @@ impl SledStorageList { Ok(res) } + /// Sets expiration time (TTL feature) #[cfg(feature = "ttl")] #[inline] fn _expire_at(&self, at: TimestampMillis) -> Result { self.db._expire_at(self.name.as_slice(), at, KeyType::List) } + /// Gets time-to-live (TTL feature) #[cfg(feature = "ttl")] #[inline] fn _ttl(&self) -> Result> { @@ -3116,6 +3358,7 @@ impl SledStorageList { .and_then(|(at, _)| if at > 0 { Some(at) } else { None })) } + /// Checks if list is expired #[inline] fn _is_expired(&self) -> Result { self.db._is_expired(self.name.as_slice(), |k| { @@ -3123,6 +3366,7 @@ impl SledStorageList { }) } + /// Checks if list is expired (async) #[inline] async fn call_is_expired(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -3132,12 +3376,14 @@ impl SledStorageList { rx.await? } + /// Creates prefix iterator #[inline] fn _prefix_iter(&self) -> sled::Iter { let list_content_prefix = Self::make_list_content_prefix(self.prefix_name.as_slice(), None); self.tree().scan_prefix(list_content_prefix) } + /// Creates prefix iterator (async) #[inline] async fn call_prefix_iter(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -3150,11 +3396,13 @@ impl SledStorageList { #[async_trait] impl List for SledStorageList { + /// Gets list name #[inline] fn name(&self) -> &[u8] { self.name.as_slice() } + /// Pushes value to list #[inline] async fn push(&self, val: &V) -> Result<()> where @@ -3169,6 +3417,7 @@ impl List for SledStorageList { Ok(()) } + /// Pushes multiple values to list #[inline] async fn pushs(&self, vals: Vec) -> Result<()> where @@ -3195,6 +3444,7 @@ impl List for SledStorageList { Ok(()) } + /// Pushes value with limit #[inline] async fn push_limit( &self, @@ -3230,6 +3480,7 @@ impl List for SledStorageList { Ok(removed) } + /// Pops value from list #[inline] async fn pop(&self) -> Result> where @@ -3249,6 +3500,7 @@ impl List for SledStorageList { Ok(removed) } + /// Gets all values in list #[inline] async fn all(&self) -> Result> where @@ -3263,6 +3515,7 @@ impl List for SledStorageList { .collect::>>() } + /// Gets value by index #[inline] async fn get_index(&self, idx: usize) -> Result> where @@ -3280,6 +3533,7 @@ impl List for SledStorageList { }) } + /// Gets list length #[inline] async fn len(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -3287,6 +3541,7 @@ impl List for SledStorageList { Ok(rx.await??) } + /// Checks if list is empty #[inline] async fn is_empty(&self) -> Result { let (tx, rx) = oneshot::channel(); @@ -3296,6 +3551,7 @@ impl List for SledStorageList { Ok(rx.await??) } + /// Clears the list #[inline] async fn clear(&self) -> Result<()> { let (tx, rx) = oneshot::channel(); @@ -3305,6 +3561,7 @@ impl List for SledStorageList { Ok(rx.await??) } + /// Iterates over list values #[inline] async fn iter<'a, V>( &'a mut self, @@ -3334,6 +3591,7 @@ impl List for SledStorageList { Ok(res) } + /// Sets expiration time (TTL feature) #[cfg(feature = "ttl")] async fn expire_at(&self, at: TimestampMillis) -> Result { let (tx, rx) = oneshot::channel(); @@ -3343,12 +3601,14 @@ impl List for SledStorageList { Ok(rx.await??) } + /// Sets time-to-live (TTL feature) #[cfg(feature = "ttl")] async fn expire(&self, dur: TimestampMillis) -> Result { let at = timestamp_millis() + dur; self.expire_at(at).await } + /// Gets time-to-live (TTL feature) #[cfg(feature = "ttl")] async fn ttl(&self) -> Result> { let (tx, rx) = oneshot::channel(); @@ -3357,6 +3617,7 @@ impl List for SledStorageList { } } +/// Async iterator for map items pub struct AsyncIter<'a, V> { db: &'a SledStorageDB, prefix_len: usize, @@ -3364,14 +3625,14 @@ pub struct AsyncIter<'a, V> { _m: std::marker::PhantomData, } -impl<'a, V> Debug for AsyncIter<'a, V> { +impl Debug for AsyncIter<'_, V> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("AsyncIter .. ").finish() } } #[async_trait] -impl<'a, V> AsyncIterator for AsyncIter<'a, V> +impl AsyncIterator for AsyncIter<'_, V> where V: DeserializeOwned + Sync + Send + 'static, { @@ -3410,20 +3671,21 @@ where } } +/// Async iterator for map keys pub struct AsyncKeyIter<'a> { db: &'a SledStorageDB, prefix_len: usize, iter: Option, } -impl<'a> Debug for AsyncKeyIter<'a> { +impl Debug for AsyncKeyIter<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("AsyncKeyIter .. ").finish() } } #[async_trait] -impl<'a> AsyncIterator for AsyncKeyIter<'a> { +impl AsyncIterator for AsyncKeyIter<'_> { type Item = Result; async fn next(&mut self) -> Option { @@ -3454,20 +3716,21 @@ impl<'a> AsyncIterator for AsyncKeyIter<'a> { } } +/// Async iterator for list values pub struct AsyncListValIter<'a, V> { db: &'a SledStorageDB, iter: Option, _m: std::marker::PhantomData, } -impl<'a, V> Debug for AsyncListValIter<'a, V> { +impl Debug for AsyncListValIter<'_, V> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("AsyncListValIter .. ").finish() } } #[async_trait] -impl<'a, V> AsyncIterator for AsyncListValIter<'a, V> +impl AsyncIterator for AsyncListValIter<'_, V> where V: DeserializeOwned + Sync + Send + 'static, { @@ -3500,6 +3763,7 @@ where } } +/// Empty iterator pub struct AsyncEmptyIter { _m: std::marker::PhantomData, } @@ -3522,6 +3786,7 @@ where } } +/// Async iterator for maps pub struct AsyncMapIter<'a> { db: &'a SledStorageDB, iter: Option, @@ -3536,14 +3801,14 @@ impl<'a> AsyncMapIter<'a> { } } -impl<'a> Debug for AsyncMapIter<'a> { +impl Debug for AsyncMapIter<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("AsyncMapIter .. ").finish() } } #[async_trait] -impl<'a> AsyncIterator for AsyncMapIter<'a> { +impl AsyncIterator for AsyncMapIter<'_> { type Item = Result; async fn next(&mut self) -> Option { @@ -3579,19 +3844,20 @@ impl<'a> AsyncIterator for AsyncMapIter<'a> { } } +/// Async iterator for lists pub struct AsyncListIter<'a> { db: &'a SledStorageDB, iter: Option, } -impl<'a> Debug for AsyncListIter<'a> { +impl Debug for AsyncListIter<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("AsyncListIter .. ").finish() } } #[async_trait] -impl<'a> AsyncIterator for AsyncListIter<'a> { +impl AsyncIterator for AsyncListIter<'_> { type Item = Result; async fn next(&mut self) -> Option { @@ -3626,20 +3892,21 @@ impl<'a> AsyncIterator for AsyncListIter<'a> { } } +/// Async iterator for database keys with pattern matching pub struct AsyncDbKeyIter<'a> { db: &'a SledStorageDB, pattern: Pattern, iter: Option, } -impl<'a> Debug for AsyncDbKeyIter<'a> { +impl Debug for AsyncDbKeyIter<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("AsyncDbKeyIter .. ").finish() } } #[async_trait] -impl<'a> AsyncIterator for AsyncDbKeyIter<'a> { +impl AsyncIterator for AsyncDbKeyIter<'_> { type Item = Result; async fn next(&mut self) -> Option {