- 
                Notifications
    You must be signed in to change notification settings 
- Fork 138
Closed
Description
This is async version ironrdp example code + tauri.
use anyhow::{Context, Result};
use base64::{engine::general_purpose, Engine as _};
use ironrdp::connector::ConnectionResult;
use ironrdp::connector::{self, Credentials};
use ironrdp::pdu::gcc::KeyboardType;
use ironrdp::pdu::rdp::{capability_sets::MajorPlatformType, client_info::PerformanceFlags};
use ironrdp::session::image::DecodedImage;
use ironrdp::session::{ActiveStage, ActiveStageOutput};
use std::sync::Arc;
use tauri::{AppHandle, Emitter, State};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex};
pub mod network_client;
#[derive(Clone)]
struct RdpState {
    tx: Arc<Mutex<Option<mpsc::Sender<RdpCommand>>>>,
}
enum RdpCommand {
    MouseEvent {
        x: u16,
        y: u16,
        button: Option<String>,
        is_pressed: bool,
    },
}
#[tauri::command]
async fn start_rdp_stream(
    app_handle: AppHandle,
    state: State<'_, RdpState>,
    hostname: String,
    port: u16,
    username: String,
    password: String,
    domain: String,
) -> Result<(), String> {
    let state = Arc::new(state.inner().clone());
    let app_handle = app_handle.clone();
    tokio::spawn(async move {
        if let Err(e) = run_rdp_session(
            app_handle, state, hostname, port, username, password, domain,
        )
        .await
        {
            eprintln!("RDP session error: {}", e);
        }
    });
    Ok(())
}
async fn run_rdp_session(
    app_handle: AppHandle,
    state: Arc<RdpState>,
    hostname: String,
    port: u16,
    username: String,
    password: String,
    domain: String,
) -> Result<(), String> {
    let config = build_config(username, password, Some(domain));
    let (connection_result, mut framed) = connect(config, hostname, port)
        .await
        .context("connect")
        .map_err(|e| e.to_string())?;
    let mut image = DecodedImage::new(
        ironrdp_graphics::image_processing::PixelFormat::RgbA32,
        connection_result.desktop_size.width,
        connection_result.desktop_size.height,
    );
    let (tx, mut rx) = mpsc::channel(100);
    {
        let mut state_tx = state.tx.lock().await;
        *state_tx = Some(tx);
    }
    let mut active_stage = ActiveStage::new(connection_result);
    loop {
        tokio::select! {
            frame_result = framed.read_pdu() => {
                match frame_result {
                    Ok((action, payload)) => {
                        match active_stage.process(&mut image, action, &payload) {
                            Ok(outputs) => {
                                for out in outputs {
                                    handle_active_stage_output(&mut framed, &app_handle, &mut image, out).await?;
                                }
                            }
                            Err(e) => {
                                eprintln!("Error processing RDP event: {:?}", e);
                            }
                        }
                    }
                    Err(e) => {
                        if e.kind() == std::io::ErrorKind::WouldBlock {
                            continue;
                        } else {
                            if let Err(emit_err) = app_handle.emit("rdp-error", format!("RDP read error: {:?}", e)) {
                                eprintln!("Failed to emit RDP error: {}", emit_err);
                            }
                            break;
                        }
                    }
                }
            }
            Some(command) = rx.recv() => {
                match command {
                    RdpCommand::MouseEvent { x, y, button, is_pressed } => {
                        println!("Mouse event: x={}, y={}, button={:?}, is_pressed={}", x, y, button, is_pressed);
                        // Handle mouse event
                    }
                }
            }
        }
    }
    Ok(())
}
async fn handle_active_stage_output(
    framed: &mut UpgradedFramed,
    app_handle: &AppHandle,
    image: &mut DecodedImage,
    output: ActiveStageOutput,
) -> Result<(), String> {
    match output {
        ActiveStageOutput::GraphicsUpdate(_region) => {
            let mut bmp = bmp::Image::new(u32::from(image.width()), u32::from(image.height()));
            image
                .data()
                .chunks_exact(
                    usize::from(image.width())
                        .checked_mul(4)
                        .expect("never overflow"),
                )
                .enumerate()
                .for_each(|(y, row)| {
                    row.chunks_exact(4).enumerate().for_each(|(x, pixel)| {
                        let r = pixel[0];
                        let g = pixel[1];
                        let b = pixel[2];
                        let _a = pixel[3];
                        bmp.set_pixel(
                            u32::try_from(x).unwrap(),
                            u32::try_from(y).unwrap(),
                            bmp::Pixel::new(r, g, b),
                        );
                    })
                });
            let mut bmp_data = Vec::new();
            bmp.to_writer(&mut bmp_data)
                .expect("Failed to write BMP data");
            let base64_image = general_purpose::STANDARD.encode(&bmp_data);
            if let Err(e) = app_handle.emit("rdp-frame", &base64_image) {
                eprintln!("Failed to send frame: {}", e);
            }
        }
        ActiveStageOutput::ResponseFrame(frame) => {
            if let Err(e) = framed.write_all(&frame).await {
                eprintln!("Failed to write response frame: {}", e);
            }
        }
        ActiveStageOutput::PointerDefault => {
            // default pointer
        }
        ActiveStageOutput::PointerHidden => {
            // hidden pointer
        }
        ActiveStageOutput::Terminate(_) => return Ok(()),
        _ => {}
    }
    Ok(())
}
#[tauri::command]
async fn send_mouse_event(
    state: State<'_, RdpState>,
    x: u16,
    y: u16,
    button: Option<String>,
    is_pressed: bool,
) -> Result<(), String> {
    let tx = state.tx.lock().await;
    if let Some(tx) = tx.as_ref() {
        tx.send(RdpCommand::MouseEvent {
            x,
            y,
            button,
            is_pressed,
        })
        .await
        .map_err(|e| format!("Failed to send mouse event: {}", e))
    } else {
        Err("RDP session not started".to_string())
    }
}
type UpgradedFramed = ironrdp_tokio::TokioFramed<tokio_rustls::client::TlsStream<TcpStream>>;
async fn connect(
    config: connector::Config,
    server_name: String,
    port: u16,
) -> anyhow::Result<(ConnectionResult, UpgradedFramed)> {
    let server_addr = lookup_addr(&server_name, port).context("lookup addr")?;
    let tcp_stream = TcpStream::connect(server_addr)
        .await
        .context("TCP connect")?;
    tcp_stream.set_nodelay(true)?;
    let mut framed = ironrdp_tokio::TokioFramed::new(tcp_stream);
    let mut connector = connector::ClientConnector::new(config).with_server_addr(server_addr);
    let should_upgrade = ironrdp_tokio::connect_begin(&mut framed, &mut connector)
        .await
        .context("begin connection")?;
    // Ensure there is no leftover
    let initial_stream = framed.into_inner_no_leftover();
    let (upgraded_stream, server_public_key) =
        ironrdp_tls::upgrade(initial_stream, &server_addr.to_string())
            .await
            .map_err(|e| connector::custom_err!("TLS upgrade", e))?;
    let upgraded = ironrdp_tokio::mark_as_upgraded(should_upgrade, &mut connector);
    let mut upgraded_framed = ironrdp_tokio::TokioFramed::new(upgraded_stream);
    let mut network_client = crate::network_client::ReqwestNetworkClient::new();
    let connection_result = ironrdp_tokio::connect_finalize(
        upgraded,
        &mut upgraded_framed,
        connector,
        server_name.into(),
        server_public_key,
        Some(&mut network_client),
        None,
    )
    .await
    .context("finalize connection")?;
    Ok((connection_result, upgraded_framed))
}
fn lookup_addr(hostname: &str, port: u16) -> anyhow::Result<std::net::SocketAddr> {
    use std::net::ToSocketAddrs as _;
    let addr = (hostname, port).to_socket_addrs()?.next().unwrap();
    Ok(addr)
}
fn build_config(username: String, password: String, domain: Option<String>) -> connector::Config {
    connector::Config {
        credentials: Credentials::UsernamePassword { username, password },
        domain,
        enable_tls: true,
        enable_credssp: false,
        keyboard_type: KeyboardType::IbmEnhanced,
        keyboard_subtype: 0,
        keyboard_layout: 0,
        keyboard_functional_keys_count: 12,
        ime_file_name: String::new(),
        dig_product_id: String::new(),
        desktop_size: connector::DesktopSize {
            width: 1280,
            height: 1024,
        },
        bitmap: None,
        client_build: 0,
        client_name: "ironrdp".to_owned(),
        client_dir: "C:\\Windows\\System32\\mstscax.dll".to_owned(),
        #[cfg(windows)]
        platform: MajorPlatformType::WINDOWS,
        #[cfg(target_os = "macos")]
        platform: MajorPlatformType::MACINTOSH,
        #[cfg(target_os = "ios")]
        platform: MajorPlatformType::IOS,
        #[cfg(target_os = "linux")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "android")]
        platform: MajorPlatformType::ANDROID,
        #[cfg(target_os = "freebsd")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "dragonfly")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "openbsd")]
        platform: MajorPlatformType::UNIX,
        #[cfg(target_os = "netbsd")]
        platform: MajorPlatformType::UNIX,
        // Disable custom pointers (there is no user interaction anyway)
        no_server_pointer: true,
        autologon: false,
        pointer_software_rendering: true,
        performance_flags: PerformanceFlags::default(),
        desktop_scale_factor: 0,
    }
}
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() {
    tauri::Builder::default()
        .manage(RdpState {
            tx: Arc::new(Mutex::new(None)),
        })
        .invoke_handler(tauri::generate_handler![start_rdp_stream, send_mouse_event])
        .run(tauri::generate_context!())
        .expect("error while running tauri application");
}And I got this error.
error: future cannot be sent between threads safely
   --> src/lib.rs:43:5
    |
43  | /     tokio::spawn(async move {
44  | |         if let Err(e) = run_rdp_session(
45  | |             app_handle, state, hostname, port, username, password, domain,
46  | |         )
...   |
50  | |         }
51  | |     });
    | |______^ future created by async block is not `Send`
    |
    = help: within `{async block@src/lib.rs:43:18: 51:6}`, the trait `Send` is not implemented for `Rc<DecodedPointer>`, which is required by `{async block@src/lib.rs:43:18: 51:6}: Send`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:80:44
    |
72  |     let mut image = DecodedImage::new(
    |         --------- has type `DecodedImage` which is not `Send`
...
80  |         let mut state_tx = state.tx.lock().await;
    |                                            ^^^^^ await occurs here, with `mut image` maybe used later
note: required by a bound in `tokio::spawn`
   --> /home/hj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.40.0/src/task/spawn.rs:167:21
    |
165 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
166 |     where
167 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`
error[E0277]: `dyn AsyncNetworkClient` cannot be sent between threads safely
   --> src/lib.rs:43:5
    |
43  |        tokio::spawn(async move {
    |   _____^____________-
    |  |_____|
    | ||
44  | ||         if let Err(e) = run_rdp_session(
45  | ||             app_handle, state, hostname, port, username, password, domain,
46  | ||         )
...   ||
50  | ||         }
51  | ||     });
    | ||_____-^ `dyn AsyncNetworkClient` cannot be sent between threads safely
    |  |_____|
    |        within this `{async block@src/lib.rs:43:18: 51:6}`
    |
    = help: within `{async block@src/lib.rs:43:18: 51:6}`, the trait `Send` is not implemented for `dyn AsyncNetworkClient`, which is required by `{async block@src/lib.rs:43:18: 51:6}: Send`
    = note: required because it appears within the type `&mut dyn AsyncNetworkClient`
note: required because it appears within the type `Option<&mut dyn AsyncNetworkClient>`
   --> /home/hj/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/option.rs:574:10
    |
574 | pub enum Option<T> {
    |          ^^^^^^
note: required because it's used within this `async` fn body
   --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:47:1
    |
47  | #[instrument(skip_all)]
    | ^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
   --> src/lib.rs:216:57
    |
216 |   ) -> anyhow::Result<(ConnectionResult, UpgradedFramed)> {
    |  _________________________________________________________^
217 | |     let server_addr = lookup_addr(&server_name, port).context("lookup addr")?;
218 | |
219 | |     let tcp_stream = TcpStream::connect(server_addr)
...   |
259 | |     Ok((connection_result, upgraded_framed))
260 | | }
    | |_^
note: required because it's used within this `async` fn body
   --> src/lib.rs:64:25
    |
64  |   ) -> Result<(), String> {
    |  _________________________^
65  | |     let config = build_config(username, password, Some(domain));
66  | |
67  | |     let (connection_result, mut framed) = connect(config, hostname, port)
...   |
125 | |     Ok(())
126 | | }
    | |_^
note: required because it's used within this `async` block
   --> src/lib.rs:43:18
    |
43  |       tokio::spawn(async move {
    |  __________________^
44  | |         if let Err(e) = run_rdp_session(
45  | |             app_handle, state, hostname, port, username, password, domain,
46  | |         )
...   |
50  | |         }
51  | |     });
    | |_____^
note: required by a bound in `tokio::spawn`
   --> /home/hj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.40.0/src/task/spawn.rs:167:21
    |
165 |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
    |            ----- required by a bound in this function
166 |     where
167 |         F: Future + Send + 'static,
    |                     ^^^^ required by this bound in `spawn`
    = note: the full name for the type has been written to '/home/hj/multirdp/src-tauri/target/debug/deps/multirdp_lib-4eb0194ff033d779.long-type-150430135109493735.txt'
    = note: consider using `--verbose` to print the full type name to the console
    = note: this error originates in the attribute macro `instrument` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: `dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>` cannot be sent between threads safely
    --> src/lib.rs:43:5
     |
43   | /     tokio::spawn(async move {
44   | |         if let Err(e) = run_rdp_session(
45   | |             app_handle, state, hostname, port, username, password, domain,
46   | |         )
...    |
50   | |         }
51   | |     });
     | |______^ `dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>` cannot be sent between threads safely
     |
     = help: the trait `Send` is not implemented for `dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>`, which is required by `{async block@src/lib.rs:43:18: 51:6}: Send`
     = note: required for `Unique<dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>>` to implement `Send`
note: required because it appears within the type `Box<dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>>`
    --> /home/hj/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/alloc/src/boxed.rs:237:12
     |
237  | pub struct Box<
     |            ^^^
note: required because it appears within the type `Pin<Box<dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>>>`
    --> /home/hj/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/pin.rs:1090:12
     |
1090 | pub struct Pin<Ptr> {
     |            ^^^
note: required because it's used within this `async` fn body
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:91:35
     |
91   |   ) -> ConnectorResult<ClientState> {
     |  ___________________________________^
92   | |     let mut state = generator.start();
93   | |
94   | |     loop {
...    |
105  | |     }
106  | | }
     | |_^
note: required because it's used within this `async` block
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:108:1
     |
108  | #[instrument(level = "trace", skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:108:1
     |
108  | #[instrument(level = "trace", skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` block
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:47:1
     |
47   | #[instrument(skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
    --> /home/hj/.cargo/git/checkouts/ironrdp-cc592b0b50166368/703b245/crates/ironrdp-async/src/connector.rs:47:1
     |
47   | #[instrument(skip_all)]
     | ^^^^^^^^^^^^^^^^^^^^^^^
note: required because it's used within this `async` fn body
    --> src/lib.rs:216:57
     |
216  |   ) -> anyhow::Result<(ConnectionResult, UpgradedFramed)> {
     |  _________________________________________________________^
217  | |     let server_addr = lookup_addr(&server_name, port).context("lookup addr")?;
218  | |
219  | |     let tcp_stream = TcpStream::connect(server_addr)
...    |
259  | |     Ok((connection_result, upgraded_framed))
260  | | }
     | |_^
note: required because it's used within this `async` fn body
    --> src/lib.rs:64:25
     |
64   |   ) -> Result<(), String> {
     |  _________________________^
65   | |     let config = build_config(username, password, Some(domain));
66   | |
67   | |     let (connection_result, mut framed) = connect(config, hostname, port)
...    |
125  | |     Ok(())
126  | | }
     | |_^
note: required because it's used within this `async` block
    --> src/lib.rs:43:18
     |
43   |       tokio::spawn(async move {
     |  __________________^
44   | |         if let Err(e) = run_rdp_session(
45   | |             app_handle, state, hostname, port, username, password, domain,
46   | |         )
...    |
50   | |         }
51   | |     });
     | |_____^
note: required by a bound in `tokio::spawn`
    --> /home/hj/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.40.0/src/task/spawn.rs:167:21
     |
165  |     pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
     |            ----- required by a bound in this function
166  |     where
167  |         F: Future + Send + 'static,
     |                     ^^^^ required by this bound in `spawn`
First error is solved when I change ironrdp library from Rc to Arc. but i don't have any idea about how can I solve second error "the trait Send is not implemented for dyn Future<Output = Result<Vec<u8>, ironrdp_error::Error<ConnectorErrorKind>>>"
Metadata
Metadata
Assignees
Labels
No labels