diff --git a/Cargo.toml b/Cargo.toml index d8e963a..ae4927d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,13 +13,12 @@ anyhow = "1.0" serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.107" bitcoin = { version = "0.31.1", features = ["serde", "rand", "base64"] } -rayon = "1.10.0" -futures = "0.3" log = "0.4" -async-trait = "0.1" -reqwest = { version = "0.12.4", features = ["rustls-tls", "gzip", "json"], default-features = false, optional = true } hex = { version = "0.4.3", features = ["serde"], optional = true } bdk_coin_select = "0.4.0" +minreq = { version = "2.14.1", features = ["https-bundled-probe", "json-using-serde", "serde_json"], optional = true } +url = "2.5.7" [features] -blindbit-backend = ["reqwest", "hex"] +default = ["blindbit-backend"] +blindbit-backend = ["minreq", "hex"] diff --git a/src/backend/backend.rs b/src/backend/backend.rs index d1892f0..c754f34 100644 --- a/src/backend/backend.rs +++ b/src/backend/backend.rs @@ -1,24 +1,21 @@ -use std::{ops::RangeInclusive, pin::Pin}; +use std::{ops::RangeInclusive, sync::mpsc}; use anyhow::Result; -use async_trait::async_trait; use bitcoin::{absolute::Height, Amount}; -use futures::Stream; use super::structs::{BlockData, SpentIndexData, UtxoData}; -#[async_trait] pub trait ChainBackend { fn get_block_data_for_range( &self, range: RangeInclusive, dust_limit: Amount, with_cutthrough: bool, - ) -> Pin> + Send>>; + ) -> mpsc::Receiver>; - async fn spent_index(&self, block_height: Height) -> Result; + fn spent_index(&self, block_height: Height) -> Result; - async fn utxos(&self, block_height: Height) -> Result>; + fn utxos(&self, block_height: Height) -> Result>; - async fn block_height(&self) -> Result; + fn block_height(&self) -> Result; } diff --git a/src/backend/blindbit/backend/backend.rs b/src/backend/blindbit/backend/backend.rs index ba47e70..9811e09 100644 --- a/src/backend/blindbit/backend/backend.rs +++ b/src/backend/blindbit/backend/backend.rs @@ -1,12 +1,13 @@ -use std::{ops::RangeInclusive, pin::Pin, sync::Arc}; +use std::{ops::RangeInclusive, sync::mpsc, thread}; -use async_trait::async_trait; use bitcoin::{absolute::Height, Amount}; -use futures::{stream, Stream, StreamExt}; use anyhow::Result; -use crate::{backend::blindbit::BlindbitClient, BlockData, ChainBackend, SpentIndexData, UtxoData}; +use crate::{ + backend::blindbit::BlindbitClient, utils::ThreadPool, BlockData, ChainBackend, SpentIndexData, + UtxoData, +}; const CONCURRENT_FILTER_REQUESTS: usize = 200; @@ -23,7 +24,44 @@ impl BlindbitBackend { } } -#[async_trait] +macro_rules! request { + ($result: expr, $sender: ident) => { + match $result { + Ok(r) => r, + Err(e) => { + $sender.send(Err(e)).unwrap(); + return; + } + } + }; +} + +fn get_block_data( + client: BlindbitClient, + sender: mpsc::Sender>, + block_height: Height, + dust_limit: Amount, + with_cutthrough: bool, +) { + // + let tweaks = match with_cutthrough { + true => request!(client.tweaks(block_height, dust_limit), sender), + false => request!(client.tweak_index(block_height, dust_limit), sender), + }; + let new_utxo_filter = request!(client.filter_new_utxos(block_height), sender); + let spent_filter = request!(client.filter_spent(block_height), sender); + let blkhash = new_utxo_filter.block_hash; + sender + .send(Ok(BlockData { + blkheight: block_height, + blkhash, + tweaks, + new_utxo_filter: new_utxo_filter.into(), + spent_filter: spent_filter.into(), + })) + .unwrap() +} + impl ChainBackend for BlindbitBackend { /// High-level function to get block data for a range of blocks. /// Block data includes all the information needed to determine if a block is relevant for scanning, @@ -34,51 +72,47 @@ impl ChainBackend for BlindbitBackend { range: RangeInclusive, dust_limit: Amount, with_cutthrough: bool, - ) -> Pin> + Send>> { - let client = Arc::new(self.client.clone()); + ) -> mpsc::Receiver> { + let client = self.client.clone(); + let (sender, receiver) = mpsc::channel::>(); - let res = stream::iter(range) - .map(move |n| { - let client = client.clone(); + thread::spawn(move || { + let pool = ThreadPool::new(CONCURRENT_FILTER_REQUESTS); - async move { - let blkheight = Height::from_consensus(n)?; - let tweaks = match with_cutthrough { - true => client.tweaks(blkheight, dust_limit).await?, - false => client.tweak_index(blkheight, dust_limit).await?, - }; - let new_utxo_filter = client.filter_new_utxos(blkheight).await?; - let spent_filter = client.filter_spent(blkheight).await?; - let blkhash = new_utxo_filter.block_hash; - Ok(BlockData { - blkheight, - blkhash, - tweaks, - new_utxo_filter: new_utxo_filter.into(), - spent_filter: spent_filter.into(), - }) - } - }) - .buffered(CONCURRENT_FILTER_REQUESTS); + for block_height in range { + let block_height = match Height::from_consensus(block_height) { + Ok(r) => r, + Err(e) => { + sender.send(Err(e.into())).unwrap(); + // NOTE: as we return here, the pool will be dropped + return; + } + }; + let client = client.clone(); + let sender = sender.clone(); + pool.execute(move || { + get_block_data(client, sender, block_height, dust_limit, with_cutthrough); + }); + } + }); - Box::pin(res) + receiver } - async fn spent_index(&self, block_height: Height) -> Result { - self.client.spent_index(block_height).await.map(Into::into) + fn spent_index(&self, block_height: Height) -> Result { + self.client.spent_index(block_height).map(Into::into) } - async fn utxos(&self, block_height: Height) -> Result> { + fn utxos(&self, block_height: Height) -> Result> { Ok(self .client - .utxos(block_height) - .await? + .utxos(block_height)? .into_iter() .map(Into::into) .collect()) } - async fn block_height(&self) -> Result { - self.client.block_height().await + fn block_height(&self) -> Result { + self.client.block_height() } } diff --git a/src/backend/blindbit/client/client.rs b/src/backend/blindbit/client/client.rs index 8b1e730..f3df197 100644 --- a/src/backend/blindbit/client/client.rs +++ b/src/backend/blindbit/client/client.rs @@ -1,7 +1,5 @@ -use std::time::Duration; - use bitcoin::{absolute::Height, secp256k1::PublicKey, Amount, Txid}; -use reqwest::{Client, Url}; +use url::Url; use anyhow::Result; @@ -13,116 +11,95 @@ use super::structs::{ #[derive(Clone, Debug)] pub struct BlindbitClient { - client: Client, host_url: Url, } impl BlindbitClient { pub fn new(host_url: String) -> Result { let mut host_url = Url::parse(&host_url)?; - let client = reqwest::Client::new(); // we need a trailing slash, if not present we append it if !host_url.path().ends_with('/') { host_url.set_path(&format!("{}/", host_url.path())); } - Ok(BlindbitClient { client, host_url }) + Ok(BlindbitClient { host_url }) } - pub async fn block_height(&self) -> Result { + pub fn block_height(&self) -> Result { let url = self.host_url.join("block-height")?; - let res = self - .client - .get(url) - .timeout(Duration::from_secs(5)) - .send() - .await?; - let blkheight: BlockHeightResponse = serde_json::from_str(&res.text().await?)?; + let res = minreq::get(url).with_timeout(5).send()?; + let blkheight: BlockHeightResponse = res.json()?; Ok(blkheight.block_height) } - pub async fn tweaks(&self, block_height: Height, dust_limit: Amount) -> Result> { - let url = self.host_url.join(&format!("tweaks/{}", block_height))?; + pub fn tweaks(&self, block_height: Height, dust_limit: Amount) -> Result> { + let mut url = self.host_url.join(&format!("tweaks/{}", block_height))?; + + url.set_query(Some(&format!("dustLimit={}", dust_limit.to_sat()))); - let res = self - .client - .get(url) - .query(&[("dustLimit", format!("{}", dust_limit.to_sat()))]) - .send() - .await?; - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::get(url).send()?; + Ok(res.json()?) } - pub async fn tweak_index( - &self, - block_height: Height, - dust_limit: Amount, - ) -> Result> { - let url = self + pub fn tweak_index(&self, block_height: Height, dust_limit: Amount) -> Result> { + let mut url = self .host_url .join(&format!("tweak-index/{}", block_height))?; + url.set_query(Some(&format!("dustLimit={}", dust_limit.to_sat()))); - let res = self - .client - .get(url) - .query(&[("dustLimit", format!("{}", dust_limit.to_sat()))]) - .send() - .await?; - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::get(url).send()?; + Ok(res.json()?) } - pub async fn utxos(&self, block_height: Height) -> Result> { + pub fn utxos(&self, block_height: Height) -> Result> { let url = self.host_url.join(&format!("utxos/{}", block_height))?; - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::get(url).send()?; + Ok(res.json()?) } - pub async fn spent_index(&self, block_height: Height) -> Result { + pub fn spent_index(&self, block_height: Height) -> Result { let url = self .host_url .join(&format!("spent-index/{}", block_height))?; - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::get(url).send()?; + Ok(res.json()?) } - pub async fn filter_new_utxos(&self, block_height: Height) -> Result { + pub fn filter_new_utxos(&self, block_height: Height) -> Result { let url = self .host_url .join(&format!("filter/new-utxos/{}", block_height))?; - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::get(url).send()?; + Ok(res.json()?) } - pub async fn filter_spent(&self, block_height: Height) -> Result { + pub fn filter_spent(&self, block_height: Height) -> Result { let url = self .host_url .join(&format!("filter/spent/{}", block_height))?; - - let res = self.client.get(url).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::get(url).send()?; + Ok(res.json()?) } - pub async fn forward_tx(&self, tx_hex: String) -> Result { + pub fn forward_tx(&self, tx_hex: String) -> Result { let url = self.host_url.join("forward-tx")?; let body = ForwardTxRequest::new(tx_hex); - let res = self.client.post(url).json(&body).send().await?; - - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::post(url.as_str()) + .with_body(serde_json::to_string(&body)?) + .with_header("Content-Type", "application/json") + .send()?; + Ok(res.json()?) } - pub async fn info(&self) -> Result { + pub fn info(&self) -> Result { let url = self.host_url.join("info")?; - let res = self.client.get(url).send().await?; - Ok(serde_json::from_str(&res.text().await?)?) + let res = minreq::get(url).send()?; + Ok(res.json()?) } } diff --git a/src/client/client.rs b/src/client/client.rs index 0a4c1d0..7952d7d 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -1,3 +1,4 @@ +use std::sync::mpsc; use std::{collections::HashMap, io::Write, str::FromStr}; use bitcoin::hashes::Hash; @@ -19,6 +20,7 @@ use silentpayments::{ use anyhow::{Error, Result}; use crate::constants::NUMS; +use crate::utils::ThreadPool; use super::SpendKey; @@ -108,25 +110,37 @@ impl SpClient { &self, tweak_data_vec: Vec, ) -> Result> { - use rayon::prelude::*; - let b_scan = &self.get_scan_key(); - - let shared_secrets: Vec = tweak_data_vec - .into_par_iter() - .map(|tweak| sp_utils::receiving::calculate_ecdh_shared_secret(&tweak, b_scan)) - .collect(); - - let items: Result> = shared_secrets - .into_par_iter() - .map(|secret| { - let spks = self.sp_receiver.get_spks_from_shared_secret(&secret)?; + let b_scan = self.get_scan_key(); + + let pool = ThreadPool::new(20); + // TODO: maybe create a receiver pool to avoid cloning too much + + fn process_spks_maps( + tweak: PublicKey, + b_scan: SecretKey, + sender: mpsc::Sender<(PublicKey, Vec<[u8; 34]>)>, + sp_receiver: Receiver, + ) { + let secret = sp_utils::receiving::calculate_ecdh_shared_secret(&tweak, &b_scan); + let values = sp_receiver + .get_spks_from_shared_secret(&secret) + .unwrap() + .into_values() + .collect(); + sender.send((secret, values)).unwrap() + } - Ok((secret, spks.into_values())) - }) - .collect(); + let len = tweak_data_vec.len(); + let (sender, receiver) = mpsc::channel(); + for tweak in tweak_data_vec { + let sender = sender.clone(); + let sp_receiver = self.sp_receiver.clone(); + pool.execute(move || process_spks_maps(tweak, b_scan, sender, sp_receiver)); + } let mut res = HashMap::new(); - for (secret, spks) in items? { + for _ in 0..len { + let (secret, spks) = receiver.recv().unwrap(); for spk in spks { res.insert(spk, secret); } diff --git a/src/client/spend.rs b/src/client/spend.rs index bb14afe..240b5f9 100644 --- a/src/client/spend.rs +++ b/src/client/spend.rs @@ -41,7 +41,7 @@ impl SpClient { .iter() .any(|(_, o)| o.spend_status != OutputSpendStatus::Unspent) { - return Err(Error::msg(format!("All outputs must be unspent"))); + return Err(Error::msg("All outputs must be unspent".to_string())); } // used to estimate the size of a taproot output @@ -98,7 +98,7 @@ impl SpClient { ))) } else { let mut op_return = PushBytesBuf::with_capacity(data_len); - op_return.extend_from_slice(&data)?; + op_return.extend_from_slice(data)?; let script_pubkey = ScriptBuf::new_op_return(op_return); Ok(TxOut { @@ -145,8 +145,7 @@ impl SpClient { let change = coin_selector.drain(target, change_policy); let change_value = if change.is_some() { change.value } else { 0 }; if change_value > 0 { - let change_address = - SilentPaymentAddress::try_from(self.sp_receiver.get_change_address())?; + let change_address = self.sp_receiver.get_change_address(); recipients.push(Recipient { address: RecipientAddress::SpAddress(change_address), amount: Amount::from_sat(change_value), @@ -177,7 +176,7 @@ impl SpClient { .iter() .any(|(_, o)| o.spend_status != OutputSpendStatus::Unspent) { - return Err(Error::msg(format!("All outputs must be unspent"))); + return Err(Error::msg("All outputs must be unspent".to_string())); } // used to estimate the size of a taproot output @@ -345,7 +344,7 @@ impl SpClient { ))); } let mut op_return = PushBytesBuf::with_capacity(data_len); - op_return.extend_from_slice(&data)?; + op_return.extend_from_slice(data)?; let script = ScriptBuf::new_op_return(op_return); Ok(TxOut { value: recipient.amount, diff --git a/src/client/structs.rs b/src/client/structs.rs index dccc587..f69741d 100644 --- a/src/client/structs.rs +++ b/src/client/structs.rs @@ -44,7 +44,7 @@ impl TryFrom for RecipientAddress { type Error = anyhow::Error; fn try_from(value: String) -> Result { if let Ok(sp_address) = SilentPaymentAddress::try_from(value.as_str()) { - Ok(Self::SpAddress(sp_address.into())) + Ok(Self::SpAddress(sp_address)) } else if let Ok(legacy_address) = Address::from_str(&value) { Ok(Self::LegacyAddress(legacy_address)) } else if let Ok(data) = Vec::from_hex(&value) { @@ -109,6 +109,7 @@ impl From<&SpendKey> for PublicKey { } } +#[allow(clippy::unconditional_recursion)] impl From for PublicKey { fn from(value: SpendKey) -> Self { value.into() diff --git a/src/lib.rs b/src/lib.rs index c055981..26d6428 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,10 @@ +#![allow(clippy::module_inception)] mod backend; mod client; pub mod constants; mod scanner; mod updater; +pub mod utils; pub use bdk_coin_select::FeeRate; pub use bitcoin; diff --git a/src/scanner/scanner.rs b/src/scanner/scanner.rs index 5ecc765..4775d3f 100644 --- a/src/scanner/scanner.rs +++ b/src/scanner/scanner.rs @@ -1,6 +1,6 @@ use std::{ collections::{HashMap, HashSet}, - sync::atomic::AtomicBool, + sync::{atomic::AtomicBool, mpsc}, time::{Duration, Instant}, }; @@ -12,7 +12,6 @@ use bitcoin::{ secp256k1::{PublicKey, Scalar}, Amount, BlockHash, OutPoint, Txid, XOnlyPublicKey, }; -use futures::{pin_mut, Stream, StreamExt}; use log::info; use silentpayments::receiving::Label; @@ -47,7 +46,7 @@ impl<'a> SpScanner<'a> { } } - pub async fn scan_blocks( + pub fn scan_blocks( &mut self, start: Height, end: Height, @@ -68,7 +67,7 @@ impl<'a> SpScanner<'a> { .get_block_data_for_range(range, dust_limit, with_cutthrough); // process blocks using block data stream - self.process_blocks(start, end, block_data_stream).await?; + self.process_blocks(start, end, block_data_stream)?; // time elapsed for the scan info!( @@ -79,17 +78,15 @@ impl<'a> SpScanner<'a> { Ok(()) } - async fn process_blocks( + fn process_blocks( &mut self, start: Height, end: Height, - block_data_stream: impl Stream>, + block_data_stream: mpsc::Receiver>, ) -> Result<()> { - pin_mut!(block_data_stream); - let mut update_time: Instant = Instant::now(); - while let Some(blockdata) = block_data_stream.next().await { + while let Ok(blockdata) = block_data_stream.recv() { let blockdata = blockdata?; let blkheight = blockdata.blkheight; let blkhash = blockdata.blkhash; @@ -107,7 +104,7 @@ impl<'a> SpScanner<'a> { save_to_storage = true; } - let (found_outputs, found_inputs) = self.process_block(blockdata).await?; + let (found_outputs, found_inputs) = self.process_block(blockdata)?; if !found_outputs.is_empty() { save_to_storage = true; @@ -133,7 +130,7 @@ impl<'a> SpScanner<'a> { Ok(()) } - async fn process_block( + fn process_block( &mut self, blockdata: BlockData, ) -> Result<(HashMap, HashSet)> { @@ -145,14 +142,12 @@ impl<'a> SpScanner<'a> { .. } = blockdata; - let outs = self - .process_block_outputs(blkheight, tweaks, new_utxo_filter) - .await?; + let outs = self.process_block_outputs(blkheight, tweaks, new_utxo_filter)?; // after processing outputs, we add the found outputs to our list self.owned_outpoints.extend(outs.keys()); - let ins = self.process_block_inputs(blkheight, spent_filter).await?; + let ins = self.process_block_inputs(blkheight, spent_filter)?; // after processing inputs, we remove the found inputs self.owned_outpoints.retain(|item| !ins.contains(item)); @@ -160,7 +155,7 @@ impl<'a> SpScanner<'a> { Ok((outs, ins)) } - async fn process_block_outputs( + fn process_block_outputs( &self, blkheight: Height, tweaks: Vec, @@ -183,7 +178,7 @@ impl<'a> SpScanner<'a> { //if match: fetch and scan utxos if matched_outputs { info!("matched outputs on: {}", blkheight); - let found = self.scan_utxos(blkheight, secrets_map).await?; + let found = self.scan_utxos(blkheight, secrets_map)?; if !found.is_empty() { for (label, utxo, tweak) in found { @@ -209,7 +204,7 @@ impl<'a> SpScanner<'a> { Ok(res) } - async fn process_block_inputs( + fn process_block_inputs( &self, blkheight: Height, spent_filter: FilterData, @@ -232,7 +227,7 @@ impl<'a> SpScanner<'a> { // if match: download spent data, collect the outpoints that are spent if matched_inputs { info!("matched inputs on: {}", blkheight); - let spent = self.backend.spent_index(blkheight).await?.data; + let spent = self.backend.spent_index(blkheight)?.data; for spent in spent { let hex: &[u8] = spent.as_ref(); @@ -245,12 +240,12 @@ impl<'a> SpScanner<'a> { Ok(res) } - async fn scan_utxos( + fn scan_utxos( &self, blkheight: Height, secrets_map: HashMap<[u8; 34], PublicKey>, ) -> Result, UtxoData, Scalar)>> { - let utxos = self.backend.utxos(blkheight).await?; + let utxos = self.backend.utxos(blkheight)?; let mut res: Vec<(Option