Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove subscription API from store API #1346

Merged
merged 1 commit into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ rust_test_suite(
"tests/completeness_checking_store_test.rs",
"tests/compression_store_test.rs",
"tests/dedup_store_test.rs",
"tests/default_store_key_subscribe_test.rs",
"tests/existence_store_test.rs",
"tests/fast_slow_store_test.rs",
"tests/filesystem_store_test.rs",
Expand Down
145 changes: 4 additions & 141 deletions nativelink-store/src/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Bound;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_error::{Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
use nativelink_util::evicting_map::{EvictingMap, LenEntry};
use nativelink_util::health_utils::{default_health_status_indicator, HealthStatusIndicator};
use nativelink_util::store_trait::{
StoreDriver, StoreKey, StoreOptimizations, StoreSubscription, StoreSubscriptionItem,
UploadSizeInfo,
};
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use tokio::sync::watch;
use tracing::{event, Level};
use nativelink_util::store_trait::{StoreDriver, StoreKey, UploadSizeInfo};

use crate::cas_utils::is_zero_digest;

Expand All @@ -57,92 +50,18 @@ impl LenEntry for BytesWrapper {
}
}

/// A subscription to a key in the MemoryStore.
struct MemoryStoreSubscription {
store: Weak<MemoryStore>,
key: StoreKey<'static>,
receiver: Option<watch::Receiver<Result<Arc<dyn StoreSubscriptionItem>, Error>>>,
}

#[async_trait]
impl StoreSubscription for MemoryStoreSubscription {
fn peek(&self) -> Result<Arc<dyn StoreSubscriptionItem>, Error> {
self.receiver.as_ref().unwrap().borrow().clone()
}

async fn changed(&mut self) -> Result<Arc<dyn StoreSubscriptionItem>, Error> {
self.receiver
.as_mut()
.unwrap()
.changed()
.await
.map_err(|e| {
make_err!(
Code::Internal,
"Sender dropped in DefaultStoreSubscription::changed - {e:?}"
)
})?;
self.receiver.as_ref().unwrap().borrow().clone()
}
}

/// When the subscription is dropped, we need to remove the subscription from the store
/// to prevent memory leaks.
impl Drop for MemoryStoreSubscription {
fn drop(&mut self) {
// Make sure we manually drop receiver first.
self.receiver = None;
let Some(store) = self.store.upgrade() else {
return;
};
store.remove_dropped_subscription(self.key.borrow().into_owned());
}
}

struct MemoryStoreSubscriptionItem {
store: Weak<MemoryStore>,
key: StoreKey<'static>,
}

#[async_trait]
impl StoreSubscriptionItem for MemoryStoreSubscriptionItem {
async fn get_key<'a>(&'a self) -> Result<StoreKey<'a>, Error> {
Ok(self.key.borrow())
}

async fn get_part(
&self,
writer: &mut DropCloserWriteHalf,
offset: usize,
length: Option<usize>,
) -> Result<(), Error> {
let store = self
.store
.upgrade()
.err_tip(|| "Store dropped in MemoryStoreSubscriptionItem::get_part")?;
Pin::new(store.as_ref())
.get_part(self.key.borrow(), writer, offset, length)
.await
}
}

type SubscriptionSender = watch::Sender<Result<Arc<dyn StoreSubscriptionItem>, Error>>;
#[derive(MetricsComponent)]
pub struct MemoryStore {
weak_self: Weak<Self>,
#[metric(group = "evicting_map")]
evicting_map: EvictingMap<StoreKey<'static>, BytesWrapper, SystemTime>,
subscriptions: RwLock<HashMap<StoreKey<'static>, SubscriptionSender>>,
}

impl MemoryStore {
pub fn new(config: &nativelink_config::stores::MemoryStore) -> Arc<Self> {
let empty_policy = nativelink_config::stores::EvictionPolicy::default();
let eviction_policy = config.eviction_policy.as_ref().unwrap_or(&empty_policy);
Arc::new_cyclic(|weak_self| MemoryStore {
weak_self: weak_self.clone(),
Arc::new(Self {
evicting_map: EvictingMap::new(eviction_policy, SystemTime::now()),
subscriptions: RwLock::new(HashMap::new()),
})
}

Expand All @@ -155,21 +74,6 @@ impl MemoryStore {
pub async fn remove_entry(&self, key: StoreKey<'_>) -> bool {
self.evicting_map.remove(&key.into_owned()).await
}

/// Tells the store that a subscription has been dropped and gives an opportunity to clean up.
fn remove_dropped_subscription(&self, key: StoreKey<'static>) {
let mut subscriptions = self.subscriptions.write();
let Some(sender) = subscriptions.get(&key) else {
event!(
Level::ERROR,
"Subscription dropped for key '{key:?}' but not found in subscriptions"
);
return;
};
if sender.receiver_count() == 0 {
subscriptions.remove(&key);
}
}
}

#[async_trait]
Expand Down Expand Up @@ -233,13 +137,6 @@ impl StoreDriver for MemoryStore {
self.evicting_map
.insert(key.borrow().into_owned(), BytesWrapper(final_buffer))
.await;
{
// Notify all subscribers that the key has been modified.
let subscription_lock = self.subscriptions.read();
if let Some(sender) = subscription_lock.get(&key.into_owned()) {
sender.send_modify(|_| { /* We just need to flag it modified. */ });
}
}
Ok(())
}

Expand Down Expand Up @@ -276,40 +173,6 @@ impl StoreDriver for MemoryStore {
Ok(())
}

fn optimized_for(&self, optimization: StoreOptimizations) -> bool {
optimization == StoreOptimizations::SubscribeChanges
}

async fn subscribe(self: Arc<Self>, key: StoreKey<'_>) -> Box<dyn StoreSubscription> {
let key = key.into_owned();
let subscription_lock = self.subscriptions.upgradable_read();
if let Some(watch_sender) = subscription_lock.get(&key) {
let mut rx = watch_sender.subscribe();
rx.mark_changed();
return Box::new(MemoryStoreSubscription {
store: self.weak_self.clone(),
key: key.borrow().into_owned(),
receiver: Some(rx),
});
}
{
let mut subscription_lock = RwLockUpgradableReadGuard::upgrade(subscription_lock);
let subscription_item: Arc<dyn StoreSubscriptionItem> =
Arc::new(MemoryStoreSubscriptionItem {
store: self.weak_self.clone(),
key: key.borrow().into_owned(),
});
let (tx, mut rx) = watch::channel(Ok(subscription_item));
subscription_lock.insert(key.borrow().into_owned(), tx.clone());
rx.mark_changed();
Box::new(MemoryStoreSubscription {
store: self.weak_self.clone(),
key,
receiver: Some(rx),
})
}
}

fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
self
}
Expand Down
88 changes: 0 additions & 88 deletions nativelink-store/tests/default_store_key_subscribe_test.rs

This file was deleted.

Loading