diff --git a/Cargo.lock b/Cargo.lock index b01d907..8c40637 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,6 +211,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "colored" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -523,10 +532,12 @@ dependencies = [ "anyhow", "chrono", "clap", + "colored", "log", "rdkafka", "serde", "serde_json", + "similar", "tabled", "tokio", ] @@ -702,6 +713,12 @@ dependencies = [ "libc", ] +[[package]] +name = "similar" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" + [[package]] name = "slab" version = "0.4.11" diff --git a/Cargo.toml b/Cargo.toml index fd5ffb1..25be55d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,11 @@ edition = "2021" anyhow = "1.0.100" chrono = "0.4.42" clap = { version = "4.5.48", features = ["derive", "env"] } +colored = "3.0.0" log = "0.4.28" rdkafka = { version = "0.38.0", features = ["cmake-build"] } serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.145" +similar = "2.7.0" tabled = "0.20.0" tokio = { version = "1.47.1", features = ["full"] } diff --git a/README.md b/README.md index 767956a..cbbbcc5 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,34 @@ One way to handle messages individually is to maintain a database table that tra This approach allows **selective processing, auditing, and safe reprocessing**. > Note: This is just a conceptual approach; it is not implemented in this project. +## Usage Example +``` +# 1. 
Check what's in the DLQ +cargo run list-messages dlq-user-events + +# Output: +# | id | reason | retries | original_topic | +# | f6fcc3f0-f911-... | Max retries | 5 | user-events | + +# 2. View the problematic message +cargo run view-message dlq-user-events f6fcc3f0-f911-4083-a30a-bf72780096be + +# 3. Create fixed-payload.json with corrected data +echo '{"userId": "12345", "email": "valid@example.com"}' > fixed-payload.json + +# 4. Fix and republish (shows diff, asks confirmation) +cargo run edit-message dlq-user-events f6fcc3f0-f911-4083-a30a-bf72780096be \ + --payload-file fixed-payload.json + +# Shows diff, confirms, then: +# - Publishes to user-events (original topic) +# - Removes from dlq-user-events (commits offset) + +# 5. Verify it's gone from DLQ +cargo run list-messages dlq-user-events +# (Should be empty or not show that message) +``` + ## Running Locally ### 1. Start Kafka Infrastructure diff --git a/fixed_payload.json b/fixed_payload.json new file mode 100644 index 0000000..6973cd6 --- /dev/null +++ b/fixed_payload.json @@ -0,0 +1,8 @@ +{ + "action": "test", + "data": { + "corrupted": true, + "email": "correct@email.com" + }, + "userId": "user-123" + } \ No newline at end of file diff --git a/send-message.sh b/send-message.sh index 1918182..36bbf8d 100755 --- a/send-message.sh +++ b/send-message.sh @@ -22,7 +22,7 @@ case $TYPE in MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":3,"failureReason":"Invalid format"},"payload":{"userId":"user-123","action":"test","data":{"email":"invalid","age":"not-number"}}}' ;; "dlq") - MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":5,"failureReason":"Max retries","dlqTopic":"dlq-test","originalTopic":"test-topic","movedToDlqAt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"},"payload":{"userId":"user-123","action":"test","data":{"email":"malformed","corrupted":true}}}' + 
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":5,"failureReason":"Max retries","originalTopic":"user-events","movedToDlqAt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"},"payload":{"userId":"user-123","action":"test","data":{"email":"malformed","corrupted":true}}}' ;; *) MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":0},"payload":{"userId":"user-123","action":"test","data":{"email":"user@example.com","age":25}}}' diff --git a/src/cli/commands.rs b/src/cli/commands.rs index 13e4228..bf932dc 100644 --- a/src/cli/commands.rs +++ b/src/cli/commands.rs @@ -20,17 +20,27 @@ pub enum Commands { topic: String, message_id: String, }, - /// Edit a message. - EditMessage { - message_id: String, - }, /// Archive a message. ArchiveMessage { topic: String, message_id: String, }, - /// Republish a message to the original topic. + /// Republish a DLQ message to its original topic. + /// + /// Finds a message in the DLQ by ID, optionally replaces its payload with a fixed + /// version from a JSON file, shows a diff preview, then publishes to the original + /// topic and removes the message from the DLQ. 
RepublishMessage {
+        /// DLQ topic to read from
+        topic: String,
+        /// Message ID or correlation ID to find
         message_id: String,
+        /// Path to JSON file with fixed payload (optional)
+        /// If not provided, republishes the original payload as-is
+        #[arg(long, value_name = "FILE", help = "Path to fixed payload file")]
+        payload_file: Option<PathBuf>,
+        /// Preview changes without publishing (dry run) (default: false)
+        #[arg(long, default_value = "false")]
+        dry_run: bool,
     },
 }
\ No newline at end of file
diff --git a/src/cli/mod.rs b/src/cli/mod.rs
index 2547b4c..68380aa 100644
--- a/src/cli/mod.rs
+++ b/src/cli/mod.rs
@@ -4,10 +4,12 @@ pub mod list_messages;
 pub mod view_message_by_id;
 pub mod archive_message;
 pub mod model;
+pub mod republish_message;
 
 pub use commands::*;
 pub use list_topics::*;
 pub use list_messages::*;
 pub use view_message_by_id::*;
 pub use archive_message::*;
-pub use model::*;
\ No newline at end of file
+pub use model::*;
+pub use republish_message::*;
\ No newline at end of file
diff --git a/src/cli/model.rs b/src/cli/model.rs
index 84195ec..beda16f 100644
--- a/src/cli/model.rs
+++ b/src/cli/model.rs
@@ -1,3 +1,4 @@
+use serde::{Deserialize, Serialize};
 use tabled::Tabled;
 
 // DLQ message field names
@@ -13,7 +14,7 @@ pub const METADATA_RETRY_COUNT: &str = "retryCount";
 pub const METADATA_ORIGINAL_TOPIC: &str = "originalTopic";
 pub const METADATA_MOVED_TO_DLQ_AT: &str = "movedToDlqAt";
 
-#[derive(Tabled)]
+#[derive(Tabled, Serialize, Deserialize)]
 pub struct DlqMessage {
     pub payload: String,
     pub metadata: String,
diff --git a/src/cli/republish_message.rs b/src/cli/republish_message.rs
new file mode 100644
index 0000000..70585fe
--- /dev/null
+++ b/src/cli/republish_message.rs
@@ -0,0 +1,404 @@
+use std::{
+    io::{self, Write},
+    path::Path,
+    time::Duration,
+};
+
+use anyhow::{anyhow, Context};
+use colored::Colorize;
+use log::{info, warn};
+use rdkafka::{
+    config::RDKafkaLogLevel,
+    consumer::{CommitMode, Consumer},
+    message::{BorrowedMessage, 
Headers, OwnedHeaders},
+    producer::{FutureProducer, FutureRecord},
+    ClientConfig, Message,
+};
+use serde_json::from_str;
+use similar::{ChangeTag, TextDiff};
+
+use crate::{
+    cli::DlqMessage,
+    kafka::{
+        CustomContext, LoggingConsumer, AUTO_OFFSET_RESET, BOOTSTRAP_SERVERS, ENABLE_AUTO_COMMIT,
+        ENABLE_PARTITION_EOF, GROUP_ID, MESSAGE_HEADER_CORRELATION_ID, MESSAGE_HEADER_MESSAGE_ID,
+        MESSAGE_HEADER_REPUBLISHED_AT, MESSAGE_TIMEOUT_MS, SESSION_TIMEOUT_MS, TIMEOUT_MS,
+    },
+};
+
+pub async fn republish_message(
+    brokers: &str,
+    group_id: &str,
+    dlq_topic: &str,
+    message_id: &str,
+    payload_file: Option<&Path>,
+    dry_run: bool,
+) -> Result<(), anyhow::Error> {
+    // find message in dlq
+    let context = CustomContext;
+    let mut config = ClientConfig::new();
+
+    config
+        .set(GROUP_ID, group_id)
+        .set(BOOTSTRAP_SERVERS, brokers)
+        .set(ENABLE_PARTITION_EOF, "true") // Enable EOF detection
+        .set(AUTO_OFFSET_RESET, "earliest") // Start from beginning
+        .set(ENABLE_AUTO_COMMIT, "false") // Don't auto-commit
+        .set(SESSION_TIMEOUT_MS, TIMEOUT_MS)
+        .set_log_level(RDKafkaLogLevel::Debug);
+
+    let consumer: LoggingConsumer = config
+        .create_with_context(context)
+        .context("Consumer creation failed")?;
+
+    let topics = &[dlq_topic];
+    consumer
+        .subscribe(topics)
+        .map_err(|e| anyhow!("Failed to subscribe to topic: {}", e))?;
+
+    let mut dlq_message: Option<DlqMessage> = None;
+    let mut borrowed_message: Option<BorrowedMessage> = None;
+    loop {
+        match consumer.recv().await {
+            Err(e) => {
+                if let rdkafka::error::KafkaError::PartitionEOF(_) = e {
+                    info!("Reached end of partition (EOF)");
+                    break;
+                } else {
+                    warn!("Kafka error: {}", e);
+                    break; // Exit on other errors
+                }
+            }
+            Ok(m) => {
+                let payload = match m.payload_view::<str>() {
+                    None => {
+                        warn!("No payload found in message, skipping");
+                        continue;
+                    }
+                    Some(Ok(s)) => s,
+                    Some(Err(e)) => {
+                        warn!("Error while deserializing message payload: {:?}", e);
+                        continue;
+                    }
+                };
+
+                let json: serde_json::Value = match from_str(payload) { 
+                    Ok(j) => j,
+                    Err(e) => {
+                        warn!("Failed to parse message as JSON: {}, skipping", e);
+                        continue;
+                    }
+                };
+
+                let item = DlqMessage::parse(json);
+                if item.id.eq(message_id) || item.correlation_id.eq(message_id) {
+                    dlq_message = Some(item);
+                    borrowed_message = Some(m);
+                    break;
+                }
+            }
+        };
+    }
+
+    let dlq_message = dlq_message.with_context(|| "Message not found")?;
+    let original_topic = dlq_message.original_topic.as_str();
+    let mut message_json =
+        serde_json::to_value(&dlq_message).context("Failed to convert DLQ message to JSON")?;
+
+    let mut headers = OwnedHeaders::new();
+    headers = headers
+        .insert(rdkafka::message::Header {
+            key: MESSAGE_HEADER_MESSAGE_ID,
+            value: Some(dlq_message.id.as_bytes()),
+        })
+        .insert(rdkafka::message::Header {
+            key: MESSAGE_HEADER_REPUBLISHED_AT,
+            value: Some(chrono::Utc::now().to_rfc3339().as_bytes()),
+        });
+
+    if !dlq_message.correlation_id.is_empty() {
+        headers = headers.insert(rdkafka::message::Header {
+            key: MESSAGE_HEADER_CORRELATION_ID,
+            value: Some(dlq_message.correlation_id.as_bytes()),
+        });
+    }
+
+    // handle payload replacement if file is provided
+    let mut new_payload: Option<String> = None;
+    if let Some(file_path) = payload_file {
+        let payload = read_payload_from_file(file_path)?;
+        let payload = serde_json::to_string_pretty(&payload)
+            .context("Failed to convert new payload to JSON")?;
+
+        new_payload = Some(payload.clone());
+
+        display_diff_and_plan(
+            &dlq_message,
+            &dlq_message.payload.to_string(),
+            &payload,
+            dlq_topic,
+            file_path,
+            dry_run,
+            &headers,
+        )?;
+    } else {
+        display_republish_info(&dlq_message, dlq_topic, dry_run, &headers)?;
+    }
+
+    if dry_run {
+        print_dry_run_info(dlq_topic, original_topic);
+        return Ok(());
+    }
+
+    if !confirm_action()? 
{
+        println!("Operation cancelled.");
+        return Ok(());
+    }
+
+    // publish to original topic
+    println!("\nPublishing to {}...", original_topic);
+    message_json["payload"] =
+        serde_json::to_value(new_payload).context("Failed to convert new payload to JSON")?;
+
+    publish_message(brokers, original_topic, &message_json, message_id, headers).await?;
+    println!("✅ Message published successfully");
+
+    // commit DLQ offset
+    if let Some(borrowed_message) = borrowed_message {
+        consumer
+            .commit_message(&borrowed_message, CommitMode::Sync)
+            .context("Failed to commit DLQ offset")?;
+        println!("✅ DLQ message committed (removed from {})", dlq_topic);
+    }
+
+    println!("\nDone!");
+    Ok(())
+}
+
+/// Read and parse payload from JSON file
+fn read_payload_from_file(path: &Path) -> Result<serde_json::Value, anyhow::Error> {
+    let content = std::fs::read_to_string(path)
+        .context(format!("Failed to read file: {}", path.display()))?;
+
+    let payload: serde_json::Value = serde_json::from_str(&content)
+        .context(format!("Invalid JSON in file: {}", path.display()))?;
+
+    Ok(payload)
+}
+
+/// Display diff when payload is being changed
+fn display_diff_and_plan(
+    dlq_message: &DlqMessage,
+    old_payload: &str,
+    new_payload: &str,
+    dlq_topic: &str,
+    payload_file: &Path,
+    dry_run: bool,
+    headers: &OwnedHeaders,
+) -> Result<(), anyhow::Error> {
+    let original_topic = dlq_message.original_topic.as_str();
+
+    println!("\n{}", "=".repeat(65));
+    println!("Found message in DLQ: {}", dlq_topic.cyan());
+    if dry_run {
+        println!("{}", "[DRY RUN MODE]".yellow().bold());
+    }
+    println!("{}", "=".repeat(65));
+
+    println!("\n{}", "MESSAGE METADATA:".bold());
+    println!("  ID: {}", dlq_message.id);
+    println!("  Correlation ID: {}", dlq_message.correlation_id);
+    println!("  Failure Reason: {}", dlq_message.reason);
+    println!("  Retry Count: {}", dlq_message.retries);
+    println!("  Original Topic: {}", dlq_message.original_topic.green());
+    println!("  Moved to DLQ: {}", dlq_message.moved_at);
+
+    println!("\n{}", "MESSAGE 
HEADERS (will be republished as-is):".bold().yellow()); + for header in headers.iter() { + println!(" {}: {}", header.key, String::from_utf8_lossy(header.value.unwrap_or_default())); + } + + println!("\n{}", "=".repeat(65)); + println!("{}", "PAYLOAD COMPARISON".bold()); + println!("{}", "=".repeat(65)); + + println!("\n{}", "BEFORE (Current in DLQ):".yellow()); + println!("{}", old_payload); + + println!( + "\n{}", + format!("AFTER (From file: {}):", payload_file.display()).green() + ); + println!("{}", new_payload); + + println!("\n{}", "DIFF:".cyan()); + print_diff(old_payload, new_payload); + + println!("\n{}", "=".repeat(65)); + println!("{}", "PLANNED ACTION:".bold()); + if dry_run { + println!( + " {} {} Publish fixed message to: {}", + "[DRY RUN]".yellow(), + "→".green(), + original_topic.green() + ); + println!( + " {} {} Remove from DLQ: {}", + "[DRY RUN]".yellow(), + "→".yellow(), + dlq_topic.yellow() + ); + } else { + println!( + " {} Publish fixed message to: {}", + "→".green(), + original_topic.green() + ); + println!( + " {} Remove from DLQ: {} (commit offset)", + "→".yellow(), + dlq_topic.yellow() + ); + } + println!("{}", "=".repeat(65)); + + Ok(()) +} + +/// Display info when republishing without changes +fn display_republish_info( + dlq_message: &DlqMessage, + dlq_topic: &str, + dry_run: bool, + headers: &OwnedHeaders, +) -> Result<(), anyhow::Error> { + let original_topic = dlq_message.original_topic.as_str(); + + println!("\n{}", "=".repeat(65)); + println!("Found message in DLQ: {}", dlq_topic.cyan()); + if dry_run { + println!("{}", "[DRY RUN MODE]".yellow().bold()); + } + println!("{}", "=".repeat(65)); + + println!("\n{}", "MESSAGE METADATA:".bold()); + println!(" ID: {}", dlq_message.id); + println!(" Correlation ID: {}", dlq_message.correlation_id); + println!(" Failure Reason: {}", dlq_message.reason); + println!(" Retry Count: {}", dlq_message.retries); + println!(" Original Topic: {}", dlq_message.original_topic.green()); + println!(" 
Moved to DLQ: {}", dlq_message.moved_at); + + println!("\n{}", "MESSAGE HEADERS (will be republished as-is):".yellow()); + for header in headers.iter() { + println!(" {}: {}", header.key, String::from_utf8_lossy(header.value.unwrap_or_default())); + } + + println!("\n{}", "PAYLOAD (will be republished as-is):".yellow()); + println!("{}", dlq_message.payload); + + println!("\n{}", "=".repeat(65)); + println!("{}", "PLANNED ACTION:".bold()); + if dry_run { + println!( + " {} {} Republish message to: {}", + "[DRY RUN]".yellow(), + "→".green(), + original_topic.green() + ); + println!( + " {} {} Remove from DLQ: {}", + "[DRY RUN]".yellow(), + "→".yellow(), + dlq_topic.yellow() + ); + } else { + println!( + " {} Republish message to: {}", + "→".green(), + original_topic.green() + ); + println!(" {} Remove from DLQ: {}", "→".yellow(), dlq_topic.yellow()); + } + println!("{}", "=".repeat(65)); + + Ok(()) +} + +fn print_diff(old: &str, new: &str) { + let diff = TextDiff::from_lines(old, new); + for change in diff.iter_all_changes() { + let (sign, line) = match change.tag() { + ChangeTag::Delete => ("-", format!("{}", change).red()), + ChangeTag::Insert => ("+", format!("{}", change).green()), + ChangeTag::Equal => (" ", format!("{}", change).normal()), + }; + print!("{} {}", sign, line); + } +} + +fn print_dry_run_info(dlq_topic: &str, original_topic: &str) { + println!("\n{}", "=".repeat(65)); + println!( + "{}", + "🔍 DRY RUN MODE - No changes will be made".yellow().bold() + ); + println!("{}", "=".repeat(65)); + println!("\n{}", "What would happen:".cyan()); + println!( + " {} Message would be published to: {}", + "→".green(), + original_topic.green() + ); + println!( + " {} DLQ offset would be committed (message removed from {})", + "→".yellow(), + dlq_topic.yellow() + ); + println!( + "\n{}", + "To actually perform this operation, run without --dry-run".dimmed() + ); +} + +/// Publish message to Kafka topic +async fn publish_message( + brokers: &str, + topic: &str, + 
message: &serde_json::Value,
+    key: &str,
+    headers: OwnedHeaders,
+) -> Result<(), anyhow::Error> {
+    let producer: FutureProducer = ClientConfig::new()
+        .set(BOOTSTRAP_SERVERS, brokers)
+        .set(MESSAGE_TIMEOUT_MS, TIMEOUT_MS)
+        .create()
+        .context("Producer creation failed")?;
+
+    let payload = serde_json::to_vec(message).context("Failed to serialize message")?;
+
+    producer
+        .send(
+            FutureRecord::to(topic)
+                .payload(&payload)
+                .key(key)
+                .headers(headers),
+            Duration::from_secs(0),
+        )
+        .await
+        .map_err(|(err, _)| anyhow!("Failed to publish: {}", err))?;
+
+    Ok(())
+}
+
+/// Ask user for confirmation
+fn confirm_action() -> Result<bool, anyhow::Error> {
+    print!("\n{} ", "Proceed with this change? [y/N]:".yellow().bold());
+    io::stdout().flush()?;
+
+    let mut input = String::new();
+    io::stdin().read_line(&mut input)?;
+
+    Ok(input.trim().eq_ignore_ascii_case("y"))
+}
diff --git a/src/kafka/config.rs b/src/kafka/config.rs
index 3826373..489f0e6 100644
--- a/src/kafka/config.rs
+++ b/src/kafka/config.rs
@@ -8,4 +8,9 @@ pub const SESSION_TIMEOUT_MS: &str = "session.timeout.ms";
 pub const MESSAGE_TIMEOUT_MS: &str = "message.timeout.ms";
 
 // Timeout value (in milliseconds)
-pub const TIMEOUT_MS: &str = "6000"; // 6 seconds
\ No newline at end of file
+pub const TIMEOUT_MS: &str = "6000"; // 6 seconds
+
+// Kafka message headers
+pub const MESSAGE_HEADER_MESSAGE_ID: &str = "id";
+pub const MESSAGE_HEADER_CORRELATION_ID: &str = "correlation_id";
+pub const MESSAGE_HEADER_REPUBLISHED_AT: &str = "republished_at";
\ No newline at end of file
diff --git a/src/main.rs b/src/main.rs
index a69b74e..31eaeac 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -38,11 +38,6 @@ async fn main() {
                 std::process::exit(1);
             }
         },
-        Some(Commands::EditMessage {
-            message_id,
-        }) => {
-            todo!("{}", message_id);
-        },
         Some(Commands::ArchiveMessage {
             topic,
             message_id,
@@ -53,9 +48,22 @@
             }
         },
         Some(Commands::RepublishMessage {
+            topic,
             message_id,
+            payload_file,
+            dry_run,
         }) => {
-            
todo!("{}", message_id); + if let Err(e) = republish_message( + brokers, + group_id, + &topic, + &message_id, + payload_file.as_deref(), + dry_run, + ).await { + eprintln!("Error republishing message: {}", e); + std::process::exit(1); + } }, None => { println!("Run with --help to see instructions"); diff --git a/src/models/mod.rs b/src/models/mod.rs new file mode 100644 index 0000000..e69de29