Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ anyhow = "1.0"
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.107"
bitcoin = { version = "0.31.1", features = ["serde", "rand", "base64"] }
rayon = "1.10.0"
futures = "0.3"
log = "0.4"
async-trait = "0.1"
reqwest = { version = "0.12.4", features = ["rustls-tls", "gzip", "json"], default-features = false, optional = true }
hex = { version = "0.4.3", features = ["serde"], optional = true }
bdk_coin_select = "0.4.0"
minreq = { version = "2.14.1", features = ["https-bundled-probe", "json-using-serde", "serde_json"], optional = true }
url = "2.5.7"

[features]
blindbit-backend = ["reqwest", "hex"]
default = ["blindbit-backend"]
blindbit-backend = ["minreq", "hex"]
13 changes: 5 additions & 8 deletions src/backend/backend.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
use std::{ops::RangeInclusive, pin::Pin};
use std::{ops::RangeInclusive, sync::mpsc};

use anyhow::Result;
use async_trait::async_trait;
use bitcoin::{absolute::Height, Amount};
use futures::Stream;

use super::structs::{BlockData, SpentIndexData, UtxoData};

#[async_trait]
pub trait ChainBackend {
fn get_block_data_for_range(
&self,
range: RangeInclusive<u32>,
dust_limit: Amount,
with_cutthrough: bool,
) -> Pin<Box<dyn Stream<Item = Result<BlockData>> + Send>>;
) -> mpsc::Receiver<Result<BlockData>>;

async fn spent_index(&self, block_height: Height) -> Result<SpentIndexData>;
fn spent_index(&self, block_height: Height) -> Result<SpentIndexData>;

async fn utxos(&self, block_height: Height) -> Result<Vec<UtxoData>>;
fn utxos(&self, block_height: Height) -> Result<Vec<UtxoData>>;

async fn block_height(&self) -> Result<Height>;
fn block_height(&self) -> Result<Height>;
}
108 changes: 71 additions & 37 deletions src/backend/blindbit/backend/backend.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{ops::RangeInclusive, pin::Pin, sync::Arc};
use std::{ops::RangeInclusive, sync::mpsc, thread};

use async_trait::async_trait;
use bitcoin::{absolute::Height, Amount};
use futures::{stream, Stream, StreamExt};

use anyhow::Result;

use crate::{backend::blindbit::BlindbitClient, BlockData, ChainBackend, SpentIndexData, UtxoData};
use crate::{
backend::blindbit::BlindbitClient, utils::ThreadPool, BlockData, ChainBackend, SpentIndexData,
UtxoData,
};

const CONCURRENT_FILTER_REQUESTS: usize = 200;

Expand All @@ -23,7 +24,44 @@ impl BlindbitBackend {
}
}

#[async_trait]
macro_rules! request {
($result: expr, $sender: ident) => {
match $result {
Ok(r) => r,
Err(e) => {
$sender.send(Err(e)).unwrap();
return;
}
}
};
}

fn get_block_data(
client: BlindbitClient,
sender: mpsc::Sender<Result<BlockData>>,
block_height: Height,
dust_limit: Amount,
with_cutthrough: bool,
) {
//
let tweaks = match with_cutthrough {
true => request!(client.tweaks(block_height, dust_limit), sender),
false => request!(client.tweak_index(block_height, dust_limit), sender),
};
let new_utxo_filter = request!(client.filter_new_utxos(block_height), sender);
let spent_filter = request!(client.filter_spent(block_height), sender);
let blkhash = new_utxo_filter.block_hash;
sender
.send(Ok(BlockData {
blkheight: block_height,
blkhash,
tweaks,
new_utxo_filter: new_utxo_filter.into(),
spent_filter: spent_filter.into(),
}))
.unwrap()
}

impl ChainBackend for BlindbitBackend {
/// High-level function to get block data for a range of blocks.
/// Block data includes all the information needed to determine if a block is relevant for scanning,
Expand All @@ -34,51 +72,47 @@ impl ChainBackend for BlindbitBackend {
range: RangeInclusive<u32>,
dust_limit: Amount,
with_cutthrough: bool,
) -> Pin<Box<dyn Stream<Item = Result<BlockData>> + Send>> {
let client = Arc::new(self.client.clone());
) -> mpsc::Receiver<Result<BlockData>> {
let client = self.client.clone();
let (sender, receiver) = mpsc::channel::<Result<BlockData>>();

let res = stream::iter(range)
.map(move |n| {
let client = client.clone();
thread::spawn(move || {
let pool = ThreadPool::new(CONCURRENT_FILTER_REQUESTS);

async move {
let blkheight = Height::from_consensus(n)?;
let tweaks = match with_cutthrough {
true => client.tweaks(blkheight, dust_limit).await?,
false => client.tweak_index(blkheight, dust_limit).await?,
};
let new_utxo_filter = client.filter_new_utxos(blkheight).await?;
let spent_filter = client.filter_spent(blkheight).await?;
let blkhash = new_utxo_filter.block_hash;
Ok(BlockData {
blkheight,
blkhash,
tweaks,
new_utxo_filter: new_utxo_filter.into(),
spent_filter: spent_filter.into(),
})
}
})
.buffered(CONCURRENT_FILTER_REQUESTS);
for block_height in range {
let block_height = match Height::from_consensus(block_height) {
Ok(r) => r,
Err(e) => {
sender.send(Err(e.into())).unwrap();
// NOTE: as we return here, the pool will be dropped
return;
}
};
let client = client.clone();
let sender = sender.clone();
pool.execute(move || {
get_block_data(client, sender, block_height, dust_limit, with_cutthrough);
});
}
});

Box::pin(res)
receiver
}

async fn spent_index(&self, block_height: Height) -> Result<SpentIndexData> {
self.client.spent_index(block_height).await.map(Into::into)
fn spent_index(&self, block_height: Height) -> Result<SpentIndexData> {
self.client.spent_index(block_height).map(Into::into)
}

async fn utxos(&self, block_height: Height) -> Result<Vec<UtxoData>> {
fn utxos(&self, block_height: Height) -> Result<Vec<UtxoData>> {
Ok(self
.client
.utxos(block_height)
.await?
.utxos(block_height)?
.into_iter()
.map(Into::into)
.collect())
}

async fn block_height(&self) -> Result<Height> {
self.client.block_height().await
fn block_height(&self) -> Result<Height> {
self.client.block_height()
}
}
97 changes: 37 additions & 60 deletions src/backend/blindbit/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use std::time::Duration;

use bitcoin::{absolute::Height, secp256k1::PublicKey, Amount, Txid};
use reqwest::{Client, Url};
use url::Url;

use anyhow::Result;

Expand All @@ -13,116 +11,95 @@ use super::structs::{

#[derive(Clone, Debug)]
pub struct BlindbitClient {
client: Client,
host_url: Url,
}

impl BlindbitClient {
pub fn new(host_url: String) -> Result<Self> {
let mut host_url = Url::parse(&host_url)?;
let client = reqwest::Client::new();

// we need a trailing slash, if not present we append it
if !host_url.path().ends_with('/') {
host_url.set_path(&format!("{}/", host_url.path()));
}

Ok(BlindbitClient { client, host_url })
Ok(BlindbitClient { host_url })
}

pub async fn block_height(&self) -> Result<Height> {
pub fn block_height(&self) -> Result<Height> {
let url = self.host_url.join("block-height")?;

let res = self
.client
.get(url)
.timeout(Duration::from_secs(5))
.send()
.await?;
let blkheight: BlockHeightResponse = serde_json::from_str(&res.text().await?)?;
let res = minreq::get(url).with_timeout(5).send()?;
let blkheight: BlockHeightResponse = res.json()?;
Ok(blkheight.block_height)
}

pub async fn tweaks(&self, block_height: Height, dust_limit: Amount) -> Result<Vec<PublicKey>> {
let url = self.host_url.join(&format!("tweaks/{}", block_height))?;
pub fn tweaks(&self, block_height: Height, dust_limit: Amount) -> Result<Vec<PublicKey>> {
let mut url = self.host_url.join(&format!("tweaks/{}", block_height))?;

url.set_query(Some(&format!("dustLimit={}", dust_limit.to_sat())));

let res = self
.client
.get(url)
.query(&[("dustLimit", format!("{}", dust_limit.to_sat()))])
.send()
.await?;
Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::get(url).send()?;
Ok(res.json()?)
}

pub async fn tweak_index(
&self,
block_height: Height,
dust_limit: Amount,
) -> Result<Vec<PublicKey>> {
let url = self
pub fn tweak_index(&self, block_height: Height, dust_limit: Amount) -> Result<Vec<PublicKey>> {
let mut url = self
.host_url
.join(&format!("tweak-index/{}", block_height))?;
url.set_query(Some(&format!("dustLimit={}", dust_limit.to_sat())));

let res = self
.client
.get(url)
.query(&[("dustLimit", format!("{}", dust_limit.to_sat()))])
.send()
.await?;
Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::get(url).send()?;
Ok(res.json()?)
}

pub async fn utxos(&self, block_height: Height) -> Result<Vec<UtxoResponse>> {
pub fn utxos(&self, block_height: Height) -> Result<Vec<UtxoResponse>> {
let url = self.host_url.join(&format!("utxos/{}", block_height))?;
let res = self.client.get(url).send().await?;

Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::get(url).send()?;
Ok(res.json()?)
}

pub async fn spent_index(&self, block_height: Height) -> Result<SpentIndexResponse> {
pub fn spent_index(&self, block_height: Height) -> Result<SpentIndexResponse> {
let url = self
.host_url
.join(&format!("spent-index/{}", block_height))?;
let res = self.client.get(url).send().await?;

Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::get(url).send()?;
Ok(res.json()?)
}

pub async fn filter_new_utxos(&self, block_height: Height) -> Result<FilterResponse> {
pub fn filter_new_utxos(&self, block_height: Height) -> Result<FilterResponse> {
let url = self
.host_url
.join(&format!("filter/new-utxos/{}", block_height))?;

let res = self.client.get(url).send().await?;

Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::get(url).send()?;
Ok(res.json()?)
}

pub async fn filter_spent(&self, block_height: Height) -> Result<FilterResponse> {
pub fn filter_spent(&self, block_height: Height) -> Result<FilterResponse> {
let url = self
.host_url
.join(&format!("filter/spent/{}", block_height))?;

let res = self.client.get(url).send().await?;

Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::get(url).send()?;
Ok(res.json()?)
}

pub async fn forward_tx(&self, tx_hex: String) -> Result<Txid> {
pub fn forward_tx(&self, tx_hex: String) -> Result<Txid> {
let url = self.host_url.join("forward-tx")?;

let body = ForwardTxRequest::new(tx_hex);

let res = self.client.post(url).json(&body).send().await?;

Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::post(url.as_str())
.with_body(serde_json::to_string(&body)?)
.with_header("Content-Type", "application/json")
.send()?;
Ok(res.json()?)
}

pub async fn info(&self) -> Result<InfoResponse> {
pub fn info(&self) -> Result<InfoResponse> {
let url = self.host_url.join("info")?;

let res = self.client.get(url).send().await?;
Ok(serde_json::from_str(&res.text().await?)?)
let res = minreq::get(url).send()?;
Ok(res.json()?)
}
}
Loading
Loading