Skip to content

Commit 42b3df9

Browse files
Add Socks5 proxy transport for RPC
The `ProxyTransport` implements a socks5 stream connection with the core rpc host, and request/receive http data from it. `Transport` trait is implemented on `ProxyTransport` to make it compatible with existing `bitcoincore-rpc::Client`.
1 parent 64e88f0 commit 42b3df9

File tree

2 files changed

+242
-1
lines changed

2 files changed

+242
-1
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ compact_filters = ["rocksdb", "socks", "lazy_static", "cc"]
5959
key-value-db = ["sled"]
6060
all-keys = ["keys-bip39"]
6161
keys-bip39 = ["bip39"]
62-
rpc = ["bitcoincore-rpc"]
62+
rpc = ["bitcoincore-rpc", "socks"]
6363

6464
# We currently provide mulitple implementations of `Blockchain`, all are
6565
# blocking except for the `EsploraBlockchain` which can be either async or

src/blockchain/rpc_proxy.rs

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Bitcoin Dev Kit
2+
// Written in 2021 by Rajarshi Maitra <rajarshi149@gmail.com>
3+
//
4+
// Copyright (c) 2020-2021 Bitcoin Dev Kit Developers
5+
//
6+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
7+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
8+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
9+
// You may not use this file except in accordance with one or both of these
10+
// licenses.
11+
12+
//! A SOCKS5 proxy transport implementation for RPC blockchain.
13+
//!
14+
//! This is currently internal to the lib and only compatible with
15+
//! Bitcoin Core RPC.
16+
17+
use super::rpc::Auth;
18+
use bitcoin::base64;
19+
use bitcoincore_rpc::jsonrpc::Error as JSONRPC_Error;
20+
use bitcoincore_rpc::jsonrpc::{Request, Response, Transport};
21+
use bitcoincore_rpc::Error as RPC_Error;
22+
use socks::Socks5Stream;
23+
use std::fs::File;
24+
use std::io::{BufRead, BufReader, Write};
25+
use std::time::{Duration, Instant};
26+
27+
/// Errors that can be thrown by [`ProxyTransport`](crate::blockchain::rpc_proxy::ProxyTransport)
28+
#[derive(Debug)]
29+
pub enum RpcProxyError {
30+
/// Bitcoin core rpc error
31+
CoreRpc(bitcoincore_rpc::Error),
32+
/// IO error
33+
Io(std::io::Error),
34+
/// Invalid RPC url
35+
InvalidUrl,
36+
/// Error serializing or deserializing JSON data
37+
Json(serde_json::Error),
38+
/// RPC timeout error
39+
RpcTimeout,
40+
/// Http Parsing Error
41+
HttpParsing,
42+
/// Http Timeout Error
43+
HttpTimeout,
44+
/// Http Response Code
45+
HttpResponseCode(u16),
46+
}
47+
48+
impl std::fmt::Display for RpcProxyError {
49+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50+
write!(f, "{:?}", self)
51+
}
52+
}
53+
54+
impl std::error::Error for RpcProxyError {}
55+
56+
impl_error!(bitcoincore_rpc::Error, CoreRpc, RpcProxyError);
57+
impl_error!(std::io::Error, Io, RpcProxyError);
58+
impl_error!(serde_json::Error, Json, RpcProxyError);
59+
60+
// We need to backport RpcProxyError to satisfy Transport trait bound
61+
impl From<RpcProxyError> for JSONRPC_Error {
62+
fn from(e: RpcProxyError) -> Self {
63+
Self::Transport(Box::new(e))
64+
}
65+
}
66+
67+
/// SOCKS5 proxy transport
68+
/// This is currently designed to work only with Bitcoin Core RPC
69+
pub(crate) struct ProxyTransport {
70+
proxy_addr: String,
71+
target_addr: String,
72+
proxy_credential: Option<(String, String)>,
73+
wallet_path: String,
74+
rpc_auth: Option<String>,
75+
timeout: Duration,
76+
}
77+
78+
impl ProxyTransport {
79+
/// Create a new ProxyTransport
80+
pub(crate) fn new(
81+
proxy_addr: &str,
82+
rpc_url: &str,
83+
proxy_credential: Option<(String, String)>,
84+
rpc_auth: &Auth,
85+
) -> Result<Self, RpcProxyError> {
86+
// Fetch the RPC address:port and wallet path from url
87+
let (target_addr, wallet_path) = {
88+
// the url will be of form "http://<rpc-host>:<rpc-port>/wallet/<wallet-file-name>"
89+
(rpc_url[7..22].to_owned(), rpc_url[22..].to_owned())
90+
};
91+
92+
// fetch username password from rpc authentication
93+
let rpc_auth = {
94+
if let (Some(user), Some(pass)) = Self::get_user_pass(rpc_auth)? {
95+
let mut auth = user;
96+
auth.push(':');
97+
auth.push_str(&pass[..]);
98+
Some(format!("Basic {}", &base64::encode(auth.as_bytes())))
99+
} else {
100+
None
101+
}
102+
};
103+
104+
Ok(ProxyTransport {
105+
proxy_addr: proxy_addr.to_owned(),
106+
target_addr,
107+
proxy_credential,
108+
wallet_path,
109+
rpc_auth,
110+
timeout: Duration::from_secs(15), // Same as regular RPC default
111+
})
112+
}
113+
114+
// Helper function to parse username:password pair for rpc Auth
115+
fn get_user_pass(auth: &Auth) -> Result<(Option<String>, Option<String>), RpcProxyError> {
116+
use std::io::Read;
117+
match auth {
118+
Auth::None => Ok((None, None)),
119+
Auth::UserPass { username, password } => {
120+
Ok((Some(username.clone()), Some(password.clone())))
121+
}
122+
Auth::Cookie { file } => {
123+
let mut file = File::open(file)?;
124+
let mut contents = String::new();
125+
file.read_to_string(&mut contents)?;
126+
let mut split = contents.splitn(2, ':');
127+
Ok((
128+
Some(split.next().ok_or(RPC_Error::InvalidCookieFile)?.into()),
129+
Some(split.next().ok_or(RPC_Error::InvalidCookieFile)?.into()),
130+
))
131+
}
132+
}
133+
}
134+
135+
// Try to read a line from a buffered reader. If no line can be read till the deadline is reached
136+
// return a timeout error.
137+
fn get_line<R: BufRead>(reader: &mut R, deadline: Instant) -> Result<String, RpcProxyError> {
138+
let mut line = String::new();
139+
while deadline > Instant::now() {
140+
match reader.read_line(&mut line) {
141+
// EOF reached for now, try again later
142+
Ok(0) => std::thread::sleep(Duration::from_millis(5)),
143+
// received useful data, return it
144+
Ok(_) => return Ok(line),
145+
// io error occurred, abort
146+
Err(e) => return Err(e.into()),
147+
}
148+
}
149+
Err(RpcProxyError::RpcTimeout)
150+
}
151+
152+
// Http request and response over SOCKS5
153+
fn request<R>(&self, req: impl serde::Serialize) -> Result<R, RpcProxyError>
154+
where
155+
R: for<'a> serde::de::Deserialize<'a>,
156+
{
157+
let request_deadline = Instant::now() + self.timeout;
158+
159+
// Open connection
160+
let mut socks_stream = if let Some((username, password)) = &self.proxy_credential {
161+
Socks5Stream::connect_with_password(
162+
&self.proxy_addr[..],
163+
&self.target_addr[..],
164+
&username[..],
165+
&password[..],
166+
)?
167+
} else {
168+
Socks5Stream::connect(&self.proxy_addr[..], &self.target_addr[..])?
169+
};
170+
171+
let socks_stream = socks_stream.get_mut();
172+
173+
// Serialize the body first so we can set the Content-Length header.
174+
let body = serde_json::to_vec(&req)?;
175+
176+
// Send HTTP request
177+
socks_stream.write_all(b"POST ")?;
178+
socks_stream.write_all(self.wallet_path.as_bytes())?;
179+
socks_stream.write_all(b" HTTP/1.1\r\n")?;
180+
// Write headers
181+
socks_stream.write_all(b"Content-Type: application/json-rpc\r\n")?;
182+
socks_stream.write_all(b"Content-Length: ")?;
183+
socks_stream.write_all(body.len().to_string().as_bytes())?;
184+
socks_stream.write_all(b"\r\n")?;
185+
if let Some(ref auth) = self.rpc_auth {
186+
socks_stream.write_all(b"Authorization: ")?;
187+
socks_stream.write_all(auth.as_ref())?;
188+
socks_stream.write_all(b"\r\n")?;
189+
}
190+
// Write body
191+
socks_stream.write_all(b"\r\n")?;
192+
socks_stream.write_all(&body)?;
193+
socks_stream.flush()?;
194+
195+
// Receive response
196+
let mut reader = BufReader::new(socks_stream);
197+
198+
// Parse first HTTP response header line
199+
let http_response = Self::get_line(&mut reader, request_deadline)?;
200+
if http_response.len() < 12 || !http_response.starts_with("HTTP/1.1 ") {
201+
return Err(RpcProxyError::HttpParsing);
202+
}
203+
let response_code = match http_response[9..12].parse::<u16>() {
204+
Ok(n) => n,
205+
Err(_) => return Err(RpcProxyError::HttpParsing),
206+
};
207+
208+
// Skip response header fields
209+
while Self::get_line(&mut reader, request_deadline)? != "\r\n" {}
210+
211+
// Even if it's != 200, we parse the response as we may get a JSONRPC error instead
212+
// of the less meaningful HTTP error code.
213+
let resp_body = Self::get_line(&mut reader, request_deadline)?;
214+
match serde_json::from_str(&resp_body) {
215+
Ok(s) => Ok(s),
216+
Err(e) => {
217+
if response_code != 200 {
218+
Err(RpcProxyError::HttpResponseCode(response_code))
219+
} else {
220+
// If it was 200 then probably it was legitimately a parse error
221+
Err(e.into())
222+
}
223+
}
224+
}
225+
}
226+
}
227+
228+
// Make ProxyTransport usable in bitcoin_core::rpc::Client
229+
impl Transport for ProxyTransport {
230+
fn send_request(&self, req: Request) -> Result<Response, JSONRPC_Error> {
231+
Ok(self.request(req)?)
232+
}
233+
234+
fn send_batch(&self, reqs: &[Request]) -> Result<Vec<Response>, JSONRPC_Error> {
235+
Ok(self.request(reqs)?)
236+
}
237+
238+
fn fmt_target(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
239+
write!(f, "{}/{}", self.target_addr, self.wallet_path)
240+
}
241+
}

0 commit comments

Comments
 (0)