Skip to content

Commit 11c0da3

Browse files
Remove redundant rows from subscription updates
1 parent aaea2d7 commit 11c0da3

File tree

3 files changed

+39
-7
lines changed

3 files changed

+39
-7
lines changed

crates/core/src/subscription/delta.rs

+24-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use anyhow::Result;
2+
use hashbrown::HashMap;
23
use spacetimedb_execution::{Datastore, DeltaStore};
34
use spacetimedb_lib::metrics::ExecutionMetrics;
45
use spacetimedb_subscription::SubscriptionPlan;
6+
use spacetimedb_vm::relation::RelValue;
57

68
use crate::host::module_host::UpdatesRelValue;
79

@@ -21,19 +23,37 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
2123
plan: &SubscriptionPlan,
2224
) -> Result<Option<UpdatesRelValue<'a>>> {
2325
metrics.delta_queries_evaluated += 1;
24-
let mut inserts = vec![];
25-
let mut deletes = vec![];
26+
27+
let mut insert_counts = HashMap::new();
28+
let mut delete_counts = HashMap::new();
2629

2730
plan.for_each_insert(tx, metrics, &mut |row| {
28-
inserts.push(row.into());
31+
*insert_counts.entry(row).or_default() += 1;
2932
Ok(())
3033
})?;
3134

3235
plan.for_each_delete(tx, metrics, &mut |row| {
33-
deletes.push(row.into());
36+
match insert_counts.get_mut(&row) {
37+
None | Some(0) => {
38+
*delete_counts.entry(row).or_default() += 1;
39+
}
40+
Some(n) => {
41+
*n -= 1;
42+
}
43+
}
3444
Ok(())
3545
})?;
3646

47+
let mut inserts = vec![];
48+
let mut deletes = vec![];
49+
50+
for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) {
51+
inserts.extend(std::iter::repeat_n(row, n).map(RelValue::from));
52+
}
53+
for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) {
54+
deletes.extend(std::iter::repeat_n(row, n).map(RelValue::from));
55+
}
56+
3757
// Return `None` for empty updates
3858
if inserts.is_empty() && deletes.is_empty() {
3959
return Ok(None);

crates/core/src/subscription/module_subscription_actor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1692,7 +1692,7 @@ mod tests {
16921692

16931693
// We should only have evaluated a single query
16941694
assert_eq!(metrics.delta_queries_evaluated, 1);
1695-
assert_eq!(metrics.delta_queries_matched, 1);
1695+
assert_eq!(metrics.delta_queries_matched, 0);
16961696

16971697
// Insert a new row into `v`
16981698
let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?;

crates/execution/src/lib.rs

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::ops::RangeBounds;
1+
use std::{
2+
hash::{Hash, Hasher},
3+
ops::RangeBounds,
4+
};
25

36
use anyhow::{anyhow, Result};
47
use iter::PlanIter;
@@ -95,12 +98,21 @@ pub trait DeltaStore {
9598
}
9699
}
97100

98-
#[derive(Clone)]
101+
#[derive(Clone, PartialEq, Eq)]
99102
pub enum Row<'a> {
100103
Ptr(RowRef<'a>),
101104
Ref(&'a ProductValue),
102105
}
103106

107+
impl Hash for Row<'_> {
108+
fn hash<H: Hasher>(&self, state: &mut H) {
109+
match self {
110+
Self::Ptr(x) => x.hash(state),
111+
Self::Ref(x) => x.hash(state),
112+
}
113+
}
114+
}
115+
104116
impl Row<'_> {
105117
pub fn to_product_value(&self) -> ProductValue {
106118
match self {

0 commit comments

Comments
 (0)