From 07fa73a6df676d105b9a44778a85bc79068fa74a Mon Sep 17 00:00:00 2001 From: bittcrafter Date: Tue, 29 Oct 2024 09:20:41 +0800 Subject: [PATCH 1/3] Upgrade deps to redis = "0.27", dashmap = "6.1" --- Cargo.toml | 8 +-- src/storage_redis.rs | 113 ++++++++++++++++++++++--------------------- 2 files changed, 61 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index efd5412..bea5c71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmqtt-storage" -version = "0.5.1" +version = "0.5.2" authors = ["rmqtt "] edition = "2021" license = "MIT OR Apache-2.0" @@ -8,7 +8,7 @@ repository = "https://github.com/rmqtt/rmqtt-storage" homepage = "https://github.com/rmqtt/rmqtt-storage" description = "rmqtt-storage - Is a simple wrapper around some key-value storages" keywords = ["storage", "async"] -categories = ["Database interfaces"] +categories = ["database"] exclude = ["examples", ".gitignore", ".cargo/config"] @@ -23,7 +23,7 @@ len = [] [dependencies] sled = "0.34" tokio = { version = "1", features = ["sync", "rt"] } -redis = { version = "0.24", features = [ "tokio-comp", "connection-manager" ] } +redis = { version = "0.27", features = [ "tokio-comp", "connection-manager" ] } futures = "0.3" serde = { version = "1.0", features = ["derive"] } @@ -33,7 +33,7 @@ async-trait = "0.1" bincode = "1.3" log = "0.4" chrono = { version = "0.4", default-features = false, features = ["clock"] } -dashmap = "5.5" +dashmap = "6.1" ahash = "0.8" convert = { package = "box-convert", version = "0.1", features = ["bytesize"] } diff --git a/src/storage_redis.rs b/src/storage_redis.rs index d4292d4..6c4d5d2 100644 --- a/src/storage_redis.rs +++ b/src/storage_redis.rs @@ -1,17 +1,17 @@ use anyhow::anyhow; use async_trait::async_trait; -use redis::aio::ConnectionManager; +use redis::aio::{ConnectionManager, ConnectionManagerConfig}; use redis::{pipe, AsyncCommands}; use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::Value; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Duration; use crate::storage::{AsyncIterator, IterItem, Key, List, Map, StorageDB}; use crate::{Result, StorageList, StorageMap}; -#[allow(unused_imports)] use crate::{timestamp_millis, TimestampMillis}; const SEPARATOR: &[u8] = b"@"; @@ -54,15 +54,13 @@ impl RedisStorageDB { return Err(anyhow!(e)); } }; - let async_conn = match client - .get_connection_manager_with_backoff( - 2, 100, - 2, - // Duration::from_secs(5), - // Duration::from_secs(8), - ) - .await - { + let mgr_cfg = ConnectionManagerConfig::default() + .set_exponent_base(100) + .set_factor(2) + .set_number_of_retries(2) + .set_connection_timeout(Duration::from_secs(15)) + .set_response_timeout(Duration::from_secs(10)); + let async_conn = match client.get_connection_manager_with_config(mgr_cfg).await { Ok(conn) => conn, Err(e) => { log::error!("get redis connection error, config is {:?}, {:?}", cfg, e); @@ -194,10 +192,11 @@ impl RedisStorageDB { .atomic() .set(full_key.as_slice(), bincode::serialize(val)?) .pexpire(full_key.as_slice(), expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { - self.async_conn() + let _: () = self + .async_conn() .set(full_key, bincode::serialize(val)?) .await?; } @@ -212,14 +211,14 @@ impl RedisStorageDB { .set(full_key.as_slice(), bincode::serialize(val)?) .pexpire(full_key.as_slice(), expire_interval) .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { pipe() .atomic() .set(full_key.as_slice(), bincode::serialize(val)?) .zadd(db_zkey, key.as_ref(), i64::MAX) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } } @@ -248,7 +247,7 @@ impl RedisStorageDB { rpipe = rpipe.expire(k, at); } } - rpipe.query_async::<_, ()>(&mut async_conn).await?; + rpipe.query_async::<()>(&mut async_conn).await?; } #[cfg(feature = "len")] @@ -279,7 +278,7 @@ impl RedisStorageDB { rpipe = rpipe.expire(k, at); } } - rpipe.query_async::<_, ((), ())>(&mut async_conn).await?; + rpipe.query_async::<((), ())>(&mut async_conn).await?; } Ok(()) } @@ -292,7 +291,7 @@ impl RedisStorageDB { .collect::>(); #[cfg(not(feature = "len"))] { - self.async_conn().del(full_keys).await?; + let _: () = self.async_conn().del(full_keys).await?; } #[cfg(feature = "len")] { @@ -302,7 +301,7 @@ impl RedisStorageDB { .atomic() .del(full_keys.as_slice()) .zrem(db_zkey, keys) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } Ok(()) @@ -327,10 +326,10 @@ impl RedisStorageDB { .atomic() .incr(full_key.as_slice(), increment) .pexpire(full_key.as_slice(), expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { - self.async_conn().incr(full_key, increment).await?; + let _: () = self.async_conn().incr(full_key, increment).await?; } } #[cfg(feature = "len")] @@ -343,14 +342,14 @@ impl RedisStorageDB { .incr(full_key.as_slice(), increment) .pexpire(full_key.as_slice(), expire_interval) .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { pipe() .atomic() .incr(full_key.as_slice(), increment) .zadd(db_zkey, key.as_ref(), i64::MAX) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } } @@ -377,10 +376,10 @@ impl RedisStorageDB { .atomic() .decr(full_key.as_slice(), decrement) .pexpire(full_key.as_slice(), expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { - self.async_conn().decr(full_key, decrement).await?; + let _: () = self.async_conn().decr(full_key, decrement).await?; } } #[cfg(feature = "len")] @@ -393,14 +392,14 @@ impl RedisStorageDB { .decr(full_key.as_slice(), decrement) .pexpire(full_key.as_slice(), expire_interval) .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { pipe() .atomic() .decr(full_key.as_slice(), decrement) .zadd(db_zkey, key.as_ref(), i64::MAX) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } } @@ -426,10 +425,10 @@ impl RedisStorageDB { .atomic() .set(full_key.as_slice(), val) .pexpire(full_key.as_slice(), expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { - self.async_conn().set(full_key, val).await?; + let _: () = self.async_conn().set(full_key, val).await?; } } #[cfg(feature = "len")] @@ -442,14 +441,14 @@ impl RedisStorageDB { .set(full_key.as_slice(), val) .pexpire(full_key.as_slice(), expire_interval) .zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } else { pipe() .atomic() .set(full_key.as_slice(), val) .zadd(db_zkey, key.as_ref(), i64::MAX) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } } @@ -466,7 +465,7 @@ impl RedisStorageDB { #[cfg(not(feature = "len"))] { - self.async_conn().del(full_key).await?; + let _: () = self.async_conn().del(full_key).await?; } #[cfg(feature = "len")] { @@ -476,7 +475,7 @@ impl RedisStorageDB { .atomic() .del(full_key.as_slice()) .zrem(db_zkey, key.as_ref()) - .query_async::<_, ()>(&mut async_conn) + .query_async::<()>(&mut async_conn) .await?; } Ok(()) @@ -507,7 +506,7 @@ impl StorageDB for RedisStorageDB { K: AsRef<[u8]> + Sync + Send, { let map_full_name = self.make_map_full_name(name.as_ref()); - self.async_conn().del(map_full_name).await?; + let _: () = self.async_conn().del(map_full_name).await?; Ok(()) } @@ -536,7 +535,7 @@ impl StorageDB for RedisStorageDB { K: AsRef<[u8]> + Sync + Send, { let list_full_name = self.make_list_full_name(name.as_ref()); - self.async_conn().del(list_full_name).await?; + let _: () = self.async_conn().del(list_full_name).await?; Ok(()) } @@ -656,7 +655,7 @@ impl StorageDB for RedisStorageDB { let (_, count) = pipe() .zrembyscore(db_zkey.as_slice(), 0, timestamp_millis()) .zcard(db_zkey.as_slice()) - .query_async::<_, (i64, usize)>(&mut async_conn) + .query_async::<(i64, usize)>(&mut async_conn) .await?; Ok(count) } @@ -667,7 +666,7 @@ impl StorageDB for RedisStorageDB { //DBSIZE let dbsize = redis::pipe() .cmd("DBSIZE") - .query_async::<_, redis::Value>(&mut async_conn) + .query_async::(&mut async_conn) .await?; let dbsize = dbsize.as_sequence().and_then(|vs| { vs.iter().next().and_then(|v| { @@ -704,7 +703,7 @@ impl StorageDB for RedisStorageDB { .atomic() .zadd(db_zkey, key.as_ref(), at) .pexpire_at(full_name.as_slice(), at) - .query_async::<_, (i64, bool)>(&mut async_conn) + .query_async::<(i64, bool)>(&mut async_conn) .await?; Ok(res) } @@ -731,7 +730,7 @@ impl StorageDB for RedisStorageDB { .atomic() .zadd(db_zkey, key.as_ref(), timestamp_millis() + dur) .pexpire(full_name.as_slice(), dur) - .query_async::<_, (i64, bool)>(&mut async_conn) + .query_async::<(i64, bool)>(&mut async_conn) .await?; Ok(res) } @@ -801,7 +800,7 @@ impl StorageDB for RedisStorageDB { let mut conn = self.async_conn(); let dbsize = redis::pipe() .cmd("dbsize") - .query_async::<_, redis::Value>(&mut conn) + .query_async::(&mut conn) .await?; let dbsize = dbsize.as_sequence().and_then(|vs| { vs.iter().next().and_then(|v| { @@ -895,7 +894,7 @@ impl RedisStorageMap { if let Some(expire) = self.expire.as_ref() { //HSET key field value //PEXPIRE key ms - redis::pipe() + let _: () = redis::pipe() .atomic() .hset(name, key, val) .pexpire(name, *expire) @@ -907,7 +906,7 @@ impl RedisStorageMap { } //HSET key field value - async_conn.hset(name, key.as_ref(), val).await?; + let _: () = async_conn.hset(name, key.as_ref(), val).await?; Ok(()) } @@ -921,7 +920,7 @@ impl RedisStorageMap { if let Some(expire) = self.expire.as_ref() { //HMSET key field value //PEXPIRE key ms - redis::pipe() + let _: () = redis::pipe() .atomic() .hset_multiple(name, key_vals.as_slice()) .pexpire(name, *expire) @@ -934,7 +933,7 @@ impl RedisStorageMap { } //HSET key field value - async_conn.hset_multiple(name, key_vals.as_slice()).await?; + let _: () = async_conn.hset_multiple(name, key_vals.as_slice()).await?; Ok(()) } } @@ -980,7 +979,8 @@ impl Map for RedisStorageMap { K: AsRef<[u8]> + Sync + Send, { //HDEL key field [field ...] - self.async_conn() + let _: () = self + .async_conn() .hdel(self.full_name.as_slice(), key.as_ref()) .await?; Ok(()) @@ -1019,7 +1019,7 @@ impl Map for RedisStorageMap { #[inline] async fn clear(&self) -> Result<()> { //DEL key [key ...] - self.async_conn().del(self.full_name.as_slice()).await?; + let _: () = self.async_conn().del(self.full_name.as_slice()).await?; self.empty.store(true, Ordering::SeqCst); Ok(()) } @@ -1067,12 +1067,12 @@ impl Map for RedisStorageMap { { removeds.push(key); if removeds.len() > 20 { - conn2.hdel(name, removeds.as_slice()).await?; + let _: () = conn2.hdel(name, removeds.as_slice()).await?; removeds.clear(); } } if !removeds.is_empty() { - conn.hdel(name, removeds).await?; + let _: () = conn.hdel(name, removeds).await?; } Ok(()) } @@ -1100,7 +1100,8 @@ impl Map for RedisStorageMap { #[inline] async fn batch_remove(&self, keys: Vec) -> Result<()> { if !keys.is_empty() { - self.async_conn() + let _: () = self + .async_conn() .hdel(self.full_name.as_slice(), keys) .await?; } @@ -1257,7 +1258,7 @@ impl RedisStorageList { if let Some(expire) = self.expire.as_ref() { //RPUSH key value [value ...] //PEXPIRE key ms - redis::pipe() + let _: () = redis::pipe() .atomic() .rpush(name, val) .pexpire(name, *expire) @@ -1269,7 +1270,7 @@ impl RedisStorageList { } //RPUSH key value [value ...] - async_conn.rpush(name, val).await?; + let _: () = async_conn.rpush(name, val).await?; Ok(()) } @@ -1283,7 +1284,7 @@ impl RedisStorageList { let name = self.full_name.as_slice(); //RPUSH key value [value ...] //PEXPIRE key ms - redis::pipe() + let _: () = redis::pipe() .atomic() .rpush(name, vals) .pexpire(name, *expire) @@ -1295,7 +1296,7 @@ impl RedisStorageList { } //RPUSH key value [value ...] - async_conn.rpush(self.full_name.as_slice(), vals).await?; + let _: () = async_conn.rpush(self.full_name.as_slice(), vals).await?; Ok(()) } @@ -1314,7 +1315,7 @@ impl RedisStorageList { let name = self.full_name.as_slice(); let count = conn.llen::<_, usize>(name).await?; let res = if count < limit { - redis::pipe() + let _: () = redis::pipe() .atomic() .rpush(name, val) .pexpire(name, *expire) @@ -1355,7 +1356,7 @@ impl RedisStorageList { let count = async_conn.llen::<_, usize>(name).await?; if count < limit { - async_conn.rpush(name, val).await?; + let _: () = async_conn.rpush(name, val).await?; Ok(None) } else if pop_front_if_limited { let (poped, _): (Option>, Option<()>) = redis::pipe() @@ -1490,7 +1491,7 @@ impl List for RedisStorageList { #[inline] async fn clear(&self) -> Result<()> { - self.async_conn().del(self.full_name.as_slice()).await?; + let _: () = self.async_conn().del(self.full_name.as_slice()).await?; self.empty.store(true, Ordering::SeqCst); Ok(()) } From 92cacbc905131b5eefa4b4d8c4d5f474d025fed9 Mon Sep 17 00:00:00 2001 From: bittcrafter Date: Sat, 28 Jun 2025 17:50:21 +0800 Subject: [PATCH 2/3] fix(storage_sled): improve TTL feature handling in contains_key methods - Added proper conditional compilation for TTL feature in: * `_self_map_contains_key` * `_self_list_contains_key` * `_self_contains_key` - Without TTL feature, now correctly calls underlying storage methods: * `_map_contains_key` for map storage * `_list_contains_key` for list storage * `_kv_contains_key` for key-value storage - Fixes potential false negatives in key existence checks when TTL is disabled - Maintains expiration checking behavior when TTL feature is enabled - More reliable storage operations in both configurations --- src/storage_sled.rs | 57 +++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/src/storage_sled.rs b/src/storage_sled.rs index 8a6f103..bc84823 100644 --- a/src/storage_sled.rs +++ b/src/storage_sled.rs @@ -1129,24 +1129,37 @@ impl SledStorageDB { #[inline] fn _self_map_contains_key(&self, key: &[u8]) -> Result { - // let this = self; - if self._is_expired(key, |k| Self::_map_contains_key(&self.map_tree, k))? { - Ok(false) - } else { - //Self::_map_contains_key(&self.map_tree, key) - Ok(true) + #[cfg(feature = "ttl")] + { + if self._is_expired(key, |k| Self::_map_contains_key(&self.map_tree, k))? { + Ok(false) + } else { + //Self::_map_contains_key(&self.map_tree, key) + Ok(true) + } + } + + #[cfg(not(feature = "ttl"))] + { + Self::_map_contains_key(&self.map_tree, key) } } #[inline] fn _self_list_contains_key(&self, key: &[u8]) -> Result { - let this = self; - if this._is_expired(key, |k| Self::_list_contains_key(&self.list_tree, k))? { - Ok(false) - } else { - // Self::_list_contains_key(&this.list_tree, key) - Ok(true) + #[cfg(feature = "ttl")] + { + let this = self; + if this._is_expired(key, |k| Self::_list_contains_key(&self.list_tree, k))? { + Ok(false) + } else { + // Self::_list_contains_key(&this.list_tree, key) + Ok(true) + } } + + #[cfg(not(feature = "ttl"))] + Self::_list_contains_key(&self.list_tree, key) } #[inline] @@ -1308,12 +1321,20 @@ impl SledStorageDB { #[inline] fn _self_contains_key(&self, key: &[u8]) -> Result { - let this = self; - if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? { - Ok(false) - } else { - // this._contains_key(key, KeyType::KV) - Ok(true) + #[cfg(feature = "ttl")] + { + let this = self; + if this._is_expired(key, |k| Self::_kv_contains_key(&self.kv_tree, k))? { + Ok(false) + } else { + // this._contains_key(key, KeyType::KV) + Ok(true) + } + } + + #[cfg(not(feature = "ttl"))] + { + Self::_kv_contains_key(&self.kv_tree, key) } } From 070cfd3f1f2318b164a88589119991118c787141 Mon Sep 17 00:00:00 2001 From: bittcrafter Date: Sat, 28 Jun 2025 18:01:34 +0800 Subject: [PATCH 3/3] chore(storage): bump version from 0.5.2 to 0.5.3 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index bea5c71..fcc1c61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rmqtt-storage" -version = "0.5.2" +version = "0.5.3" authors = ["rmqtt "] edition = "2021" license = "MIT OR Apache-2.0"