Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ sysinfo = { version = "0.30", optional = true }
hex = { version = "0.4", optional = true }
mime_guess = { version = "2.0", optional = true }
sentry = "0.34"
reqwest = { version = "0.12" }

[features]
default = ["console", "gateway", "media", "connector", "cert_utils"]
Expand Down
20 changes: 18 additions & 2 deletions bin/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Arc<GS>,
ext_auth_uri: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let token_service: OpenApiService<_, ()> = OpenApiService::new(api_token::TokenApis::<GS>::new(), "App APIs", env!("CARGO_PKG_VERSION")).server("/token/");
let token_ui = token_service.swagger_ui();
Expand All @@ -132,7 +133,14 @@ pub async fn run_gateway_http_server<ES: 'static + MediaEdgeSecure + Send + Sync
.nest("/token/", token_service.data(api_token::TokenServerCtx { secure: gateway_secure }))
.nest("/token/ui", token_ui)
.at("/token/spec", poem::endpoint::make_sync(move |_| token_spec.clone()))
.nest("/", media_service.data(api_media::MediaServerCtx { sender, secure: edge_secure }))
.nest(
"/",
media_service.data(api_media::MediaServerCtx {
sender,
secure: edge_secure,
ext_auth_uri,
}),
)
.nest("/ui", media_ui)
.at("/spec", poem::endpoint::make_sync(move |_| media_spec.clone()))
.with(Cors::new());
Expand All @@ -147,6 +155,7 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,
sender: Sender<crate::rpc::Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
edge_secure: Arc<ES>,
gateway_secure: Option<Arc<GS>>,
ext_auth_uri: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut route = Route::new();

Expand All @@ -170,7 +179,14 @@ pub async fn run_media_http_server<ES: 'static + MediaEdgeSecure + Send + Sync,

let route = route
.nest("/samples", samples)
.nest("/", media_service.data(api_media::MediaServerCtx { sender, secure: edge_secure }))
.nest(
"/",
media_service.data(api_media::MediaServerCtx {
sender,
secure: edge_secure,
ext_auth_uri,
}),
)
.nest("/ui", media_ui)
.at("/spec", poem::endpoint::make_sync(move |_| media_spec.clone()))
.with(Cors::new());
Expand Down
77 changes: 72 additions & 5 deletions bin/src/http/api_media.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::Arc, time::Duration};

use media_server_protocol::{
cluster::gen_cluster_session_id,
Expand Down Expand Up @@ -28,13 +28,15 @@ use super::utils::{ApplicationSdp, ApplicationSdpPatch, CustomHttpResponse, Prot
pub struct MediaServerCtx<S: MediaEdgeSecure + Send + Sync> {
pub(crate) sender: tokio::sync::mpsc::Sender<Rpc<RpcReq<ClusterConnId>, RpcRes<ClusterConnId>>>,
pub(crate) secure: Arc<S>,
pub(crate) ext_auth_uri: Option<String>,
}

impl<S: MediaEdgeSecure + Send + Sync> Clone for MediaServerCtx<S> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
secure: self.secure.clone(),
ext_auth_uri: self.ext_auth_uri.clone(),
}
}
}
Expand All @@ -60,7 +62,22 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = ctx.secure.decode_obj::<WhipToken>("whip", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WhipToken>("whip", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][whip/create] trying to authenticate peer {} for room {} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client.get(format!("{}/whip/create", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
log::info!("[MediaAPIs] create whip endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whip(whip::RpcReq::Connect(WhipConnectReq {
session_id,
Expand Down Expand Up @@ -158,7 +175,22 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
body: ApplicationSdp<String>,
) -> Result<CustomHttpResponse<ApplicationSdp<String>>> {
let session_id = gen_cluster_session_id();
let token = ctx.secure.decode_obj::<WhepToken>("whep", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WhepToken>("whep", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][whep/create] trying to authenticate peer {:?} for room {} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client.get(format!("{}/whep/create", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
log::info!("[MediaAPIs] create whep endpoint with token {:?}, ip {}, user_agent {}", token, ip_addr, user_agent);
let (req, rx) = Rpc::new(RpcReq::Whep(whep::RpcReq::Connect(WhepConnectReq {
session_id,
Expand Down Expand Up @@ -255,7 +287,22 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let session_id = gen_cluster_session_id();
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][webrtc/connect] trying to authenticate peer {:?} for room {:?} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client.get(format!("{}/webrtc/connect", auth_uri)).timeout(Duration::from_secs(5)).bearer_auth(token_str).send().await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
log::info!("[MediaAPIs] create webrtc with token {:?}, ip {}, user_agent {}, request {:?}", token, ip_addr, user_agent, connect);
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
Expand Down Expand Up @@ -324,7 +371,27 @@ impl<S: 'static + MediaEdgeSecure + Send + Sync> MediaApis<S> {
connect: Protobuf<ConnectRequest>,
) -> Result<HttpResponse<Protobuf<ConnectResponse>>> {
let conn_id2 = conn_id.0.parse().map_err(|_e| poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", &token.token).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
let token_str = &token.token;
let token = ctx.secure.decode_obj::<WebrtcToken>("webrtc", token_str).ok_or(poem::Error::from_status(StatusCode::BAD_REQUEST))?;
if let Some(auth_uri) = &ctx.ext_auth_uri {
log::info!(
"[MediaAPIs][webrtc/restart-ice] trying to authenticate peer {:?} for room {:?} using external URI provided: {}",
token.peer,
token.room,
auth_uri
);
let client = reqwest::Client::new();
let res = client
.get(format!("{}/webrtc/restart-ice", auth_uri))
.timeout(Duration::from_secs(5))
.bearer_auth(token_str)
.send()
.await;
if res.is_err() || res.is_ok_and(|r| !r.status().is_success()) {
log::info!("[MediaAPIs] failed to authenticate user");
return Err(poem::Error::from_status(StatusCode::UNAUTHORIZED));
}
}
if let Some(join) = &connect.join {
if token.room != Some(join.room.clone()) {
return Err(poem::Error::from_string("Wrong room".to_string(), StatusCode::FORBIDDEN));
Expand Down
7 changes: 6 additions & 1 deletion bin/src/server/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ pub struct Args {
/// The port for binding the RTPengine command UDP socket.
#[arg(env, long)]
rtpengine_cmd_addr: Option<SocketAddr>,

/// External HTTP endpoint URI for third-party authentication and authorization.
/// The URI should be in the format of `http(s)://<host>:<port>` or `http(s)://example.com`, without the backslash at the end.
#[arg(env, long)]
ext_auth_uri: Option<String>,
}

pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
Expand All @@ -96,7 +101,7 @@ pub async fn run_media_gateway(workers: usize, http_port: Option<u16>, node: Nod
let req_tx = req_tx.clone();
let secure2 = edge_secure.clone();
tokio::spawn(async move {
if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure).await {
if let Err(e) = run_gateway_http_server(http_port, req_tx, secure2, gateway_secure, args.ext_auth_uri).await {
log::error!("HTTP Error: {}", e);
}
});
Expand Down
7 changes: 6 additions & 1 deletion bin/src/server/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ pub struct Args {
/// Number of workers for uploading recordings.
#[arg(env, long, default_value_t = 5)]
record_upload_worker: usize,

/// External HTTP endpoint URI for third-party authentication and authorization.
/// The URI should be in the format of `http(s)://<host>:<port>` or `http(s)://example.com`, without the backslash at the end.
#[arg(env, long)]
ext_auth_uri: Option<String>,
}

pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: NodeConfig, args: Args) {
Expand All @@ -96,7 +101,7 @@ pub async fn run_media_server(workers: usize, http_port: Option<u16>, node: Node
let req_tx = req_tx.clone();
let secure = secure.clone();
tokio::spawn(async move {
if let Err(e) = run_media_http_server(http_port, req_tx, secure, secure2).await {
if let Err(e) = run_media_http_server(http_port, req_tx, secure, secure2, args.ext_auth_uri).await {
log::error!("HTTP Error: {}", e);
}
});
Expand Down