Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ edition = "2021"
anyhow = "1.0.100"
chrono = "0.4.42"
clap = { version = "4.5.48", features = ["derive", "env"] }
colored = "3.0.0"
log = "0.4.28"
rdkafka = { version = "0.38.0", features = ["cmake-build"] }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.145"
similar = "2.7.0"
tabled = "0.20.0"
tokio = { version = "1.47.1", features = ["full"] }
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,34 @@ One way to handle messages individually is to maintain a database table that tra
This approach allows **selective processing, auditing, and safe reprocessing**.
> Note: This is just a conceptual approach; it is not implemented in this project.

## Usage Example
```
# 1. Check what's in the DLQ
cargo run list-messages dlq-user-events

# Output:
# | id | reason | retries | original_topic |
# | f6fcc3f0-f911-... | Max retries | 5 | user-events |

# 2. View the problematic message
cargo run view-message dlq-user-events f6fcc3f0-f911-4083-a30a-bf72780096be

# 3. Create fixed-payload.json with corrected data
echo '{"userId": "12345", "email": "valid@example.com"}' > fixed-payload.json

# 4. Fix and republish (shows diff, asks confirmation)
cargo run edit-message dlq-user-events f6fcc3f0-f911-4083-a30a-bf72780096be \
--payload-file fixed-payload.json

# Shows diff, confirms, then:
# - Publishes to user-events (original topic)
# - Removes from dlq-user-events (commits offset)

# 5. Verify it's gone from DLQ
cargo run list-messages dlq-user-events
# (Should be empty or not show that message)
```

## Running Locally

### 1. Start Kafka Infrastructure
Expand Down
8 changes: 8 additions & 0 deletions fixed_payload.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"action": "test",
"data": {
"corrupted": true,
"email": "correct@email.com"
},
"userId": "user-123"
}
2 changes: 1 addition & 1 deletion send-message.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ case $TYPE in
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":3,"failureReason":"Invalid format"},"payload":{"userId":"user-123","action":"test","data":{"email":"invalid","age":"not-number"}}}'
;;
"dlq")
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":5,"failureReason":"Max retries","dlqTopic":"dlq-test","originalTopic":"test-topic","movedToDlqAt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"},"payload":{"userId":"user-123","action":"test","data":{"email":"malformed","corrupted":true}}}'
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":5,"failureReason":"Max retries","originalTopic":"user-events","movedToDlqAt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"},"payload":{"userId":"user-123","action":"test","data":{"email":"malformed","corrupted":true}}}'
;;
*)
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":0},"payload":{"userId":"user-123","action":"test","data":{"email":"user@example.com","age":25}}}'
Expand Down
20 changes: 15 additions & 5 deletions src/cli/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,27 @@ pub enum Commands {
topic: String,
message_id: String,
},
/// Edit a message.
EditMessage {
message_id: String,
},
/// Archive a message.
ArchiveMessage {
topic: String,
message_id: String,
},
/// Republish a message to the original topic.
/// Republish a DLQ message to its original topic.
///
/// Finds a message in the DLQ by ID, optionally replaces its payload with a fixed
/// version from a JSON file, shows a diff preview, then publishes to the original
/// topic and removes the message from the DLQ.
RepublishMessage {
/// DLQ topic to read from
topic: String,
/// Message ID or correlation ID to find
message_id: String,
/// Path to JSON file with fixed payload (optional)
/// If not provided, republishes the original payload as-is
#[arg(long, value_name = "FILE", help = "Path to fixed payload file")]
payload_file: Option<std::path::PathBuf>,
/// Preview changes without publishing (dry run) (default: false)
#[arg(long, default_value = "false")]
dry_run: bool,
},
}
4 changes: 3 additions & 1 deletion src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ pub mod list_messages;
pub mod view_message_by_id;
pub mod archive_message;
pub mod model;
pub mod republish_message;

pub use commands::*;
pub use list_topics::*;
pub use list_messages::*;
pub use view_message_by_id::*;
pub use archive_message::*;
pub use model::*;
pub use model::*;
pub use republish_message::*;
3 changes: 2 additions & 1 deletion src/cli/model.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use serde::{Deserialize, Serialize};
use tabled::Tabled;

// DLQ message field names
Expand All @@ -13,7 +14,7 @@ pub const METADATA_RETRY_COUNT: &str = "retryCount";
pub const METADATA_ORIGINAL_TOPIC: &str = "originalTopic";
pub const METADATA_MOVED_TO_DLQ_AT: &str = "movedToDlqAt";

#[derive(Tabled)]
#[derive(Tabled, Serialize, Deserialize)]
pub struct DlqMessage {
pub payload: String,
pub metadata: String,
Expand Down
Loading