diff --git a/client/src/bin/spaced.rs b/client/src/bin/spaced.rs index 0abe704..8090624 100644 --- a/client/src/bin/spaced.rs +++ b/client/src/bin/spaced.rs @@ -8,7 +8,7 @@ use spaces_client::{ rpc::{AsyncChainState, RpcServerImpl, WalletLoadRequest, WalletManager}, source::{BitcoinBlockSource, BitcoinRpc}, store, - sync::Spaced, + spaces::Spaced, wallets::RpcWallet, }; use store::LiveSnapshot; @@ -53,7 +53,7 @@ impl Composer { } } - async fn setup_rpc_wallet(&mut self, spaced: &Spaced, rx: mpsc::Receiver) { + async fn setup_rpc_wallet(&mut self, spaced: &Spaced, rx: mpsc::Receiver, cbf: bool) { let wallet_service = RpcWallet::service( spaced.network, spaced.rpc.clone(), @@ -61,6 +61,7 @@ impl Composer { rx, self.shutdown.clone(), spaced.num_workers, + cbf ); self.services.spawn(async move { @@ -107,7 +108,7 @@ impl Composer { .map_err(|e| anyhow!("RPC Server error: {}", e)) }); - self.setup_rpc_wallet(spaced, wallet_loader_rx).await; + self.setup_rpc_wallet(spaced, wallet_loader_rx, spaced.cbf).await; } async fn setup_sync_service(&mut self, mut spaced: Spaced) { diff --git a/client/src/cbf.rs b/client/src/cbf.rs new file mode 100644 index 0000000..efab775 --- /dev/null +++ b/client/src/cbf.rs @@ -0,0 +1,291 @@ +use std::collections::{BTreeMap, HashSet, VecDeque}; +use std::time::Duration; +use anyhow::anyhow; +use log::info; +use tokio::time::Instant; +use spaces_protocol::bitcoin::BlockHash; +use spaces_protocol::constants::ChainAnchor; +use spaces_wallet::bdk_wallet::chain::{local_chain, BlockId, ConfirmationBlockTime, IndexedTxGraph, TxUpdate}; +use spaces_wallet::bdk_wallet::chain::keychain_txout::KeychainTxOutIndex; +use spaces_wallet::bdk_wallet::{KeychainKind, Update}; +use spaces_wallet::bitcoin::bip158::BlockFilter; +use spaces_wallet::bitcoin::ScriptBuf; +use spaces_wallet::SpacesWallet; +use crate::client::{BlockSource, BlockchainInfo}; +use crate::source::BitcoinBlockSource; +use crate::wallets::WalletProgressUpdate; + +pub struct CompactFilterSync { + graph: IndexedTxGraph>, + chain: local_chain::LocalChain, + chain_changeset: BTreeMap>, + scripts: HashSet, + last_peek_index: u32, + initial_tip: ChainAnchor, + queued_blocks: BTreeMap, + queued_filters: VecDeque, + filters_tip: u32, + block_matches: u32, + total_filters: u32, + wait: Option, + state: SyncState, +} + +enum SyncState { + SyncChecks, + LoadFilterRange(BlockchainInfo), + ProcessFilters, + QueueBlocks, + WaitForBlocks, + ProcessBlocks, + ApplyUpdate, + Synced, +} + +impl CompactFilterSync { + pub fn new(wallet: &SpacesWallet) -> Self { + let initial_tip = { + let tip = wallet.local_chain().tip(); + ChainAnchor { height: tip.height(), hash: tip.hash() } + }; + + let mut cbf = Self { + graph: IndexedTxGraph::new(wallet.spk_index().clone()), + chain: wallet.local_chain().clone(), + chain_changeset: BTreeMap::new(), + scripts: HashSet::new(), + last_peek_index: 0, + initial_tip, + queued_blocks: BTreeMap::new(), + queued_filters: Default::default(), + filters_tip: 0, + block_matches: 0, + total_filters: 0, + wait: None, + state: SyncState::SyncChecks, + }; + cbf.load_scripts(wallet); + cbf + } + + fn load_scripts(&mut self, wallet: &SpacesWallet) { + let lookahead = wallet.spk_index().lookahead(); + let mut max_idx = 0; + for keychain in [KeychainKind::External, KeychainKind::Internal] { + let last_revealed = wallet + .spk_index() + .last_revealed_index(keychain) + .unwrap_or(0); + let chain_limit = last_revealed + lookahead; + for idx in 0..=chain_limit { + let script = wallet.peek_address(keychain, idx).script_pubkey(); + self.scripts.insert(script); + } + max_idx = max_idx.max(chain_limit); + } + self.last_peek_index = max_idx; + } + + /// Expand scripts by an additional fixed window beyond the last peek + fn load_more_scripts(&mut self, wallet: &SpacesWallet) { + let end = self.last_peek_index + 10; + for keychain in [KeychainKind::External, KeychainKind::Internal] { + for idx in self.last_peek_index..=end { + let script = wallet.peek_address(keychain, idx).script_pubkey(); + self.scripts.insert(script); + } + } + self.last_peek_index = end; + } + + pub fn synced(&self) -> bool { + matches!(self.state, SyncState::Synced) + } + + pub fn sync_next( + &mut self, + wallet: &mut SpacesWallet, + source: &BitcoinBlockSource, + progress: &mut WalletProgressUpdate, + ) -> anyhow::Result<()> { + if self.wait.is_some_and(|w| w.elapsed() < Duration::from_secs(10)) { + return Ok(()); + } + self.wait = None; + + match &self.state { + SyncState::SyncChecks => { + let info = source.get_blockchain_info()?; + if info.headers != info.blocks { + info!("Source still syncing, retrying..."); + *progress = WalletProgressUpdate::Syncing; + self.wait = Some(Instant::now()); + return Ok(()); + } + if info.filters != info.filter_headers { + info!("Filters syncing, retrying..."); + *progress = WalletProgressUpdate::CbfFilterSync { + total: info.filter_headers.unwrap_or(0), + completed: info.filters.unwrap_or(0), + }; + self.wait = Some(Instant::now()); + return Ok(()); + } + // if wallet already past filter headers, we're done + if let Some(filter_headers) = info.filter_headers { + if self.initial_tip.height >= filter_headers { + info!("wallet({}): tip {} >= filters {}, cbf done", wallet.name(), self.initial_tip.height, filter_headers); + self.state = SyncState::Synced; + return Ok(()); + } + } + self.state = SyncState::LoadFilterRange(info); + } + SyncState::LoadFilterRange(info) => { + let checkpoint = info + .checkpoint + .ok_or_else(|| anyhow!("filter sync: checkpoint missing"))?; + if self.initial_tip.height < checkpoint.height { + return Err(anyhow!( + "Wallet birthday {} < checkpoint {}", self.initial_tip.height, checkpoint.height + )); + } + + let start = self.initial_tip.height; + let end = info + .prune_height + .ok_or(anyhow!("Prune height missing"))?; + let available_filters = info.filters.ok_or(anyhow!("Filters missing"))?; + if end > available_filters { + return Err(anyhow!("Prune height {} > {} available filters", end, available_filters)); + } + + if start >= end { + return Ok(()); + } + for height in start..=end { + self.queued_filters.push_back(height); + } + self.filters_tip = end; + self.total_filters = self.queued_filters.len() as u32; + self.state = SyncState::ProcessFilters; + } + SyncState::ProcessFilters => { + let height = match self.queued_filters.pop_front() { + None => { + self.state = SyncState::QueueBlocks; + return Ok(()); + } + Some(f) => f, + }; + let idx_filter = source.get_block_filter_by_height(height)?; + let idx_filter = idx_filter + .ok_or_else(|| anyhow!("filter sync: block filter missing {}", height))?; + let filter = BlockFilter::new(&idx_filter.content); + if filter.match_any(&idx_filter.hash, self.scripts.iter().map(|s| s.as_bytes()))? { + self.queued_blocks.insert(height, idx_filter.hash); + self.load_more_scripts(wallet); + self.block_matches += 1; + info!("wallet({}) processed block filter {} - match found", wallet.name(), height); + } else { + info!("wallet({}) processed block filter {} - no match", wallet.name(), height); + } + *progress = WalletProgressUpdate::CbfProcessFilters { + total: self.total_filters, + completed: self.total_filters - self.queued_filters.len() as u32, + }; + } + SyncState::QueueBlocks => { + if !self.queued_blocks.is_empty() { + let heights: Vec = self.queued_blocks.keys().copied().collect(); + info!("wallet({}): queueing {} blocks", wallet.name(), heights.len()); + source.queue_blocks(heights)?; + } + self.state = SyncState::WaitForBlocks; + } + SyncState::WaitForBlocks => { + let info = source.get_blockchain_info()?; + let status = info + .block_queue + .as_ref() + .ok_or_else(|| anyhow!("filter sync: block queue missing"))?; + + if status.pending > 0 { + info!("wallet({}): waiting for {} pending blocks", wallet.name(), status.pending); + + // The client has a global state for pending blocks in the queue + // so we cap it just in case other things are queuing blocks + // at the same time + let pending = std::cmp::min(status.pending, self.block_matches); + *progress = WalletProgressUpdate::CbfDownloadMatchingBlocks { + total: self.block_matches, + completed: self.block_matches - pending, + }; + + self.wait = Some(Instant::now()); + return Ok(()); + } + + if status.completed < self.queued_blocks.len() as u32 { + return Err(anyhow!( + "incomplete downloads: {} of {}", status.completed, self.queued_blocks.len() + )); + } + self.state = SyncState::ProcessBlocks; + } + SyncState::ProcessBlocks => { + let (height, hash) = match self.queued_blocks.pop_first() { + None => { + *progress = WalletProgressUpdate::CbfApplyUpdate; + self.state = SyncState::ApplyUpdate; + return Ok(()); + } + Some(f) => f, + }; + info!("wallet({}): processing block {} {}", wallet.name(), height, hash); + let block = source.get_block(&hash)? + .ok_or(anyhow!("block {} {} not found", height, hash))?; + self.chain_changeset.insert(height, Some(hash)); + let _ = self.graph.apply_block_relevant(&block, height); + *progress = WalletProgressUpdate::CbfProcessMatchingBlocks { + total: self.block_matches, + completed: self.block_matches - self.queued_blocks.len() as u32 , + }; + } + SyncState::ApplyUpdate => { + info!("wallet({}): updating wallet tip to {}", wallet.name(), self.filters_tip); + let filters_anchor = BlockId { + height: self.filters_tip, + hash: source.get_block_hash(self.filters_tip)?, + }; + + let update = self.get_scan_response(); + wallet.apply_update(update)?; + wallet.insert_checkpoint(filters_anchor)?; + info!("wallet({}): compact filter sync portion complete at {}", wallet.name(), self.filters_tip); + self.state = SyncState::Synced; + // Only CBF portion is done + *progress = WalletProgressUpdate::Syncing + } + SyncState::Synced => {}, + } + Ok(()) + } + + // based on https://github.com/bitcoindevkit/bdk-kyoto/blob/master/src/lib.rs#L137 + fn get_scan_response(&mut self) -> Update { + let changes = std::mem::take(&mut self.chain_changeset); + self.chain + .apply_changeset(&local_chain::ChangeSet::from(changes)) + .expect("initialized from genesis"); + let tx_update = TxUpdate::from(self.graph.graph().clone()); + let graph = std::mem::take(&mut self.graph); + let last_indices = graph.index.last_used_indices(); + self.graph = IndexedTxGraph::new(graph.index); + Update { + tx_update, + last_active_indices: last_indices, + chain: Some(self.chain.tip()), + } + } +} diff --git a/client/src/client.rs b/client/src/client.rs index efa59ec..0778595 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -5,7 +5,8 @@ use std::{error::Error, fmt}; use anyhow::{anyhow, Result}; use bincode::{Decode, Encode}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::de::Error as SerdeError; use spaces_protocol::{ bitcoin::{Amount, Block, BlockHash, OutPoint, Txid}, constants::{ChainAnchor, ROLLOUT_BATCH_SIZE, ROLLOUT_BLOCK_INTERVAL}, @@ -20,16 +21,53 @@ use crate::{ source::BitcoinRpcError, store::{ChainState, ChainStore, LiveSnapshot, LiveStore, Sha256}, }; +use crate::source::BlockQueueResult; pub trait BlockSource { fn get_block_hash(&self, height: u32) -> Result; - fn get_block(&self, hash: &BlockHash) -> Result; + fn get_block(&self, hash: &BlockHash) -> Result, BitcoinRpcError>; fn get_median_time(&self) -> Result; fn in_mempool(&self, txid: &Txid, height: u32) -> Result; fn get_block_count(&self) -> Result; fn get_best_chain(&self, tip: Option, expected_chain: Network) -> Result, BitcoinRpcError>; + fn get_blockchain_info(&self) -> Result; + fn get_block_filter_by_height(&self, height: u32) -> Result, BitcoinRpcError>; + fn queue_blocks(&self, heights: Vec) -> Result<(), BitcoinRpcError>; } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlockFilterRpc { + pub hash: BlockHash, + pub height: u32, + #[serde( + serialize_with = "serialize_hex", + deserialize_with = "deserialize_hex" + )] + pub content: Vec, +} + +#[derive(Serialize, Deserialize)] +pub struct BlockchainInfo { + pub chain: String, + pub blocks: u32, + pub headers: u32, + #[serde(rename = "bestblockhash")] + pub best_block_hash: BlockHash, + #[serde(rename = "pruneheight", skip_serializing_if = "Option::is_none")] + pub prune_height: Option, + pub pruned: bool, + // Light sync specific info + #[serde(skip_serializing_if = "Option::is_none")] + pub filters: Option, + #[serde(rename = "filterheaders", skip_serializing_if = "Option::is_none")] + pub filter_headers: Option, + #[serde(rename = "blockqueue", skip_serializing_if = "Option::is_none")] + pub block_queue: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub checkpoint: Option, +} + + #[derive(Debug, Clone)] pub struct Client { validator: Validator, @@ -354,3 +392,19 @@ fn unwrap_bid_value(spaceout: &SpaceOut) -> (Amount, Amount) { } panic!("expected a bid covenant") } + + +fn serialize_hex(bytes: &Vec, s: S) -> std::result::Result +where + S: Serializer, +{ + s.serialize_str(hex::encode(bytes).as_str()) +} + +fn deserialize_hex<'de, D>(d: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + let s = String::deserialize(d)?; + hex::decode(s).map_err(D::Error::custom) +} \ No newline at end of file diff --git a/client/src/config.rs b/client/src/config.rs index 5bda884..20749d0 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -22,7 +22,7 @@ use toml::Value; use crate::{ source::{BitcoinRpc, BitcoinRpcAuth}, store::{LiveStore, Store}, - sync::Spaced, + spaces::Spaced, }; const RPC_OPTIONS: &str = "RPC Server Options"; @@ -224,6 +224,7 @@ impl Args { num_workers: args.jobs as usize, anchors_path, synced: false, + cbf: args.bitcoin_rpc_light }) } diff --git a/client/src/format.rs b/client/src/format.rs index 1ddaa5c..792c977 100644 --- a/client/src/format.rs +++ b/client/src/format.rs @@ -14,7 +14,7 @@ use spaces_wallet::{ BidEventDetails, BidoutEventDetails, OpenEventDetails, SendEventDetails, TransferEventDetails, TxEventKind, }, - Balance, DoubleUtxo, WalletInfo, WalletOutput, + Balance, DoubleUtxo, WalletOutput, }; use tabled::{Table, Tabled}; @@ -22,6 +22,7 @@ use crate::{ rpc::ServerInfo, wallets::{ListSpacesResponse, TxInfo, TxResponse, WalletResponse}, }; +use crate::wallets::{WalletInfoWithProgress, WalletProgressUpdate}; #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, ValueEnum, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] @@ -163,27 +164,62 @@ pub fn print_server_info(info: ServerInfo, format: Format) { } } -pub fn print_wallet_info(info: WalletInfo, format: Format) { +pub fn print_wallet_info(prog: WalletInfoWithProgress, format: Format) { match format { Format::Text => { - println!("WALLET: {}", info.label); - println!(" Tip {}\n Birthday {}", info.tip, info.start_block); + println!("WALLET: {}", prog.info.label); + println!(" Tip {}\n Birthday {}", prog.info.tip, prog.info.start_block); println!(" Public descriptors"); - for desc in info.descriptors { + for desc in prog.info.descriptors { println!(" {}", desc.descriptor); } + + // Print sync status + println!(" Sync Status:"); + match prog.status { + WalletProgressUpdate::SourceSync { total, completed } => { + println!(" Source Syncing: {}/{} ({:.1}%)", completed, total, + (completed as f64 / total as f64) * 100.0); + } + WalletProgressUpdate::CbfFilterSync { total, completed } => { + println!(" Filters Syncing: {}/{} ({:.1}%)", completed, total, + (completed as f64 / total as f64) * 100.0); + } + WalletProgressUpdate::CbfProcessFilters { total, completed } => { + println!(" Processing Filters: {}/{} ({:.1}%)", completed, total, + (completed as f64 / total as f64) * 100.0); + } + WalletProgressUpdate::CbfDownloadMatchingBlocks { total, completed } => { + println!(" Downloading Matching Blocks: {}/{} ({:.1}%)", completed, total, + (completed as f64 / total as f64) * 100.0); + } + WalletProgressUpdate::CbfProcessMatchingBlocks { total, completed } => { + println!(" Processing Matching Blocks: {}/{} ({:.1}%)", completed, total, + (completed as f64 / total as f64) * 100.0); + } + WalletProgressUpdate::Syncing => { + println!(" Syncing: In progress ({:.1}%):", prog.info.progress * 100.0); + } + WalletProgressUpdate::CbfApplyUpdate => { + println!(" Applying compact filters update"); + } + WalletProgressUpdate::Complete => { + println!(" Complete"); + } + } + println!(); } Format::Json => { - println!("{}", serde_json::to_string_pretty(&info).unwrap()); + println!("{}", serde_json::to_string_pretty(&prog.info).unwrap()); } } } fn ascii_table(iter: I) -> String where - I: IntoIterator, + I: IntoIterator, T: Tabled, { Table::new(iter) diff --git a/client/src/lib.rs b/client/src/lib.rs index 471e02f..047f48f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -16,8 +16,9 @@ pub mod format; pub mod rpc; pub mod source; pub mod store; -pub mod sync; +pub mod spaces; pub mod wallets; +mod cbf; fn std_wait(mut predicate: F, wait: Duration) where @@ -59,3 +60,15 @@ where Vec::::deserialize(deserializer) } } + +pub fn calc_progress(start_block: u32, tip: u32, chain_tip: u32) -> f32 { + if chain_tip <= start_block || tip < start_block { + 0.0 + } else if tip >= chain_tip { + 1.0 + } else { + let blocks_synced = tip - start_block; + let blocks_to_sync = chain_tip - start_block; + blocks_synced as f32 / blocks_to_sync as f32 + } +} diff --git a/client/src/rpc.rs b/client/src/rpc.rs index dcfc133..3d4b128 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -40,7 +40,7 @@ use spaces_protocol::{ use spaces_wallet::{ bdk_wallet as bdk, bdk_wallet::template::Bip86, bitcoin::hashes::Hash as BitcoinHash, export::WalletExport, nostr::NostrEvent, Balance, DoubleUtxo, Listing, SpacesWallet, - WalletConfig, WalletDescriptors, WalletInfo, WalletOutput, + WalletConfig, WalletDescriptors, WalletOutput, }; use tokio::{ select, @@ -48,19 +48,11 @@ use tokio::{ task::JoinSet, }; -use crate::{ - checker::TxChecker, - client::{BlockMeta, TxEntry}, - config::ExtendedNetwork, - deserialize_base64, serialize_base64, - source::BitcoinRpc, - store::{ChainState, LiveSnapshot, RolloutEntry, Sha256}, - sync::{COMMIT_BLOCK_INTERVAL, ROOT_ANCHORS_COUNT}, - wallets::{ - AddressKind, ListSpacesResponse, RpcWallet, TxInfo, TxResponse, WalletCommand, - WalletResponse, - }, -}; +use crate::{calc_progress, checker::TxChecker, client::{BlockMeta, TxEntry}, config::ExtendedNetwork, deserialize_base64, serialize_base64, source::BitcoinRpc, store::{ChainState, LiveSnapshot, RolloutEntry, Sha256}, spaces::{COMMIT_BLOCK_INTERVAL, ROOT_ANCHORS_COUNT}, wallets::{ + AddressKind, ListSpacesResponse, RpcWallet, TxInfo, TxResponse, WalletCommand, + WalletResponse, +}}; +use crate::wallets::WalletInfoWithProgress; pub(crate) type Responder = oneshot::Sender; @@ -229,7 +221,7 @@ pub trait Rpc { ) -> Result; #[method(name = "walletgetinfo")] - async fn wallet_get_info(&self, name: &str) -> Result; + async fn wallet_get_info(&self, name: &str) -> Result; #[method(name = "walletexport")] async fn wallet_export(&self, name: &str) -> Result; @@ -870,7 +862,7 @@ impl RpcServer for RpcServerImpl { .map_err(|error| ErrorObjectOwned::owned(-1, error.to_string(), None::)) } - async fn wallet_get_info(&self, wallet: &str) -> Result { + async fn wallet_get_info(&self, wallet: &str) -> Result { self.wallet(&wallet) .await? .send_get_info() @@ -1584,6 +1576,14 @@ async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainA .await .map_err(|e| anyhow!("Could not retrieve blockchain info ({})", e))?; + let start_block = if info.chain == "main" { + 871_222 + } else if info.chain.starts_with("test") { + 50_000 + } else { + 0 + }; + Ok(ServerInfo { network: info.chain, tip, @@ -1591,10 +1591,6 @@ async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainA blocks: info.blocks, headers: info.headers, }, - progress: if info.headers != 0 && info.headers >= tip.height { - tip.height as f32 / info.headers as f32 - } else { - 0.0 - }, + progress: calc_progress(start_block, tip.height, info.headers), }) } \ No newline at end of file diff --git a/client/src/source.rs b/client/src/source.rs index 228d408..a096d1a 100644 --- a/client/src/source.rs +++ b/client/src/source.rs @@ -22,6 +22,7 @@ use threadpool::ThreadPool; use tokio::time::Instant; use spaces_protocol::bitcoin::Network; use crate::{client::BlockSource, std_wait}; +use crate::client::BlockFilterRpc; const BITCOIN_RPC_IN_WARMUP: i32 = -28; // Client still warming up const BITCOIN_RPC_CLIENT_NOT_CONNECTED: i32 = -9; // Bitcoin is not connected @@ -98,7 +99,7 @@ impl From for BlockFetchError { #[derive(Serialize, Deserialize)] pub struct JsonRpcResponse { - pub result: Option, + pub result: T, pub error: Option, pub id: String, } @@ -179,6 +180,16 @@ impl BitcoinRpc { let params = serde_json::json!([]); self.make_request("getblockchaininfo", params) } + pub fn get_block_filter_by_height(&self, height: u32) -> BitcoinRpcRequest { + let params = serde_json::json!([height]); + self.make_request("getblockfilterbyheight", params) + } + + pub fn queue_blocks(&self, heights: Vec) -> BitcoinRpcRequest { + let params = serde_json::json!([heights]); + self.make_request("queueblocks", params) + } + pub fn get_mempool_entry(&self, txid: &Txid) -> BitcoinRpcRequest { let params = serde_json::json!([txid]); @@ -345,7 +356,7 @@ impl BitcoinRpc { } fn parse_error_bytes(status: StatusCode, res_bytes: &[u8]) -> BitcoinRpcError { - let parsed_response: Result, serde_json::Error> = + let parsed_response: Result>, serde_json::Error> = serde_json::from_slice(&res_bytes); match parsed_response { @@ -562,12 +573,14 @@ impl BlockFetcher { raw } else { // fallback to decoding json - let hex_block: JsonRpcResponse = serde_json::from_slice(raw.as_slice()) - .map_err(|e| BitcoinRpcError::Other(e.to_string()))?; + let hex_block: JsonRpcResponse> = serde_json::from_slice(raw.as_slice()) + .map_err(|e| BitcoinRpcError::Other(format!("fetch block {}: {}",hash, e.to_string())))?; if let Some(e) = hex_block.error { return Err(BitcoinRpcError::Rpc(e)); } - hex_block.result.unwrap().into_bytes() + let block = hex_block.result + .ok_or(BitcoinRpcError::Other(format!("could not find block with hash {}", hash)))?; + block.into_bytes() }; if hex_block.len() % 2 != 0 { @@ -803,7 +816,7 @@ impl ErrorForRpc for reqwest::Response { return Err(BitcoinRpcError::Rpc(e)); } - Ok(rpc_res.result.unwrap()) + Ok(rpc_res.result) } } @@ -814,10 +827,17 @@ impl ErrorForRpcBlocking for reqwest::blocking::Response { return Err(BitcoinRpcError::Rpc(e)); } - Ok(rpc_res.result.unwrap()) + Ok(rpc_res.result) } } +#[derive(Debug, Serialize, Deserialize)] +pub struct BlockQueueResult { + pub pending: u32, + pub completed: u32, +} + + #[derive(Clone)] pub struct BitcoinBlockSource { pub client: reqwest::blocking::Client, @@ -838,10 +858,8 @@ impl BlockSource for BitcoinBlockSource { .send_json_blocking(&self.client, &self.rpc.get_block_hash(height))?) } - fn get_block(&self, hash: &BlockHash) -> Result { - Ok(self - .rpc - .send_json_blocking(&self.client, &self.rpc.get_block(hash))?) + fn get_block(&self, hash: &BlockHash) -> Result, BitcoinRpcError> { + BlockFetcher::fetch_block(self, hash).map(|b| Some(b)) } fn get_median_time(&self) -> Result { @@ -887,6 +905,7 @@ impl BlockSource for BitcoinBlockSource { .send_json_blocking(&self.client, &self.rpc.get_block_count())?) } + fn get_best_chain(&self, tip: Option, expected_chain: Network) -> Result, BitcoinRpcError> { #[derive(Deserialize)] struct Info { @@ -938,4 +957,28 @@ impl BlockSource for BitcoinBlockSource { Ok(Some(best_chain)) } + + fn get_blockchain_info(&self) -> anyhow::Result { + let mut info: crate::client::BlockchainInfo = self + .rpc + .send_json_blocking(&self.client, &self.rpc.get_blockchain_info())?; + if info.chain.starts_with("test") { + info.chain = "test".to_string() + } + Ok(info) + } + + fn get_block_filter_by_height(&self, height: u32) -> anyhow::Result, BitcoinRpcError> { + let filter: Option = self + .rpc + .send_json_blocking(&self.client, &self.rpc.get_block_filter_by_height(height))?; + Ok(filter) + } + + fn queue_blocks(&self, heights: Vec) -> anyhow::Result<(), BitcoinRpcError> { + self + .rpc + .send_json_blocking::<()>(&self.client, &self.rpc.queue_blocks(heights))?; + Ok(()) + } } diff --git a/client/src/sync.rs b/client/src/spaces.rs similarity index 99% rename from client/src/sync.rs rename to client/src/spaces.rs index 9afd773..c2774f1 100644 --- a/client/src/sync.rs +++ b/client/src/spaces.rs @@ -45,6 +45,7 @@ pub struct Spaced { pub num_workers: usize, pub anchors_path: Option, pub synced: bool, + pub cbf: bool, } impl Spaced { diff --git a/client/src/wallets.rs b/client/src/wallets.rs index 7f1fb2e..8d88cf7 100644 --- a/client/src/wallets.rs +++ b/client/src/wallets.rs @@ -14,6 +14,7 @@ use spaces_protocol::{ slabel::SLabel, FullSpaceOut, SpaceOut, }; + use spaces_wallet::{ address::SpaceAddress, bdk_wallet::{ @@ -27,6 +28,7 @@ use spaces_wallet::{ tx_event::{TxEvent, TxEventKind, TxRecord}, Balance, DoubleUtxo, Listing, SpacesWallet, WalletInfo, WalletOutput, }; + use tabled::Tabled; use tokio::{ select, @@ -34,17 +36,10 @@ use tokio::{ time::Instant, }; -use crate::{ - checker::TxChecker, - client::BlockSource, - config::ExtendedNetwork, - rpc::{RpcWalletRequest, RpcWalletTxBuilder, WalletLoadRequest}, - source::{ - BitcoinBlockSource, BitcoinRpc, BitcoinRpcError, BlockEvent, BlockFetchError, BlockFetcher, - }, - std_wait, - store::{ChainState, LiveSnapshot, Sha256}, -}; +use crate::{calc_progress, checker::TxChecker, client::BlockSource, config::ExtendedNetwork, rpc::{RpcWalletRequest, RpcWalletTxBuilder, WalletLoadRequest}, source::{ + BitcoinBlockSource, BitcoinRpc, BitcoinRpcError, BlockEvent, BlockFetchError, BlockFetcher, +}, std_wait, store::{ChainState, LiveSnapshot, Sha256}}; +use crate::cbf::{CompactFilterSync}; const MEMPOOL_CHECK_INTERVAL: Duration = Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 5 * 60 }); @@ -59,6 +54,49 @@ pub struct TxResponse { pub raw: Option, } +#[derive(Copy, Clone, Serialize, Deserialize, Debug)] +#[serde(tag = "status")] +pub enum WalletProgressUpdate { + #[serde(rename = "source_sync")] + SourceSync { + total: u32, + completed: u32, + }, + #[serde(rename = "cbf_filter_sync")] + CbfFilterSync { + total: u32, + completed: u32, + }, + #[serde(rename = "cbf_process_filters")] + CbfProcessFilters { + total: u32, + completed: u32, + }, + #[serde(rename = "cbf_download_matching_blocks")] + CbfDownloadMatchingBlocks { + total: u32, + completed: u32, + }, + #[serde(rename = "cbf_process_matching_blocks")] + CbfProcessMatchingBlocks { + total: u32, + completed: u32, + }, + #[serde(rename = "cbf_apply_update")] + CbfApplyUpdate, + #[serde(rename = "syncing")] + Syncing, + #[serde(rename = "complete")] + Complete, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalletInfoWithProgress { + #[serde(flatten)] + pub info: WalletInfo, + pub status: WalletProgressUpdate +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ListSpacesResponse { pub winning: Vec, @@ -118,7 +156,7 @@ pub struct WalletResponse { pub enum WalletCommand { GetInfo { - resp: crate::rpc::Responder>, + resp: crate::rpc::Responder>, }, BatchTx { request: RpcWalletTxBuilder, @@ -362,19 +400,22 @@ impl RpcWallet { mut state: &mut LiveSnapshot, wallet: &mut SpacesWallet, command: WalletCommand, - synced: bool, + progress_update: WalletProgressUpdate, ) -> anyhow::Result<()> { + let synced = matches!(progress_update, WalletProgressUpdate::Complete); match command { WalletCommand::GetInfo { resp } =>{ - let mut wallet_info = wallet.get_info(); + let mut wallet_info = WalletInfoWithProgress { + info: wallet.get_info(), + status: progress_update, + }; + let best_chain = source - .get_best_chain(Some(wallet_info.tip), wallet.config.network); + .get_best_chain(Some(wallet_info.info.tip), wallet.config.network); if let Ok(Some(best_chain)) = best_chain { - wallet_info.progress = if best_chain.height >= wallet_info.tip { - wallet_info.tip as f32 / best_chain.height as f32 - } else { - 0.0 - } + wallet_info.info.progress = calc_progress( + wallet.config.start_block, + wallet_info.info.tip, best_chain.height); } _ = resp.send(Ok(wallet_info)) @@ -516,6 +557,7 @@ impl RpcWallet { mut commands: Receiver, shutdown: broadcast::Sender<()>, num_workers: usize, + cbf: bool, ) -> anyhow::Result<()> { let (fetcher, receiver) = BlockFetcher::new(network.fallback_network(), source.clone(), @@ -530,25 +572,61 @@ impl RpcWallet { }; let mut shutdown_recv = shutdown.subscribe(); - fetcher.start(wallet_tip); + + let mut cbf_sync = if cbf { + Some(CompactFilterSync::new(&wallet)) + } else { + fetcher.start(wallet_tip); + None + }; + let mut synced_at_least_once = false; let mut last_mempool_check = Instant::now(); + let mut wallet_progress = WalletProgressUpdate::Syncing; + loop { if shutdown_recv.try_recv().is_ok() { info!("Shutting down wallet sync"); break; } + + // Wallet Commands: if let Ok(command) = commands.try_recv() { let synced = Self::all_synced(&source, &mut state, &wallet).is_some(); + if synced { + wallet_progress = WalletProgressUpdate::Complete; + } + Self::wallet_handle_commands( network, &source, &mut state, &mut wallet, command, - synced, + wallet_progress, )?; } + + // Compact Filter Sync: + if let Some(cbf_sync) = cbf_sync.as_mut() { + cbf_sync.sync_next(&mut wallet, &source, &mut wallet_progress)?; + + // Once compact filter sync is complete + // start the block fetcher + if cbf_sync.synced() { + wallet_tip = { + let tip = wallet.local_chain().tip(); + ChainAnchor { + height: tip.height(), + hash: tip.hash(), + } + }; + fetcher.start(wallet_tip); + } + continue; + } + + // Block fetcher events: if let Ok(event) = receiver.try_recv() { match event { BlockEvent::Tip(_) => { @@ -567,6 +645,7 @@ impl RpcWallet { wallet_tip.height = id.height; wallet_tip.hash = id.hash; + info!("wallet({}): block={} height={}", wallet.name(), wallet_tip.hash, wallet_tip.height); if id.height % 12 == 0 { wallet.commit()?; } @@ -1182,6 +1261,7 @@ impl RpcWallet { mut channel: Receiver, shutdown: broadcast::Sender<()>, num_workers: usize, + cbf: bool, ) -> anyhow::Result<()> { let mut shutdown_signal = shutdown.subscribe(); let mut wallet_results = FuturesUnordered::new(); @@ -1212,7 +1292,8 @@ impl RpcWallet { wallet, loaded.rx, wallet_shutdown, - num_workers + num_workers, + cbf )); } Err(err) => { @@ -1239,7 +1320,7 @@ impl RpcWallet { Ok(()) } - pub async fn send_get_info(&self) -> anyhow::Result { + pub async fn send_get_info(&self) -> anyhow::Result { let (resp, resp_rx) = oneshot::channel(); self.sender.send(WalletCommand::GetInfo { resp }).await?; resp_rx.await? @@ -1412,3 +1493,5 @@ async fn named_future( ) -> (String, Result) { (name, rx.await) } + + diff --git a/testutil/src/lib.rs b/testutil/src/lib.rs index 0d2188a..454b786 100644 --- a/testutil/src/lib.rs +++ b/testutil/src/lib.rs @@ -170,7 +170,7 @@ impl TestRig { .expect("handle")? as u32; let info = self.spaced.client.wallet_get_info(wallet_name).await?; - if count == info.tip { + if count == info.info.tip { return Ok(()); } tokio::time::sleep(Duration::from_millis(100)).await; diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 0230c09..daa2276 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -1,5 +1,4 @@ use std::{collections::BTreeMap, fmt::Debug, fs, ops::Mul, path::PathBuf, str::FromStr}; - use anyhow::{anyhow, Context}; use bdk_wallet::{ chain, @@ -15,6 +14,7 @@ use bdk_wallet::{ AddressInfo, KeychainKind, LocalOutput, PersistedWallet, SignOptions, TxBuilder, Update, Wallet, WalletTx, WeightedUtxo, }; +use bdk_wallet::chain::keychain_txout::KeychainTxOutIndex; use bincode::config; use bitcoin::{ absolute::{Height, LockTime}, @@ -210,11 +210,11 @@ impl SpacesWallet { config.space_descriptors.external.clone(), config.space_descriptors.internal.clone(), ) - .lookahead(50) - .network(config.network) - .genesis_hash(genesis_hash) - .create_wallet(&mut conn) - .context("could not create wallet")? + .lookahead(50) + .network(config.network) + .genesis_hash(genesis_hash) + .create_wallet(&mut conn) + .context("could not create wallet")? }; let tx = conn @@ -232,6 +232,10 @@ impl SpacesWallet { Ok(wallet) } + pub fn spk_index(&self) -> &KeychainTxOutIndex { + self.internal.spk_index() + } + pub fn balance(&mut self) -> anyhow::Result { let unspent = self.list_unspent(); let balance = self.internal.balance(); @@ -277,7 +281,7 @@ impl SpacesWallet { }) } - pub fn transactions(&self) -> impl Iterator + '_ { + pub fn transactions(&self) -> impl Iterator + '_ { self.internal .transactions() .filter(|tx| !is_revert_tx(tx) && self.internal.spk_index().is_tx_relevant(&tx.tx_node)) @@ -375,11 +379,11 @@ impl SpacesWallet { self.internal.is_mine(script) } - pub fn list_unspent(&self) -> impl Iterator + '_ { + pub fn list_unspent(&self) -> impl Iterator + '_ { self.internal.list_unspent() } - pub fn list_output(&self) -> impl Iterator + '_ { + pub fn list_output(&self) -> impl Iterator + '_ { self.internal.list_output() } @@ -610,6 +614,14 @@ impl SpacesWallet { Ok(()) } + pub fn apply_update( + &mut self, + update: impl Into, + ) -> Result<(), CannotConnectError> { + self.internal + .apply_update(update) + } + pub fn apply_unconfirmed_tx(&mut self, tx: Transaction, seen: u64) { self.internal.apply_unconfirmed_txs(vec![(tx, seen)]); } @@ -640,7 +652,7 @@ impl SpacesWallet { event.previous_spaceout, event.details, ) - .context("could not insert tx event into wallet db")?; + .context("could not insert tx event into wallet db")?; } db_tx .commit() @@ -742,7 +754,7 @@ impl SpacesWallet { signature: listing.signature, sighash_type: TapSighashType::SinglePlusAnyoneCanPay, } - .to_vec(), + .to_vec(), ); let funded_psbt = { @@ -1192,7 +1204,7 @@ impl SpacesWallet { signature, sighash_type, } - .to_vec(), + .to_vec(), ); witness.push(&signing_info.script); witness.push(&signing_info.control_block.serialize()); @@ -1213,6 +1225,10 @@ impl SpacesWallet { let info = TxEvent::get_signing_info(&db_tx, previous_output.txid, script)?; Ok(info) } + + pub fn peek_address(&self, keychain_kind: KeychainKind, index: u32) -> AddressInfo { + self.internal.peek_address(keychain_kind, index) + } } #[derive(Debug)]