Skip to content

Commit 3e40ff0

Browse files
authored
Merge pull request #3 from tedkimdev/DLQ-3
DLQ-3: Add Republish Message Feature with Payload Editing and Diff Preview
2 parents 24bbd3e + 31cf45d commit 3e40ff0

File tree

12 files changed

+500
-15
lines changed

12 files changed

+500
-15
lines changed

Cargo.lock

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ edition = "2021"
77
anyhow = "1.0.100"
88
chrono = "0.4.42"
99
clap = { version = "4.5.48", features = ["derive", "env"] }
10+
colored = "3.0.0"
1011
log = "0.4.28"
1112
rdkafka = { version = "0.38.0", features = ["cmake-build"] }
1213
serde = { version = "1.0.228", features = ["derive"] }
1314
serde_json = "1.0.145"
15+
similar = "2.7.0"
1416
tabled = "0.20.0"
1517
tokio = { version = "1.47.1", features = ["full"] }

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,34 @@ One way to handle messages individually is to maintain a database table that tra
5454
This approach allows **selective processing, auditing, and safe reprocessing**.
5555
> Note: This is just a conceptual approach; it is not implemented in this project.
5656
57+
## Usage Example
58+
```
59+
# 1. Check what's in the DLQ
60+
cargo run list-messages dlq-user-events
61+
62+
# Output:
63+
# | id | reason | retries | original_topic |
64+
# | f6fcc3f0-f911-... | Max retries | 5 | user-events |
65+
66+
# 2. View the problematic message
67+
cargo run view-message dlq-user-events f6fcc3f0-f911-4083-a30a-bf72780096be
68+
69+
# 3. Create fixed-payload.json with corrected data
70+
echo '{"userId": "12345", "email": "valid@example.com"}' > fixed-payload.json
71+
72+
# 4. Fix and republish (shows diff, asks confirmation)
73+
cargo run edit-message dlq-user-events f6fcc3f0-f911-4083-a30a-bf72780096be \
74+
--payload-file fixed-payload.json
75+
76+
# Shows diff, confirms, then:
77+
# - Publishes to user-events (original topic)
78+
# - Removes from dlq-user-events (commits offset)
79+
80+
# 5. Verify it's gone from DLQ
81+
cargo run list-messages dlq-user-events
82+
# (Should be empty or not show that message)
83+
```
84+
5785
## Running Locally
5886

5987
### 1. Start Kafka Infrastructure

fixed_payload.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"action": "test",
3+
"data": {
4+
"corrupted": true,
5+
"email": "correct@email.com"
6+
},
7+
"userId": "user-123"
8+
}

send-message.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ case $TYPE in
2222
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":3,"failureReason":"Invalid format"},"payload":{"userId":"user-123","action":"test","data":{"email":"invalid","age":"not-number"}}}'
2323
;;
2424
"dlq")
25-
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":5,"failureReason":"Max retries","dlqTopic":"dlq-test","originalTopic":"test-topic","movedToDlqAt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"},"payload":{"userId":"user-123","action":"test","data":{"email":"malformed","corrupted":true}}}'
25+
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":5,"failureReason":"Max retries","originalTopic":"user-events","movedToDlqAt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"},"payload":{"userId":"user-123","action":"test","data":{"email":"malformed","corrupted":true}}}'
2626
;;
2727
*)
2828
MSG='{"id":"'$ID'","correlationId":"'$CORR_ID'","metadata":{"source":"test","retryCount":0},"payload":{"userId":"user-123","action":"test","data":{"email":"user@example.com","age":25}}}'

src/cli/commands.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,27 @@ pub enum Commands {
2020
topic: String,
2121
message_id: String,
2222
},
23-
/// Edit a message.
24-
EditMessage {
25-
message_id: String,
26-
},
2723
/// Archive a message.
2824
ArchiveMessage {
2925
topic: String,
3026
message_id: String,
3127
},
32-
/// Republish a message to the original topic.
28+
/// Republish a DLQ message to its original topic.
29+
///
30+
/// Finds a message in the DLQ by ID, optionally replaces its payload with a fixed
31+
/// version from a JSON file, shows a diff preview, then publishes to the original
32+
/// topic and removes the message from the DLQ.
3333
RepublishMessage {
34+
/// DLQ topic to read from
35+
topic: String,
36+
/// Message ID or correlation ID to find
3437
message_id: String,
38+
/// Path to JSON file with fixed payload (optional)
39+
/// If not provided, republishes the original payload as-is
40+
#[arg(long, value_name = "FILE", help = "Path to fixed payload file")]
41+
payload_file: Option<std::path::PathBuf>,
42+
/// Preview changes without publishing (dry run) (default: false)
43+
#[arg(long, default_value = "false")]
44+
dry_run: bool,
3545
},
3646
}

src/cli/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ pub mod list_messages;
44
pub mod view_message_by_id;
55
pub mod archive_message;
66
pub mod model;
7+
pub mod republish_message;
78

89
pub use commands::*;
910
pub use list_topics::*;
1011
pub use list_messages::*;
1112
pub use view_message_by_id::*;
1213
pub use archive_message::*;
13-
pub use model::*;
14+
pub use model::*;
15+
pub use republish_message::*;

src/cli/model.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use serde::{Deserialize, Serialize};
12
use tabled::Tabled;
23

34
// DLQ message field names
@@ -13,7 +14,7 @@ pub const METADATA_RETRY_COUNT: &str = "retryCount";
1314
pub const METADATA_ORIGINAL_TOPIC: &str = "originalTopic";
1415
pub const METADATA_MOVED_TO_DLQ_AT: &str = "movedToDlqAt";
1516

16-
#[derive(Tabled)]
17+
#[derive(Tabled, Serialize, Deserialize)]
1718
pub struct DlqMessage {
1819
pub payload: String,
1920
pub metadata: String,

0 commit comments

Comments
 (0)