From 76708c28366465df48b4f57c98e8b8c8f1c01627 Mon Sep 17 00:00:00 2001 From: Buffrr Date: Tue, 15 Apr 2025 11:01:31 -0700 Subject: [PATCH 1/3] Refactor sync checks --- client/src/client.rs | 4 +-- client/src/config.rs | 4 +++ client/src/rpc.rs | 47 +++++++++++++++++++++------- client/src/source.rs | 72 ++++++++++++++++++++++++++++++++++--------- client/src/sync.rs | 6 +++- client/src/wallets.rs | 37 +++++++++++++++++----- wallet/src/lib.rs | 2 ++ 7 files changed, 136 insertions(+), 36 deletions(-) diff --git a/client/src/client.rs b/client/src/client.rs index 61b943c..efa59ec 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -14,7 +14,7 @@ use spaces_protocol::{ validate::{TxChangeSet, UpdateKind, Validator}, Bytes, Covenant, FullSpaceOut, RevokeReason, SpaceOut, }; -use spaces_wallet::bitcoin::Transaction; +use spaces_wallet::bitcoin::{Network, Transaction}; use crate::{ source::BitcoinRpcError, @@ -27,7 +27,7 @@ pub trait BlockSource { fn get_median_time(&self) -> Result; fn in_mempool(&self, txid: &Txid, height: u32) -> Result; fn get_block_count(&self) -> Result; - fn get_best_chain(&self) -> Result; + fn get_best_chain(&self, tip: Option, expected_chain: Network) -> Result, BitcoinRpcError>; } #[derive(Debug, Clone)] diff --git a/client/src/config.rs b/client/src/config.rs index 9d95a62..77be6f8 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -79,6 +79,9 @@ pub struct Args { /// Skip maintaining historical root anchors #[arg(long, env = "SPACED_SKIP_ANCHORS", default_value = "false")] skip_anchors: bool, + /// The specified Bitcoin RPC is a light client + #[arg(long, env = "SPACED_BITCOIN_RPC_LIGHT", default_value = "false")] + bitcoin_rpc_light: bool, } #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, ValueEnum, Serialize, Deserialize)] @@ -164,6 +167,7 @@ impl Args { let rpc = BitcoinRpc::new( &args.bitcoin_rpc_url.expect("bitcoin rpc url"), bitcoin_rpc_auth, + !args.bitcoin_rpc_light ); let genesis = Spaced::genesis(&rpc, args.chain).await?; diff --git a/client/src/rpc.rs b/client/src/rpc.rs index e6d4317..4593fd0 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -66,8 +66,9 @@ pub(crate) type Responder = oneshot::Sender; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerInfo { - pub chain: ExtendedNetwork, + pub chain: String, pub tip: ChainAnchor, + pub progress: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -99,8 +100,8 @@ pub enum ChainStateCommand { txs: Vec, resp: Responder>>>, }, - GetTip { - resp: Responder>, + GetServerInfo { + resp: Responder>, }, GetSpace { hash: SpaceKey, @@ -723,13 +724,12 @@ impl RpcServerImpl { #[async_trait] impl RpcServer for RpcServerImpl { async fn get_server_info(&self) -> Result { - let chain = self.wallet_manager.network; - let tip = self + let info = self .store - .get_tip() + .get_server_info() .await .map_err(|error| ErrorObjectOwned::owned(-1, error.to_string(), None::))?; - Ok(ServerInfo { chain, tip }) + Ok(info) } async fn get_space( @@ -1092,6 +1092,7 @@ impl AsyncChainState { .find(|tx| &tx.changeset.txid == txid)) } + async fn get_indexed_block( index: &mut Option, height_or_hash: HeightOrHash, @@ -1173,9 +1174,9 @@ impl AsyncChainState { let result = emulator.apply_package(tip.height + 1, txs); let _ = resp.send(result); } - ChainStateCommand::GetTip { resp } => { + ChainStateCommand::GetServerInfo { resp } => { let tip = chain_state.tip.read().expect("read meta").clone(); - _ = resp.send(Ok(tip)) + _ = resp.send(get_server_info(client, rpc, tip).await) } ChainStateCommand::GetSpace { hash, resp } => { let result = chain_state.get_space_info(&hash); @@ -1498,9 +1499,9 @@ impl AsyncChainState { resp_rx.await? } - pub async fn get_tip(&self) -> anyhow::Result { + pub async fn get_server_info(&self) -> anyhow::Result { let (resp, resp_rx) = oneshot::channel(); - self.sender.send(ChainStateCommand::GetTip { resp }).await?; + self.sender.send(ChainStateCommand::GetServerInfo { resp }).await?; resp_rx.await? } @@ -1561,3 +1562,27 @@ fn get_space_key(space_or_hash: &str) -> Result { Ok(SpaceKey::from(hash)) } + + +async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainAnchor) -> anyhow::Result { + #[derive(Deserialize)] + struct Info { + pub chain: String, + pub headers: u32, + } + + let info: Info = rpc + .send_json(client, &rpc.get_blockchain_info()) + .await + .map_err(|e| anyhow!("Could not retrieve blockchain info ({})", e))?; + + Ok(ServerInfo { + chain: info.chain, + tip, + progress: if info.headers >= tip.height { + Some(tip.height as f32 / info.headers as f32) + } else { + None + } + }) +} \ No newline at end of file diff --git a/client/src/source.rs b/client/src/source.rs index ec91e91..cbabe44 100644 --- a/client/src/source.rs +++ b/client/src/source.rs @@ -12,7 +12,7 @@ use std::{ use base64::Engine; use bitcoin::{Block, BlockHash, Txid}; use hex::FromHexError; -use log::error; +use log::{error, warn}; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; @@ -20,7 +20,7 @@ use spaces_protocol::constants::ChainAnchor; use spaces_wallet::{bitcoin, bitcoin::Transaction}; use threadpool::ThreadPool; use tokio::time::Instant; - +use spaces_protocol::bitcoin::Network; use crate::{client::BlockSource, std_wait}; const BITCOIN_RPC_IN_WARMUP: i32 = -28; // Client still warming up @@ -34,9 +34,11 @@ pub struct BitcoinRpc { id: Arc, auth_token: Option, url: String, + legacy: bool } pub struct BlockFetcher { + chain: Network, src: BitcoinBlockSource, job_id: Arc, sender: std::sync::mpsc::SyncSender, @@ -121,18 +123,23 @@ trait ErrorForRpcBlocking { } impl BitcoinRpc { - pub fn new(url: &str, auth: BitcoinRpcAuth) -> Self { + pub fn new(url: &str, auth: BitcoinRpcAuth, legacy: bool) -> Self { Self { id: Default::default(), auth_token: auth.to_token(), url: url.to_string(), + legacy, } } - pub fn make_request(&self, method: &str, params: serde_json::Value) -> BitcoinRpcRequest { + pub fn make_request(&self, method: &str, params: Value) -> BitcoinRpcRequest { let id = self.id.fetch_add(1, Ordering::Relaxed); let body = serde_json::json!({ - "jsonrpc": "1.0", + "jsonrpc": if self.legacy { + "1.0" + } else { + "2.0" + }, "id": id.to_string(), "method": method, "params": params, @@ -381,12 +388,14 @@ impl BitcoinRpcAuth { impl BlockFetcher { pub fn new( + chain: Network, src: BitcoinBlockSource, num_workers: usize, ) -> (Self, std::sync::mpsc::Receiver) { let (tx, rx) = std::sync::mpsc::sync_channel(12); ( Self { + chain, src, job_id: Arc::new(AtomicUsize::new(0)), sender: tx, @@ -401,10 +410,15 @@ impl BlockFetcher { } fn should_sync( + expected_chain: Network, source: &BitcoinBlockSource, start: ChainAnchor, ) -> Result, BlockFetchError> { - let tip = source.get_best_chain()?; + let tip = match source.get_best_chain(Some(start.height), expected_chain)? { + Some(tip) => tip, + None => return Ok(None), + }; + if start.height > tip.height { return Err(BlockFetchError::BlockMismatch); } @@ -437,6 +451,7 @@ impl BlockFetcher { let current_task = self.job_id.clone(); let task_sender = self.sender.clone(); let num_workers = self.num_workers; + let chain = self.chain; _ = std::thread::spawn(move || { let mut last_check = Instant::now() - Duration::from_secs(2); @@ -451,7 +466,7 @@ impl BlockFetcher { } last_check = Instant::now(); - let tip = match BlockFetcher::should_sync(&task_src, checkpoint) { + let tip = match BlockFetcher::should_sync(chain, &task_src, checkpoint) { Ok(t) => t, Err(e) => { _ = task_sender.send(BlockEvent::Error(e)); @@ -872,21 +887,48 @@ impl BlockSource for BitcoinBlockSource { .send_json_blocking(&self.client, &self.rpc.get_block_count())?) } - fn get_best_chain(&self) -> Result { + fn get_best_chain(&self, tip: Option, expected_chain: Network) -> Result, BitcoinRpcError> { #[derive(Deserialize)] struct Info { - #[serde(rename = "blocks")] - height: u64, + pub chain: String, + pub blocks: u32, + pub headers: u32, #[serde(rename = "bestblockhash")] - hash: BlockHash, + pub best_block_hash: BlockHash, } let info: Info = self .rpc .send_json_blocking(&self.client, &self.rpc.get_blockchain_info())?; - Ok(ChainAnchor { - hash: info.hash, - height: info.height as _, - }) + let expected_chain = match expected_chain { + Network::Bitcoin => "main", + Network::Regtest => "regtest", + _ => "test" + }; + if info.chain != expected_chain { + warn!("Invalid chain from connected rpc node - expected {}, got {}", expected_chain, info.chain); + return Ok(None); + } + + let synced = info.headers == info.blocks; + let best_chain = if !synced { + let block_hash = self.get_block_hash(info.blocks)?; + ChainAnchor { + hash: block_hash, + height: info.blocks, + } + } else { + ChainAnchor { + hash: info.best_block_hash, + height: info.headers, + } + }; + + // If the source is still syncing, and we have a higher tip, wait. + if !synced && tip.is_some_and(|tip| tip > info.blocks) { + return Ok(None); + } + + Ok(Some(best_chain)) } } diff --git a/client/src/sync.rs b/client/src/sync.rs index 8636051..0d05a5d 100644 --- a/client/src/sync.rs +++ b/client/src/sync.rs @@ -189,7 +189,11 @@ impl Spaced { start_block.hash, start_block.height ); - let (fetcher, receiver) = BlockFetcher::new(source.clone(), self.num_workers); + let (fetcher, receiver) = BlockFetcher::new( + self.network.fallback_network(), + source.clone(), + self.num_workers, + ); fetcher.start(start_block); let mut shutdown_signal = shutdown.subscribe(); diff --git a/client/src/wallets.rs b/client/src/wallets.rs index e4b5f5e..17bf202 100644 --- a/client/src/wallets.rs +++ b/client/src/wallets.rs @@ -357,7 +357,20 @@ impl RpcWallet { synced: bool, ) -> anyhow::Result<()> { match command { - WalletCommand::GetInfo { resp } => _ = resp.send(Ok(wallet.get_info())), + WalletCommand::GetInfo { resp } =>{ + let mut wallet_info = wallet.get_info(); + let best_chain = source + .get_best_chain(Some(wallet_info.tip), wallet.config.network); + if let Ok(Some(best_chain)) = best_chain { + wallet_info.progress = if best_chain.height >= wallet_info.tip { + Some(wallet_info.tip as f32 / best_chain.height as f32) + } else { + None + } + } + + _ = resp.send(Ok(wallet_info)) + }, WalletCommand::BatchTx { request, resp } => { if !synced && !request.force { _ = resp.send(Err(anyhow::anyhow!("Wallet is syncing"))); @@ -462,14 +475,17 @@ impl RpcWallet { protocol: &mut LiveSnapshot, wallet: &SpacesWallet, ) -> Option { - let bitcoin_tip = match bitcoin.get_best_chain() { - Ok(tip) => tip, + let wallet_tip = wallet.local_chain().tip(); + + let bitcoin_tip = match bitcoin.get_best_chain(Some(wallet_tip.height()), wallet.config.network) { + Ok(Some(tip)) => tip, + Ok(None) => return None, Err(e) => { warn!("Sync check failed: {}", e); return None; } }; - let wallet_tip = wallet.local_chain().tip(); + let protocol_tip = match protocol.tip.read() { Ok(tip) => tip.clone(), Err(e) => { @@ -493,7 +509,9 @@ impl RpcWallet { shutdown: broadcast::Sender<()>, num_workers: usize, ) -> anyhow::Result<()> { - let (fetcher, receiver) = BlockFetcher::new(source.clone(), num_workers); + let (fetcher, receiver) = BlockFetcher::new(network.fallback_network(), + source.clone(), + num_workers); let mut wallet_tip = { let tip = wallet.local_chain().tip(); @@ -547,8 +565,13 @@ impl RpcWallet { } BlockEvent::Error(e) if matches!(e, BlockFetchError::BlockMismatch) => { let mut checkpoint_in_chain = None; - let best_chain = match source.get_best_chain() { - Ok(best) => best, + let best_chain = match source.get_best_chain(Some(wallet_tip.height), network.fallback_network()) { + Ok(Some(best)) => best, + Ok(None) => { + warn!("Waiting for source to sync"); + fetcher.restart(wallet_tip, &receiver); + continue; + } Err(error) => { warn!("Wallet error: {}", error); fetcher.restart(wallet_tip, &receiver); diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index 548d0fb..ccf2226 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -104,6 +104,7 @@ pub struct WalletInfo { pub start_block: u32, pub tip: u32, pub descriptors: Vec, + pub progress: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -589,6 +590,7 @@ impl SpacesWallet { start_block: self.config.start_block, tip: self.internal.local_chain().tip().height(), descriptors, + progress: None, } } From 42ed35fd8f09e2404b5eb8c339cf2e74150d5bed Mon Sep 17 00:00:00 2001 From: Buffrr Date: Wed, 16 Apr 2025 14:41:22 -0700 Subject: [PATCH 2/3] Update getserverinfo structure --- client/src/format.rs | 2 +- client/src/rpc.rs | 30 +++++++++++++++++++++--------- client/src/wallets.rs | 4 ++-- wallet/src/lib.rs | 4 ++-- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/client/src/format.rs b/client/src/format.rs index 6e14512..fa11f81 100644 --- a/client/src/format.rs +++ b/client/src/format.rs @@ -152,7 +152,7 @@ pub fn print_list_unspent(utxos: Vec, format: Format) { pub fn print_server_info(info: ServerInfo, format: Format) { match format { Format::Text => { - println!("CHAIN: {}", info.chain); + println!("CHAIN: {}", info.network); println!(" Height {}", info.tip.height); println!(" Hash {}", info.tip.hash); } diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 4593fd0..04b1c86 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -66,9 +66,16 @@ pub(crate) type Responder = oneshot::Sender; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ServerInfo { - pub chain: String, + pub network: String, pub tip: ChainAnchor, - pub progress: Option, + pub chain: ChainInfo, + pub progress: f32, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChainInfo { + blocks: u32, + headers: u32, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -1083,7 +1090,7 @@ impl AsyncChainState { rpc, chain_state, ) - .await?; + .await?; Ok(block .block_meta @@ -1205,7 +1212,7 @@ impl AsyncChainState { rpc, chain_state, ) - .await; + .await; let _ = resp.send(res); } ChainStateCommand::GetTxMeta { txid, resp } => { @@ -1267,7 +1274,7 @@ impl AsyncChainState { File::open(anchors_path) .or_else(|e| Err(anyhow!("Could not open anchors file: {}", e)))?, ) - .or_else(|e| Err(anyhow!("Could not read anchors file: {}", e)))?; + .or_else(|e| Err(anyhow!("Could not read anchors file: {}", e)))?; return Ok(anchors); } @@ -1569,6 +1576,7 @@ async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainA struct Info { pub chain: String, pub headers: u32, + pub blocks: u32, } let info: Info = rpc @@ -1577,12 +1585,16 @@ async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainA .map_err(|e| anyhow!("Could not retrieve blockchain info ({})", e))?; Ok(ServerInfo { - chain: info.chain, + network: info.chain, tip, + chain: ChainInfo { + blocks: info.blocks, + headers: info.headers, + }, progress: if info.headers >= tip.height { - Some(tip.height as f32 / info.headers as f32) + tip.height as f32 / info.headers as f32 } else { - None - } + 0.0 + }, }) } \ No newline at end of file diff --git a/client/src/wallets.rs b/client/src/wallets.rs index 17bf202..6bcf65c 100644 --- a/client/src/wallets.rs +++ b/client/src/wallets.rs @@ -363,9 +363,9 @@ impl RpcWallet { .get_best_chain(Some(wallet_info.tip), wallet.config.network); if let Ok(Some(best_chain)) = best_chain { wallet_info.progress = if best_chain.height >= wallet_info.tip { - Some(wallet_info.tip as f32 / best_chain.height as f32) + wallet_info.tip as f32 / best_chain.height as f32 } else { - None + 0.0 } } diff --git a/wallet/src/lib.rs b/wallet/src/lib.rs index ccf2226..0230c09 100644 --- a/wallet/src/lib.rs +++ b/wallet/src/lib.rs @@ -104,7 +104,7 @@ pub struct WalletInfo { pub start_block: u32, pub tip: u32, pub descriptors: Vec, - pub progress: Option, + pub progress: f32, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -590,7 +590,7 @@ impl SpacesWallet { start_block: self.config.start_block, tip: self.internal.local_chain().tip().height(), descriptors, - progress: None, + progress: 0.0, } } From 5f12d2ba3717a1df8c0ab9c30e631eec9ade5728 Mon Sep 17 00:00:00 2001 From: Buffrr Date: Wed, 16 Apr 2025 14:47:20 -0700 Subject: [PATCH 3/3] Fix tests --- client/src/format.rs | 7 ++++--- client/src/rpc.rs | 2 +- client/tests/fetcher_tests.rs | 4 +++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/client/src/format.rs b/client/src/format.rs index fa11f81..1ddaa5c 100644 --- a/client/src/format.rs +++ b/client/src/format.rs @@ -152,9 +152,10 @@ pub fn print_list_unspent(utxos: Vec, format: Format) { pub fn print_server_info(info: ServerInfo, format: Format) { match format { Format::Text => { - println!("CHAIN: {}", info.network); - println!(" Height {}", info.tip.height); - println!(" Hash {}", info.tip.hash); + println!("Network: {}", info.network); + println!("Height {}", info.tip.height); + println!("Hash {}", info.tip.hash); + println!("Progress {:.2}%", info.progress * 100.0); } Format::Json => { println!("{}", serde_json::to_string_pretty(&info).unwrap()); diff --git a/client/src/rpc.rs b/client/src/rpc.rs index 04b1c86..dcfc133 100644 --- a/client/src/rpc.rs +++ b/client/src/rpc.rs @@ -1591,7 +1591,7 @@ async fn get_server_info(client: &reqwest::Client, rpc: &BitcoinRpc, tip: ChainA blocks: info.blocks, headers: info.headers, }, - progress: if info.headers >= tip.height { + progress: if info.headers != 0 && info.headers >= tip.height { tip.height as f32 / info.headers as f32 } else { 0.0 diff --git a/client/tests/fetcher_tests.rs b/client/tests/fetcher_tests.rs index 83cf911..f3b426f 100644 --- a/client/tests/fetcher_tests.rs +++ b/client/tests/fetcher_tests.rs @@ -8,6 +8,7 @@ use spaces_client::source::{ BitcoinBlockSource, BitcoinRpc, BitcoinRpcAuth, BlockEvent, BlockFetcher, }; use spaces_protocol::{bitcoin::BlockHash, constants::ChainAnchor}; +use spaces_protocol::bitcoin::Network; use spaces_testutil::TestRig; async fn setup(blocks: u64) -> Result<(TestRig, u64, BlockHash)> { @@ -27,8 +28,9 @@ fn test_block_fetching_from_bitcoin_rpc() -> Result<()> { let fetcher_rpc = BitcoinBlockSource::new(BitcoinRpc::new( &rig.bitcoind.rpc_url(), BitcoinRpcAuth::UserPass("user".to_string(), "password".to_string()), + true )); - let (fetcher, receiver) = BlockFetcher::new(fetcher_rpc.clone(), 8); + let (fetcher, receiver) = BlockFetcher::new(Network::Regtest, fetcher_rpc.clone(), 8); fetcher.start(ChainAnchor { hash, height: 0 }); let timeout = Duration::from_secs(5);