Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ documentation = "https://docs.rs/linux-perf-data/"
repository = "https://github.com/mstange/linux-perf-data/"
exclude = ["/.github", "/.vscode", "/tests"]

[features]
default = ["zstd"]
zstd = ["zstd-safe"]

[dependencies]
byteorder = "1.4.3"
memchr = "2.4.1"
Expand All @@ -21,6 +25,7 @@ linux-perf-event-reader = "0.10.0"
linear-map = "1.2.0"
prost = { version = "0.14", default-features = false, features = ["std"] }
prost-derive = "0.14"
zstd-safe = { version = "7.2", optional = true }

[dev-dependencies]
yaxpeax-arch = { version = "0.3", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub const PERF_RECORD_EVENT_UPDATE: u32 = 78;
pub const PERF_RECORD_TIME_CONV: u32 = 79;
pub const PERF_RECORD_HEADER_FEATURE: u32 = 80;
pub const PERF_RECORD_COMPRESSED: u32 = 81;
pub const PERF_RECORD_COMPRESSED2: u32 = 83;

// pub const SIMPLE_PERF_RECORD_TYPE_START: u32 = 32768;

Expand Down
69 changes: 69 additions & 0 deletions src/decompression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use zstd_safe::{DCtx, InBuffer, OutBuffer};

/// A zstd decompressor for PERF_RECORD_COMPRESSED records.
///
/// Holds a lazily-created zstd decompression context and a carry-over
/// buffer for perf records that are split across compressed chunks.
pub struct ZstdDecompressor {
    // Lazily-initialized zstd decompression context; created on first use
    // in `decompress` so that constructing this type is allocation-free.
    dctx: Option<DCtx<'static>>,
    /// Buffer for partial perf records that span multiple compressed chunks
    partial_record_buffer: Vec<u8>,
}

impl Default for ZstdDecompressor {
fn default() -> Self {
Self::new()
}
}

impl ZstdDecompressor {
    /// Creates a decompressor with no zstd context allocated yet; the
    /// context is created lazily on the first `decompress` call.
    pub fn new() -> Self {
        Self {
            dctx: None,
            partial_record_buffer: Vec::new(),
        }
    }

    /// Decompress a chunk of zstd data.
    ///
    /// Streams `compressed_data` through the (lazily created) zstd
    /// context, growing the output buffer until all input is consumed,
    /// and returns the decompressed bytes with any bytes previously
    /// stored via `save_partial_record` prepended.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` I/O error carrying zstd's error name if
    /// decompression fails.
    pub fn decompress(&mut self, compressed_data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
        let dctx = self.dctx.get_or_insert_with(DCtx::create);

        // Initial capacity guess: 4x the compressed size. Grown below
        // whenever the output buffer fills up completely.
        let mut decompressed = vec![0; compressed_data.len() * 4];
        let mut in_buffer = InBuffer::around(compressed_data);
        let mut total_out = 0;

        // Keep calling the streaming decompressor until every input byte
        // has been consumed; each call may produce only part of the output.
        while in_buffer.pos < in_buffer.src.len() {
            let available = decompressed.len() - total_out;
            let mut out_buffer = OutBuffer::around(&mut decompressed[total_out..]);

            match dctx.decompress_stream(&mut out_buffer, &mut in_buffer) {
                Ok(_) => {
                    total_out += out_buffer.pos();
                    // The output window filled up entirely, so more output
                    // may be pending: enlarge the buffer before retrying.
                    if out_buffer.pos() == available {
                        decompressed.resize(decompressed.len() + compressed_data.len() * 4, 0);
                    }
                }
                Err(code) => {
                    let error_name = zstd_safe::get_error_name(code);
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!("Zstd decompression failed: {}", error_name),
                    ));
                }
            }
        }

        // Drop the unused tail of the over-allocated output buffer.
        decompressed.truncate(total_out);

        // Prepend any partial record data from the previous chunk
        if !self.partial_record_buffer.is_empty() {
            let mut combined = std::mem::take(&mut self.partial_record_buffer);
            combined.extend_from_slice(&decompressed);
            decompressed = combined;
        }

        Ok(decompressed)
    }

    /// Save partial record data that spans to the next compressed chunk.
    ///
    /// The saved bytes are prepended to the result of the next
    /// `decompress` call. Replaces any previously saved data.
    pub fn save_partial_record(&mut self, data: &[u8]) {
        self.partial_record_buffer = data.to_vec();
    }
}
34 changes: 34 additions & 0 deletions src/feature_sections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,40 @@ impl SampleTimeRange {
}
}

/// Information about compression used in the perf.data file.
///
/// Mirrors the on-disk layout of the COMPRESSED feature section: five
/// consecutive `u32` values (see `parse` / `STRUCT_SIZE`).
#[derive(Debug, Clone, Copy)]
pub struct CompressionInfo {
    // Presumably the version of this feature-section format — TODO
    // confirm against the perf sources.
    pub version: u32,
    /// Compression algorithm type. 1 = Zstd
    pub type_: u32,
    /// Compression level (e.g., 1-22 for Zstd)
    pub level: u32,
    /// Compression ratio achieved
    pub ratio: u32,
    /// mmap buffer size
    pub mmap_len: u32,
}

impl CompressionInfo {
    /// On-disk size of this structure: five consecutive `u32` fields.
    pub const STRUCT_SIZE: usize = 4 + 4 + 4 + 4 + 4;
    /// Value of `type_` denoting Zstd compression.
    pub const ZSTD_TYPE: u32 = 1;

    /// Reads a `CompressionInfo` from `reader` using byte order `T`.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error from the underlying reader.
    pub fn parse<R: Read, T: ByteOrder>(mut reader: R) -> Result<Self, std::io::Error> {
        // Struct-literal initializers are evaluated in declaration order,
        // so the five u32 reads happen in on-disk field order.
        Ok(Self {
            version: reader.read_u32::<T>()?,
            type_: reader.read_u32::<T>()?,
            level: reader.read_u32::<T>()?,
            ratio: reader.read_u32::<T>()?,
            mmap_len: reader.read_u32::<T>()?,
        })
    }
}

pub struct HeaderString;

impl HeaderString {
Expand Down
133 changes: 129 additions & 4 deletions src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use linux_perf_event_reader::{
use std::collections::{HashMap, VecDeque};
use std::io::{Cursor, Read, Seek, SeekFrom};

#[cfg(feature = "zstd")]
use crate::decompression::ZstdDecompressor;

use super::error::{Error, ReadError};
use super::feature_sections::AttributeDescription;
use super::features::Feature;
Expand Down Expand Up @@ -196,6 +199,8 @@ impl<C: Read + Seek> PerfFileReader<C> {
buffers_for_recycling: VecDeque::new(),
current_event_body: Vec::new(),
pending_first_record: None,
#[cfg(feature = "zstd")]
zstd_decompressor: ZstdDecompressor::new(),
};

Ok(Self {
Expand Down Expand Up @@ -366,6 +371,8 @@ impl<R: Read> PerfFileReader<R> {
buffers_for_recycling: VecDeque::new(),
current_event_body: Vec::new(),
pending_first_record,
#[cfg(feature = "zstd")]
zstd_decompressor: ZstdDecompressor::new(),
};

Ok(Self {
Expand All @@ -391,6 +398,9 @@ pub struct PerfRecordIter<R: Read> {
buffers_for_recycling: VecDeque<Vec<u8>>,
/// For pipe mode: the first non-metadata record that was read during initialization
pending_first_record: Option<(PerfEventHeader, Vec<u8>)>,
/// Zstd decompressor for handling COMPRESSED records
#[cfg(feature = "zstd")]
zstd_decompressor: ZstdDecompressor,
}

impl<R: Read> PerfRecordIter<R> {
Expand Down Expand Up @@ -459,9 +469,9 @@ impl<R: Read> PerfRecordIter<R> {
}
self.read_offset += u64::from(header.size);

if UserRecordType::try_from(RecordType(header.type_))
== Some(UserRecordType::PERF_FINISHED_ROUND)
{
let user_record_type = UserRecordType::try_from(RecordType(header.type_));

if user_record_type == Some(UserRecordType::PERF_FINISHED_ROUND) {
self.sorter.finish_round();
if self.sorter.has_more() {
// The sorter is non-empty. We're done.
Expand All @@ -476,7 +486,6 @@ impl<R: Read> PerfRecordIter<R> {
let event_body_len = size - PerfEventHeader::STRUCT_SIZE;
let mut buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
buffer.resize(event_body_len, 0);

// Try to read the event body. For pipe mode, EOF here also means end of stream.
match self.reader.read_exact(&mut buffer) {
Ok(()) => {}
Expand All @@ -491,6 +500,34 @@ impl<R: Read> PerfRecordIter<R> {
}
}

if user_record_type == Some(UserRecordType::PERF_COMPRESSED) {
#[cfg(not(feature = "zstd"))]
{
return Err(Error::IoError(std::io::Error::new(std::io::ErrorKind::Unsupported,
"Compression support is not enabled. Please rebuild with the 'zstd' feature flag.",
)));
}
#[cfg(feature = "zstd")]
{
self.decompress_and_process_compressed::<T>(&buffer)?;
continue;
}
}

if user_record_type == Some(UserRecordType::PERF_COMPRESSED2) {
#[cfg(not(feature = "zstd"))]
{
return Err(Error::IoError(std::io::Error::new(std::io::ErrorKind::Unsupported,
"Compression support is not enabled. Please rebuild with the 'zstd' feature flag.",
)));
}
#[cfg(feature = "zstd")]
{
self.decompress_and_process_compressed2::<T>(&buffer)?;
continue;
}
}

self.process_record::<T>(header, buffer, offset)?;
}

Expand Down Expand Up @@ -542,7 +579,95 @@ impl<R: Read> PerfRecordIter<R> {
attr_index,
};
self.sorter.insert_unordered(sort_key, pending_record);
Ok(())
}

/// Decompresses a PERF_RECORD_COMPRESSED record and processes its sub-records.
///
/// PERF_RECORD_COMPRESSED (type 81) was introduced in Linux 5.2 (2019).
/// Format: header (8 bytes) + compressed data (header.size - 8 bytes)
/// The compressed data size is implicit from the header size.
#[cfg(feature = "zstd")]
fn decompress_and_process_compressed<T: ByteOrder>(
    &mut self,
    buffer: &[u8],
) -> Result<(), Error> {
    // Unlike COMPRESSED2, there is no leading data_size field: the whole
    // record body is compressed data whose length is implied by header.size.
    let decompressed = self.zstd_decompressor.decompress(buffer)?;
    self.process_decompressed_records::<T>(&decompressed)
}

/// Decompresses a PERF_RECORD_COMPRESSED2 record and processes its sub-records.
///
/// PERF_RECORD_COMPRESSED2 (type 83) was introduced in Linux 6.x (May 2025)
/// to fix 8-byte alignment issues with the original format.
/// Format: header (8 bytes) + data_size (8 bytes) + compressed data + padding
/// The header.size includes padding for 8-byte alignment; data_size has the actual size.
#[cfg(feature = "zstd")]
fn decompress_and_process_compressed2<T: ByteOrder>(
    &mut self,
    buffer: &[u8],
) -> Result<(), Error> {
    // The record body must at least contain the u64 data_size field.
    if buffer.len() < 8 {
        return Err(ReadError::PerfEventData.into());
    }
    let (size_bytes, payload) = buffer.split_at(8);
    let data_size = T::read_u64(size_bytes) as usize;
    // Anything beyond data_size bytes is alignment padding; a data_size
    // that overruns the record body is malformed.
    let compressed_data = payload.get(..data_size).ok_or(ReadError::PerfEventData)?;

    let decompressed = self.zstd_decompressor.decompress(compressed_data)?;
    self.process_decompressed_records::<T>(&decompressed)
}

/// Processes decompressed data as a sequence of perf records.
/// Shared by both COMPRESSED and COMPRESSED2 handlers.
///
/// Walks `decompressed` record by record, parsing each embedded
/// `PerfEventHeader` and handing the record body to `process_record`.
/// A trailing record that is cut off at the end of the chunk (either a
/// truncated header or a truncated body) is stashed in the decompressor's
/// partial-record buffer, which `decompress` prepends to the next chunk.
#[cfg(feature = "zstd")]
fn process_decompressed_records<T: ByteOrder>(
    &mut self,
    decompressed: &[u8],
) -> Result<(), Error> {
    let mut cursor = Cursor::new(decompressed);
    // Byte offset of each sub-record within this decompressed chunk
    // (restarts at 0 for every chunk), passed through to process_record.
    let mut offset = 0u64;

    while (cursor.position() as usize) < decompressed.len() {
        let header_start = cursor.position() as usize;
        // Check if we have enough bytes for a header
        let remaining = decompressed.len() - header_start;
        if remaining < PerfEventHeader::STRUCT_SIZE {
            // Truncated header: carry it over to the next chunk.
            self.zstd_decompressor
                .save_partial_record(&decompressed[header_start..]);
            break;
        }

        let sub_header = PerfEventHeader::parse::<_, T>(&mut cursor)?;
        let sub_size = sub_header.size as usize;
        // A record can never be smaller than its own header.
        if sub_size < PerfEventHeader::STRUCT_SIZE {
            return Err(Error::InvalidPerfEventSize);
        }

        let sub_event_body_len = sub_size - PerfEventHeader::STRUCT_SIZE;
        // Check if we have enough bytes for the sub-record body
        let remaining_after_header = decompressed.len() - cursor.position() as usize;
        if sub_event_body_len > remaining_after_header {
            // Truncated body: save header + partial body (from header_start,
            // so the whole record is re-parsed once the rest arrives).
            self.zstd_decompressor
                .save_partial_record(&decompressed[header_start..]);
            break;
        }

        // Reuse a recycled buffer where possible to avoid reallocating
        // per record.
        let mut sub_buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
        sub_buffer.resize(sub_event_body_len, 0);
        cursor
            .read_exact(&mut sub_buffer)
            .map_err(|_| ReadError::PerfEventData)?;

        self.process_record::<T>(sub_header, sub_buffer, offset)?;
        offset += sub_size as u64;
    }
    Ok(())
}

Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@

mod build_id_event;
mod constants;
#[cfg(feature = "zstd")]
mod decompression;
mod dso_info;
mod dso_key;
mod error;
Expand Down Expand Up @@ -91,7 +93,7 @@ pub use linux_perf_event_reader::Endianness;
pub use dso_info::DsoInfo;
pub use dso_key::DsoKey;
pub use error::{Error, ReadError};
pub use feature_sections::{AttributeDescription, NrCpus, SampleTimeRange};
pub use feature_sections::{AttributeDescription, CompressionInfo, NrCpus, SampleTimeRange};
pub use features::{Feature, FeatureSet, FeatureSetIter};
pub use file_reader::{PerfFileReader, PerfRecordIter};
pub use perf_file::PerfFile;
Expand Down
14 changes: 13 additions & 1 deletion src/perf_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::dso_info::DsoInfo;
use super::dso_key::DsoKey;
use super::error::Error;
use super::feature_sections::{
AttributeDescription, ClockData, NrCpus, PmuMappings, SampleTimeRange,
AttributeDescription, ClockData, CompressionInfo, NrCpus, PmuMappings, SampleTimeRange,
};
use super::features::{Feature, FeatureSet};
use super::simpleperf;
Expand Down Expand Up @@ -213,6 +213,18 @@ impl PerfFile {
.transpose()
}

/// Information about compression used in the perf.data file
///
/// Returns `Ok(None)` when the file has no COMPRESSED feature section;
/// otherwise parses the section using the file's endianness.
pub fn compression_info(&self) -> Result<Option<CompressionInfo>, Error> {
    let section = match self.feature_section_data(Feature::COMPRESSED) {
        Some(section) => section,
        None => return Ok(None),
    };
    let info = match self.endian {
        Endianness::LittleEndian => CompressionInfo::parse::<_, LittleEndian>(section),
        Endianness::BigEndian => CompressionInfo::parse::<_, BigEndian>(section),
    }?;
    Ok(Some(info))
}

/// The meta info map, if this is a Simpleperf profile.
pub fn simpleperf_meta_info(&self) -> Result<Option<HashMap<&str, &str>>, Error> {
match self.feature_section_data(Feature::SIMPLEPERF_META_INFO) {
Expand Down
2 changes: 2 additions & 0 deletions src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ impl UserRecordType {
pub const PERF_TIME_CONV: Self = Self(RecordType(PERF_RECORD_TIME_CONV));
pub const PERF_HEADER_FEATURE: Self = Self(RecordType(PERF_RECORD_HEADER_FEATURE));
pub const PERF_COMPRESSED: Self = Self(RecordType(PERF_RECORD_COMPRESSED));
pub const PERF_COMPRESSED2: Self = Self(RecordType(PERF_RECORD_COMPRESSED2));

pub const SIMPLEPERF_KERNEL_SYMBOL: Self = Self(RecordType(SIMPLE_PERF_RECORD_KERNEL_SYMBOL));
pub const SIMPLEPERF_DSO: Self = Self(RecordType(SIMPLE_PERF_RECORD_DSO));
Expand Down Expand Up @@ -107,6 +108,7 @@ impl std::fmt::Debug for UserRecordType {
Self::PERF_TIME_CONV => "PERF_TIME_CONV".fmt(f),
Self::PERF_HEADER_FEATURE => "PERF_HEADER_FEATURE".fmt(f),
Self::PERF_COMPRESSED => "PERF_COMPRESSED".fmt(f),
Self::PERF_COMPRESSED2 => "PERF_COMPRESSED2".fmt(f),
Self::SIMPLEPERF_KERNEL_SYMBOL => "SIMPLEPERF_KERNEL_SYMBOL".fmt(f),
Self::SIMPLEPERF_DSO => "SIMPLEPERF_DSO".fmt(f),
Self::SIMPLEPERF_SYMBOL => "SIMPLEPERF_SYMBOL".fmt(f),
Expand Down
Loading