Skip to content

Commit 59faab8

Browse files
Remove redundant rows from subscription updates (#2654)
1 parent 2f33924 commit 59faab8

File tree

7 files changed

+132
-11
lines changed

7 files changed

+132
-11
lines changed

crates/core/src/db/db_metrics/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,16 @@ metrics_group!(
126126
#[labels(db: Identity)]
127127
pub delta_queries_matched: IntCounterVec,
128128

129+
#[name = spacetime_num_duplicate_rows_evaluated]
130+
#[help = "The number of times we evaluate the same row in a subscription update"]
131+
#[labels(db: Identity)]
132+
pub duplicate_rows_evaluated: IntCounterVec,
133+
134+
#[name = spacetime_num_duplicate_rows_sent]
135+
#[help = "The number of duplicate rows we send in a subscription update"]
136+
#[labels(db: Identity)]
137+
pub duplicate_rows_sent: IntCounterVec,
138+
129139
#[name = spacetime_subscription_connections]
130140
#[help = "Number of connections with active subscriptions"]
131141
#[labels(database_identity: Identity)]

crates/core/src/subscription/delta.rs

+78-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use anyhow::Result;
2+
use hashbrown::HashMap;
23
use spacetimedb_execution::{Datastore, DeltaStore};
34
use spacetimedb_lib::metrics::ExecutionMetrics;
45
use spacetimedb_subscription::SubscriptionPlan;
6+
use spacetimedb_vm::relation::RelValue;
57

68
use crate::host::module_host::UpdatesRelValue;
79

@@ -21,24 +23,92 @@ pub fn eval_delta<'a, Tx: Datastore + DeltaStore>(
2123
plan: &SubscriptionPlan,
2224
) -> Result<Option<UpdatesRelValue<'a>>> {
2325
metrics.delta_queries_evaluated += 1;
26+
2427
let mut inserts = vec![];
2528
let mut deletes = vec![];
2629

27-
plan.for_each_insert(tx, metrics, &mut |row| {
28-
inserts.push(row.into());
29-
Ok(())
30-
})?;
30+
let mut duplicate_rows_evaluated = 0;
31+
let mut duplicate_rows_sent = 0;
32+
33+
if !plan.is_join() {
34+
// Single table plans will never return redundant rows,
35+
// so there's no need to track row counts.
36+
plan.for_each_insert(tx, metrics, &mut |row| {
37+
inserts.push(row.into());
38+
Ok(())
39+
})?;
40+
41+
plan.for_each_delete(tx, metrics, &mut |row| {
42+
deletes.push(row.into());
43+
Ok(())
44+
})?;
45+
} else {
46+
// Query plans for joins may return redundant rows.
47+
// We track row counts to avoid sending them to clients.
48+
let mut insert_counts = HashMap::new();
49+
let mut delete_counts = HashMap::new();
50+
51+
plan.for_each_insert(tx, metrics, &mut |row| {
52+
let n = insert_counts.entry(row).or_default();
53+
if *n > 0 {
54+
duplicate_rows_evaluated += 1;
55+
}
56+
*n += 1;
57+
Ok(())
58+
})?;
3159

32-
plan.for_each_delete(tx, metrics, &mut |row| {
33-
deletes.push(row.into());
34-
Ok(())
35-
})?;
60+
plan.for_each_delete(tx, metrics, &mut |row| {
61+
match insert_counts.get_mut(&row) {
62+
// We have not seen an insert for this row.
63+
// If we have already seen a delete for this row, count this one as a duplicate evaluation.
64+
// Always increment the delete_count.
65+
None => {
66+
let n = delete_counts.entry(row).or_default();
67+
if *n > 0 {
68+
duplicate_rows_evaluated += 1;
69+
}
70+
*n += 1;
71+
}
72+
// We have already seen an insert for this row.
73+
// This is a duplicate, so increment the metric.
74+
//
75+
// Every insert seen for this row has already been cancelled by an earlier delete,
76+
// so this delete must be recorded in delete_counts as well.
77+
Some(0) => {
78+
duplicate_rows_evaluated += 1;
79+
*delete_counts.entry(row).or_default() += 1;
80+
}
81+
// We have already seen an insert for this row.
82+
// This is a duplicate, so increment the metric.
83+
//
84+
// There are still more inserts for this row,
85+
// so don't increment the delete_count.
86+
Some(n) => {
87+
duplicate_rows_evaluated += 1;
88+
*n -= 1;
89+
}
90+
}
91+
Ok(())
92+
})?;
93+
94+
for (row, n) in insert_counts.into_iter().filter(|(_, n)| *n > 0) {
95+
duplicate_rows_sent += n as u64 - 1;
96+
inserts.extend(std::iter::repeat_n(row, n).map(RelValue::from));
97+
}
98+
for (row, n) in delete_counts.into_iter().filter(|(_, n)| *n > 0) {
99+
duplicate_rows_sent += n as u64 - 1;
100+
deletes.extend(std::iter::repeat_n(row, n).map(RelValue::from));
101+
}
102+
}
36103

37104
// Return `None` for empty updates
38105
if inserts.is_empty() && deletes.is_empty() {
39106
return Ok(None);
40107
}
41108

42109
metrics.delta_queries_matched += 1;
110+
metrics.duplicate_rows_evaluated += duplicate_rows_evaluated;
111+
metrics.duplicate_rows_sent += duplicate_rows_sent;
112+
43113
Ok(Some(UpdatesRelValue { inserts, deletes }))
44114
}

crates/core/src/subscription/mod.rs

+12
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ pub(crate) fn record_exec_metrics(workload: &WorkloadType, db: &Identity, metric
6666
.with_label_values(db)
6767
.inc_by(metrics.delta_queries_evaluated);
6868
}
69+
if metrics.duplicate_rows_evaluated > 0 {
70+
DB_METRICS
71+
.duplicate_rows_evaluated
72+
.with_label_values(db)
73+
.inc_by(metrics.duplicate_rows_evaluated);
74+
}
75+
if metrics.duplicate_rows_sent > 0 {
76+
DB_METRICS
77+
.duplicate_rows_sent
78+
.with_label_values(db)
79+
.inc_by(metrics.duplicate_rows_sent);
80+
}
6981
}
7082

7183
/// Execute a subscription query

crates/core/src/subscription/module_subscription_actor.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1788,7 +1788,7 @@ mod tests {
17881788

17891789
// We should only have evaluated a single query
17901790
assert_eq!(metrics.delta_queries_evaluated, 1);
1791-
assert_eq!(metrics.delta_queries_matched, 1);
1791+
assert_eq!(metrics.delta_queries_matched, 0);
17921792

17931793
// Insert a new row into `v`
17941794
let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?;

crates/execution/src/lib.rs

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::ops::RangeBounds;
1+
use std::{
2+
hash::{Hash, Hasher},
3+
ops::RangeBounds,
4+
};
25

36
use anyhow::{anyhow, Result};
47
use iter::PlanIter;
@@ -95,12 +98,21 @@ pub trait DeltaStore {
9598
}
9699
}
97100

98-
#[derive(Clone)]
101+
#[derive(Clone, PartialEq, Eq)]
99102
/// A row held in one of two forms: a `RowRef` or a borrowed `ProductValue`.
pub enum Row<'a> {
100103
/// The row represented as a `RowRef`.
Ptr(RowRef<'a>),
101104
/// The row represented as a reference to a `ProductValue`.
Ref(&'a ProductValue),
102105
}
103106

107+
impl Hash for Row<'_> {
108+
fn hash<H: Hasher>(&self, state: &mut H) {
109+
match self {
110+
Self::Ptr(x) => x.hash(state),
111+
Self::Ref(x) => x.hash(state),
112+
}
113+
}
114+
}
115+
104116
impl Row<'_> {
105117
pub fn to_product_value(&self) -> ProductValue {
106118
match self {

crates/lib/src/metrics.rs

+12
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ pub struct ExecutionMetrics {
5050
pub delta_queries_evaluated: u64,
5151
/// How many subscriptions had some updates?
5252
pub delta_queries_matched: u64,
53+
/// How many times do we evaluate the same row in a subscription update?
54+
pub duplicate_rows_evaluated: u64,
55+
/// How many duplicate rows do we send in a subscription update?
56+
pub duplicate_rows_sent: u64,
5357
}
5458

5559
impl ExecutionMetrics {
@@ -66,6 +70,8 @@ impl ExecutionMetrics {
6670
rows_updated,
6771
delta_queries_evaluated,
6872
delta_queries_matched,
73+
duplicate_rows_evaluated,
74+
duplicate_rows_sent,
6975
}: ExecutionMetrics,
7076
) {
7177
self.index_seeks += index_seeks;
@@ -78,6 +84,8 @@ impl ExecutionMetrics {
7884
self.rows_updated += rows_updated;
7985
self.delta_queries_evaluated += delta_queries_evaluated;
8086
self.delta_queries_matched += delta_queries_matched;
87+
self.duplicate_rows_evaluated += duplicate_rows_evaluated;
88+
self.duplicate_rows_sent += duplicate_rows_sent;
8189
}
8290
}
8391

@@ -100,6 +108,8 @@ mod tests {
100108
rows_updated: 1,
101109
delta_queries_evaluated: 2,
102110
delta_queries_matched: 3,
111+
duplicate_rows_evaluated: 4,
112+
duplicate_rows_sent: 2,
103113
});
104114

105115
assert_eq!(a.index_seeks, 1);
@@ -111,5 +121,7 @@ mod tests {
111121
assert_eq!(a.rows_deleted, 1);
112122
assert_eq!(a.delta_queries_evaluated, 2);
113123
assert_eq!(a.delta_queries_matched, 3);
124+
assert_eq!(a.duplicate_rows_evaluated, 4);
125+
assert_eq!(a.duplicate_rows_sent, 2);
114126
}
115127
}

crates/subscription/src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ pub struct SubscriptionPlan {
230230
}
231231

232232
impl SubscriptionPlan {
233+
/// Is this a plan for a join?
234+
pub fn is_join(&self) -> bool {
235+
self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1
236+
}
237+
233238
/// To which table does this plan subscribe?
234239
pub fn subscribed_table_id(&self) -> TableId {
235240
self.return_id

0 commit comments

Comments
 (0)