Skip to content

Commit 42f8350

Browse files
new port rx and tx loops
1 parent cb8aad9 commit 42f8350

File tree

13 files changed

+621
-124
lines changed

13 files changed

+621
-124
lines changed

core/src/ffi/dpdk.rs

Lines changed: 118 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::{debug, error};
2222
use anyhow::Result;
2323
use capsule_ffi as cffi;
2424
use std::fmt;
25-
use std::ops::DerefMut;
25+
use std::mem;
26+
use std::ops::{Deref, DerefMut};
2627
use std::os::raw;
2728
use std::panic::{self, AssertUnwindSafe};
2829
use std::ptr;
@@ -90,6 +91,15 @@ impl From<raw::c_int> for SocketId {
9091
/// A `rte_mempool` pointer.
9192
pub(crate) type MempoolPtr = EasyPtr<cffi::rte_mempool>;
9293

94+
impl Clone for MempoolPtr {
95+
fn clone(&self) -> Self {
96+
self.0.clone().into()
97+
}
98+
}
99+
100+
// Allows the pointer to go across thread/lcore boundaries.
101+
unsafe impl Send for MempoolPtr {}
102+
93103
/// Creates a mbuf pool.
94104
pub(crate) fn pktmbuf_pool_create<S: Into<String>>(
95105
name: S,
@@ -125,9 +135,14 @@ pub(crate) fn mempool_lookup<S: Into<String>>(name: S) -> Result<MempoolPtr> {
125135
Ok(EasyPtr(ptr))
126136
}
127137

138+
/// Returns the number of elements which have been allocated from the mempool.
139+
pub(crate) fn mempool_in_use_count(mp: &MempoolPtr) -> usize {
140+
unsafe { cffi::rte_mempool_in_use_count(mp.deref()) as usize }
141+
}
142+
128143
/// Frees a mempool.
129-
pub(crate) fn mempool_free(ptr: &mut MempoolPtr) {
130-
unsafe { cffi::rte_mempool_free(ptr.deref_mut()) };
144+
pub(crate) fn mempool_free(mp: &mut MempoolPtr) {
145+
unsafe { cffi::rte_mempool_free(mp.deref_mut()) };
131146
}
132147

133148
/// An opaque identifier for a logical execution unit of the processor.
@@ -353,10 +368,26 @@ pub(crate) fn eth_dev_configure(
353368
}
354369
}
355370

371+
/// An opaque identifier for a port's receive queue.
372+
#[derive(Copy, Clone)]
373+
pub(crate) struct PortRxQueueId(u16);
374+
375+
impl fmt::Debug for PortRxQueueId {
376+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377+
write!(f, "rxq{}", self.0)
378+
}
379+
}
380+
381+
impl From<usize> for PortRxQueueId {
382+
fn from(id: usize) -> Self {
383+
PortRxQueueId(id as u16)
384+
}
385+
}
386+
356387
/// Allocates and sets up a receive queue for a device.
357388
pub(crate) fn eth_rx_queue_setup(
358389
port_id: PortId,
359-
rx_queue_id: usize,
390+
rx_queue_id: PortRxQueueId,
360391
nb_rx_desc: usize,
361392
socket_id: SocketId,
362393
rx_conf: Option<&cffi::rte_eth_rxconf>,
@@ -365,7 +396,7 @@ pub(crate) fn eth_rx_queue_setup(
365396
unsafe {
366397
cffi::rte_eth_rx_queue_setup(
367398
port_id.0,
368-
rx_queue_id as u16,
399+
rx_queue_id.0,
369400
nb_rx_desc as u16,
370401
socket_id.0 as raw::c_uint,
371402
rx_conf.map_or(ptr::null(), |conf| conf),
@@ -376,18 +407,50 @@ pub(crate) fn eth_rx_queue_setup(
376407
}
377408
}
378409

410+
/// Retrieves a burst of input packets from a receive queue of a device.
411+
pub(crate) fn eth_rx_burst(port_id: PortId, queue_id: PortRxQueueId, rx_pkts: &mut Vec<MbufPtr>) {
412+
let nb_pkts = rx_pkts.capacity();
413+
414+
unsafe {
415+
let len = cffi::_rte_eth_rx_burst(
416+
port_id.0,
417+
queue_id.0,
418+
rx_pkts.as_mut_ptr() as *mut *mut cffi::rte_mbuf,
419+
nb_pkts as u16,
420+
);
421+
422+
rx_pkts.set_len(len as usize);
423+
}
424+
}
425+
426+
/// An opaque identifier for a port's transmit queue.
427+
#[derive(Copy, Clone)]
428+
pub(crate) struct PortTxQueueId(u16);
429+
430+
impl fmt::Debug for PortTxQueueId {
431+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432+
write!(f, "txq{}", self.0)
433+
}
434+
}
435+
436+
impl From<usize> for PortTxQueueId {
437+
fn from(id: usize) -> Self {
438+
PortTxQueueId(id as u16)
439+
}
440+
}
441+
379442
/// Allocates and sets up a transmit queue for a device.
380443
pub(crate) fn eth_tx_queue_setup(
381444
port_id: PortId,
382-
tx_queue_id: usize,
445+
tx_queue_id: PortTxQueueId,
383446
nb_tx_desc: usize,
384447
socket_id: SocketId,
385448
tx_conf: Option<&cffi::rte_eth_txconf>,
386449
) -> Result<()> {
387450
unsafe {
388451
cffi::rte_eth_tx_queue_setup(
389452
port_id.0,
390-
tx_queue_id as u16,
453+
tx_queue_id.0,
391454
nb_tx_desc as u16,
392455
socket_id.0 as raw::c_uint,
393456
tx_conf.map_or(ptr::null(), |conf| conf),
@@ -397,9 +460,57 @@ pub(crate) fn eth_tx_queue_setup(
397460
}
398461
}
399462

463+
/// Sends a burst of output packets on a transmit queue of a device.
464+
pub(crate) fn eth_tx_burst(
465+
port_id: PortId,
466+
queue_id: PortTxQueueId,
467+
tx_pkts: &mut Vec<MbufPtr>,
468+
) -> usize {
469+
let nb_pkts = tx_pkts.len();
470+
471+
let sent = unsafe {
472+
cffi::_rte_eth_tx_burst(
473+
port_id.0,
474+
queue_id.0,
475+
tx_pkts.as_mut_ptr() as *mut *mut cffi::rte_mbuf,
476+
nb_pkts as u16,
477+
)
478+
} as usize;
479+
480+
if nb_pkts > sent {
481+
// wasn't able to send everything.
482+
mem::forget(tx_pkts.drain(..sent));
483+
} else {
484+
unsafe {
485+
tx_pkts.set_len(0);
486+
}
487+
}
488+
489+
sent
490+
}
491+
492+
/// Starts a device.
493+
pub(crate) fn eth_dev_start(port_id: PortId) -> Result<()> {
494+
unsafe {
495+
cffi::rte_eth_dev_start(port_id.0)
496+
.into_result(DpdkError::from_errno)
497+
.map(|_| ())
498+
}
499+
}
500+
501+
/// Stops a device.
502+
pub(crate) fn eth_dev_stop(port_id: PortId) {
503+
unsafe {
504+
cffi::rte_eth_dev_stop(port_id.0);
505+
}
506+
}
507+
400508
/// A `rte_mbuf` pointer.
401509
pub(crate) type MbufPtr = EasyPtr<cffi::rte_mbuf>;
402510

511+
// Allows the pointer to go across thread/lcore boundaries.
512+
unsafe impl Send for MbufPtr {}
513+
403514
/// Allocates a new mbuf from a mempool.
404515
pub(crate) fn pktmbuf_alloc(mp: &mut MempoolPtr) -> Result<MbufPtr> {
405516
let ptr =

core/src/rt2/lcore.rs

Lines changed: 26 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,51 +16,22 @@
1616
* SPDX-License-Identifier: Apache-2.0
1717
*/
1818

19+
use super::ShutdownTrigger;
1920
use crate::ffi::dpdk::{self, LcoreId};
2021
use crate::{debug, info};
21-
use anyhow::{anyhow, Result};
22-
use async_channel::{self, Receiver, Recv, Sender};
22+
use anyhow::Result;
2323
use async_executor::Executor;
2424
use futures_lite::future;
2525
use std::collections::HashMap;
2626
use std::future::Future;
2727
use std::sync::Arc;
28-
29-
/// Trigger for the shutdown.
30-
pub(crate) struct Trigger(Sender<()>);
31-
32-
impl Trigger {
33-
/// Triggers the shutdown.
34-
pub(crate) fn fire(self) {
35-
drop(self.0)
36-
}
37-
}
38-
39-
/// Shutdown wait handle.
40-
pub(crate) struct Shutdown(Receiver<()>);
41-
42-
impl Shutdown {
43-
/// A future that waits till the trigger is fired.
44-
pub(crate) fn wait(&self) -> Recv<'_, ()> {
45-
self.0.recv()
46-
}
47-
}
48-
49-
/// Creates a shutdown and trigger pair.
50-
///
51-
/// Leverages the behavior of an async channel. When the sender is dropped
52-
/// from scope, it closes the channel and causes the receiver side future
53-
/// in the executor queue to resolve.
54-
pub(crate) fn shutdown_trigger() -> (Trigger, Shutdown) {
55-
let (s, r) = async_channel::unbounded();
56-
(Trigger(s), Shutdown(r))
57-
}
28+
use thiserror::Error;
5829

5930
/// An async executor abstraction on top of a DPDK logical core.
6031
pub(crate) struct Lcore {
6132
id: LcoreId,
6233
executor: Arc<Executor<'static>>,
63-
trigger: Option<Trigger>,
34+
shutdown: Option<ShutdownTrigger>,
6435
}
6536

6637
impl Lcore {
@@ -71,23 +42,29 @@ impl Lcore {
7142
/// Returns `DpdkError` if the executor fails to run on the given lcore.
7243
fn new(id: LcoreId) -> Result<Self> {
7344
debug!(?id, "starting lcore.");
45+
let trigger = ShutdownTrigger::new();
7446
let executor = Arc::new(Executor::new());
75-
let (trigger, shutdown) = shutdown_trigger();
7647

48+
let handle = trigger.get_wait();
7749
let executor2 = Arc::clone(&executor);
7850
dpdk::eal_remote_launch(id, move || {
7951
info!(?id, "lcore started.");
80-
let _ = future::block_on(executor2.run(shutdown.wait()));
52+
let _ = future::block_on(executor2.run(handle.wait()));
8153
info!(?id, "lcore stopped.");
8254
})?;
8355

8456
Ok(Lcore {
8557
id,
8658
executor,
87-
trigger: Some(trigger),
59+
shutdown: Some(trigger),
8860
})
8961
}
9062

63+
/// Returns the lcore id.
64+
pub(crate) fn id(&self) -> LcoreId {
65+
self.id
66+
}
67+
9168
/// Spawns an async task and waits for it to complete.
9269
pub(crate) fn block_on<T: Send + 'static>(
9370
&self,
@@ -105,22 +82,30 @@ impl Lcore {
10582

10683
impl Drop for Lcore {
10784
fn drop(&mut self) {
108-
if let Some(trigger) = self.trigger.take() {
85+
if let Some(trigger) = self.shutdown.take() {
10986
debug!(id = ?self.id, "stopping lcore.");
11087
trigger.fire();
11188
}
11289
}
11390
}
11491

92+
/// Lcore not found error.
93+
#[derive(Debug, Error)]
94+
#[error("lcore not found.")]
95+
pub(crate) struct LcoreNotFound;
96+
11597
/// Map to lookup the lcore by the assigned id.
11698
pub(crate) struct LcoreMap(HashMap<usize, Lcore>);
11799

118100
impl LcoreMap {
119101
/// Returns the lcore with the assigned id.
120-
fn get(&self, id: usize) -> Result<&Lcore> {
121-
self.0
122-
.get(&id)
123-
.ok_or_else(|| anyhow!("lcore with id '{}' not found.", id))
102+
pub(crate) fn get(&self, id: usize) -> Result<&Lcore> {
103+
self.0.get(&id).ok_or_else(|| LcoreNotFound.into())
104+
}
105+
106+
/// Returns a lcore iterator.
107+
pub(crate) fn iter(&self) -> impl Iterator<Item = &Lcore> {
108+
self.0.values()
124109
}
125110
}
126111

0 commit comments

Comments
 (0)