Skip to content

Commit df3735a

Browse files
committed
refactor: move postgres specific code to new module
1 parent aae892e commit df3735a

File tree

5 files changed

+567
-123
lines changed

5 files changed

+567
-123
lines changed
File renamed without changes.

src/lib.rs

Lines changed: 19 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -2,37 +2,22 @@
22
33
#![warn(missing_docs)]
44

5+
mod postgres;
6+
7+
#[cfg(test)]
8+
mod test;
9+
510
use std::future::Future;
611
use std::pin::Pin;
7-
use std::str::FromStr;
812
use std::sync::Arc;
913

10-
use bdk_chain::{
11-
local_chain, miniscript, tx_graph, Anchor, ConfirmationBlockTime, DescriptorExt, DescriptorId,
12-
Merge,
13-
};
14-
use bdk_wallet::bitcoin::{
15-
self,
16-
consensus::{self, Decodable},
17-
hashes::Hash,
18-
Amount, BlockHash, Network, OutPoint, ScriptBuf, TxOut, Txid,
19-
};
14+
use bdk_chain::miniscript;
15+
use bdk_wallet::bitcoin;
2016
use bdk_wallet::chain as bdk_chain;
21-
use bdk_wallet::descriptor::{Descriptor, DescriptorPublicKey, ExtendedDescriptor};
22-
use bdk_wallet::KeychainKind::{External, Internal};
23-
use bdk_wallet::{AsyncWalletPersister, ChangeSet, KeychainKind};
24-
use serde_json::json;
25-
use sqlx::migrate::Migrator;
26-
use sqlx::postgres::PgRow;
27-
use sqlx::{
28-
postgres::{PgPool, Postgres},
29-
FromRow, Pool, Row, Transaction,
30-
};
31-
use tokio::sync::Mutex;
32-
use tracing::info;
3317

34-
#[cfg(test)]
35-
mod test;
18+
use sqlx::Database;
19+
use sqlx::Pool;
20+
use tokio::sync::Mutex;
3621

3722
/// Crate error
3823
#[derive(Debug, thiserror::Error)]
@@ -53,76 +38,12 @@ pub enum BdkSqlxError {
5338

5439
/// Manages a pool of database connections.
5540
#[derive(Debug)]
56-
pub struct Store {
57-
pub(crate) pool: Arc<Mutex<Pool<Postgres>>>,
41+
pub struct Store<DB: Database> {
42+
pub(crate) pool: Arc<Mutex<Pool<DB>>>,
5843
wallet_name: String,
5944
migration: bool,
6045
}
6146

62-
impl Store {
63-
/// Construct a new [`Store`] with an existing pg connection.
64-
#[tracing::instrument]
65-
pub async fn new(
66-
pool: Arc<Mutex<Pool<Postgres>>>,
67-
wallet_name: Option<String>,
68-
migration: bool,
69-
) -> Result<Self, BdkSqlxError> {
70-
info!("new store");
71-
72-
let wallet_name = wallet_name.unwrap_or_else(|| "bdk_pg_wallet".to_string());
73-
74-
Ok(Self {
75-
pool,
76-
wallet_name,
77-
migration,
78-
})
79-
}
80-
81-
/// Construct a new [`Store`] without an existing pg connection.
82-
#[tracing::instrument]
83-
pub async fn new_with_url(
84-
url: String,
85-
wallet_name: Option<String>,
86-
) -> Result<Store, BdkSqlxError> {
87-
info!("new store with url");
88-
89-
let pool = PgPool::connect(url.as_str()).await?;
90-
let pool = Arc::new(Mutex::new(pool));
91-
let wallet_name = wallet_name.unwrap_or_else(|| "bdk_pg_wallet".to_string());
92-
93-
Ok(Self {
94-
pool,
95-
wallet_name,
96-
migration: true,
97-
})
98-
}
99-
}
100-
101-
impl AsyncWalletPersister for Store {
102-
type Error = BdkSqlxError;
103-
104-
#[tracing::instrument]
105-
fn initialize<'a>(store: &'a mut Self) -> FutureResult<'a, ChangeSet, Self::Error>
106-
where
107-
Self: 'a,
108-
{
109-
info!("initialize store");
110-
Box::pin(store.migrate_and_read())
111-
}
112-
113-
#[tracing::instrument]
114-
fn persist<'a>(
115-
store: &'a mut Self,
116-
changeset: &'a ChangeSet,
117-
) -> FutureResult<'a, (), Self::Error>
118-
where
119-
Self: 'a,
120-
{
121-
info!("persist store");
122-
Box::pin(store.write(changeset))
123-
}
124-
}
125-
12647
type FutureResult<'a, T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>;
12748

12849
impl Store {
@@ -159,7 +80,7 @@ impl Store {
15980
dbg!(&row);
16081

16182
if let Some(row) = row {
162-
Self::changeset_from_row(&mut tx, &mut changeset, row, &self.wallet_name).await?;
83+
Self::changeset_from_row(&mut tx, &mut changeset, row).await?;
16384
}
16485

16586
Ok(changeset)
@@ -170,7 +91,6 @@ impl Store {
17091
tx: &mut Transaction<'_, Postgres>,
17192
changeset: &mut ChangeSet,
17293
row: PgRow,
173-
wallet_name: &str,
17494
) -> Result<(), BdkSqlxError> {
17595
info!("changeset from row");
17696

@@ -200,8 +120,8 @@ impl Store {
200120
}
201121
}
202122

203-
changeset.tx_graph = tx_graph_changeset_from_postgres(tx, wallet_name).await?;
204-
changeset.local_chain = local_chain_changeset_from_postgres(tx, wallet_name).await?;
123+
changeset.tx_graph = tx_graph_changeset_from_postgres(tx).await?;
124+
changeset.local_chain = local_chain_changeset_from_postgres(tx).await?;
205125
Ok(())
206126
}
207127

@@ -318,14 +238,12 @@ async fn update_last_revealed(
318238
#[tracing::instrument]
319239
pub async fn tx_graph_changeset_from_postgres(
320240
db_tx: &mut Transaction<'_, Postgres>,
321-
wallet_name: &str,
322241
) -> Result<tx_graph::ChangeSet<ConfirmationBlockTime>, BdkSqlxError> {
323242
info!("tx graph changeset from postgres");
324243
let mut changeset = tx_graph::ChangeSet::default();
325244

326245
// Fetch transactions
327-
let rows = sqlx::query("SELECT txid, whole_tx, last_seen FROM tx WHERE wallet_name = $1")
328-
.bind(wallet_name)
246+
let rows = sqlx::query("SELECT txid, whole_tx, last_seen FROM tx")
329247
.fetch_all(&mut **db_tx)
330248
.await?;
331249

@@ -346,8 +264,7 @@ pub async fn tx_graph_changeset_from_postgres(
346264
}
347265

348266
// Fetch txouts
349-
let rows = sqlx::query("SELECT txid, vout, value, script FROM txout WHERE wallet_name = $1")
350-
.bind(wallet_name)
267+
let rows = sqlx::query("SELECT txid, vout, value, script FROM txout")
351268
.fetch_all(&mut **db_tx)
352269
.await?;
353270

@@ -371,8 +288,7 @@ pub async fn tx_graph_changeset_from_postgres(
371288
}
372289

373290
// Fetch anchors
374-
let rows = sqlx::query("SELECT anchor, txid FROM anchor_tx WHERE wallet_name = $1")
375-
.bind(wallet_name)
291+
let rows = sqlx::query("SELECT anchor, txid FROM anchor_tx")
376292
.fetch_all(&mut **db_tx)
377293
.await?;
378294

@@ -454,13 +370,11 @@ pub async fn tx_graph_changeset_persist_to_postgres(
454370
#[tracing::instrument]
455371
pub async fn local_chain_changeset_from_postgres(
456372
db_tx: &mut Transaction<'_, Postgres>,
457-
wallet_name: &str,
458373
) -> Result<local_chain::ChangeSet, BdkSqlxError> {
459374
info!("local chain changeset from postgres");
460375
let mut changeset = local_chain::ChangeSet::default();
461376

462-
let rows = sqlx::query("SELECT hash, height FROM block WHERE wallet_name = $1")
463-
.bind(wallet_name)
377+
let rows = sqlx::query("SELECT hash, height FROM block")
464378
.fetch_all(&mut **db_tx)
465379
.await?;
466380

src/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use bdk_wallet::bitcoin::Network;
99
use bdk_wallet::{KeychainKind, PersistedWallet, Wallet};
1010
use better_panic::Settings;
1111
use rustls::crypto::ring::default_provider;
12+
use sqlx::Postgres;
1213
use tracing_subscriber::layer::SubscriberExt;
1314
use tracing_subscriber::util::SubscriberInitExt;
1415
use tracing_subscriber::EnvFilter;
@@ -59,7 +60,8 @@ async fn main() -> anyhow::Result<()> {
5960
NETWORK,
6061
&secp,
6162
)?;
62-
let mut store = bdk_sqlx::Store::new_with_url(url.clone(), Some(wallet_name)).await?;
63+
let mut store =
64+
bdk_sqlx::Store::<Postgres>::new_with_url(url.clone(), Some(wallet_name)).await?;
6365

6466
let mut wallet = match Wallet::load().load_wallet_async(&mut store).await? {
6567
Some(wallet) => wallet,
@@ -114,7 +116,7 @@ async fn main() -> anyhow::Result<()> {
114116
Ok(())
115117
}
116118

117-
fn electrum(wallet: &mut PersistedWallet<Store>) -> anyhow::Result<()> {
119+
fn electrum(wallet: &mut PersistedWallet<Store<Postgres>>) -> anyhow::Result<()> {
118120
let client = BdkElectrumClient::new(electrum_client::Client::new(ELECTRUM_URL)?);
119121

120122
// Populate the electrum client's transaction cache so it doesn't redownload transaction we

0 commit comments

Comments
 (0)