Skip to content

Conversation

@tedkimdev
Copy link
Owner

DLQ-3 Add Republish Message Feature with Payload Editing and Diff Preview

Summary

Implements a new republish-message command that allows operators to fix failed messages in the DLQ and republish them to their original topics. This feature includes visual diff preview, dry-run mode, and Kafka message headers for observability.

Motivation

When messages fail and land in the DLQ, operators need a safe way to:

  1. Fix payload issues (e.g., invalid data, malformed JSON)
  2. Preview changes before republishing
  3. Track which messages were manually fixed
  4. Republish to the original topic and remove from DLQ

Previously, this required manual Kafka console operations or custom scripts.

Features

Core Functionality

  • Find messages by ID or correlation ID in DLQ topics
  • Optional payload replacement from JSON file
  • Visual diff display showing before/after changes with color coding
  • Dry-run mode for safe preview without side effects
  • Confirmation prompt before publishing (safety first)
  • Automatic cleanup - commits DLQ offset after successful republish

Observability

  • Kafka message headers added for tracing:
    • id: Message ID
    • correlation_id: Correlation ID for distributed tracing
    • republished_at: Timestamp of manual intervention

Dependencies Added

colored = "2.1"   # For colored terminal output
similar = "2.5"   # For text diffing

Usage

Basic Usage: Republish with Fixed Payload

cargo run republish-message <dlq-topic> <message-id> \
    --payload-file fixed-payload.json

Dry Run Mode (Preview Only)

cargo run republish-message <dlq-topic> <message-id> \
    --payload-file fixed-payload.json --dry-run

Republish Without Changes (Retry As-Is)

cargo run republish-message <dlq-topic> <message-id>

Example Output

cargo run republish-message dlq-user-events 4dcba7cd-0c51-43a8-9cf8-789468c5cecd --dry-run --payload-file fixed_payload.json

=================================================================
Found message in DLQ: dlq-user-events
[DRY RUN MODE]
=================================================================

MESSAGE METADATA:
  ID:             4dcba7cd-0c51-43a8-9cf8-789468c5cecd
  Correlation ID: a4aea474-65a4-4d85-a05f-42db4aa62a29
  Failure Reason: Max retries
  Retry Count:    5
  Original Topic: user-events
  Moved to DLQ:   2025-10-24T22:36:49Z

MESSAGE HEADERS (will be republished as-is):
  id: 4dcba7cd-0c51-43a8-9cf8-789468c5cecd
  republished_at: 2025-10-25T08:55:31.117949+00:00
  correlation_id: a4aea474-65a4-4d85-a05f-42db4aa62a29

=================================================================
PAYLOAD COMPARISON
=================================================================

BEFORE (Current in DLQ):
{
  "action": "test",
  "data": {
    "corrupted": true,
    "email": "malformed"
  },
  "userId": "user-123"
}

AFTER (From file: fixed_payload.json):
{
  "action": "test",
  "data": {
    "corrupted": true,
    "email": "correct@email.com"
  },
  "userId": "user-123"
}

DIFF:
  {
    "action": "test",
    "data": {
      "corrupted": true,
-     "email": "malformed"
+     "email": "correct@email.com"
    },
    "userId": "user-123"
  }

=================================================================
PLANNED ACTION:
  [DRY RUN] → Publish fixed message to: user-events
  [DRY RUN] → Remove from DLQ: dlq-user-events
=================================================================

=================================================================
🔍 DRY RUN MODE - No changes will be made
=================================================================

What would happen:
  → Message would be published to: user-events
  → DLQ offset would be committed (message removed from dlq-user-events)

To actually perform this operation, run without --dry-run

Some(Ok(s)) => s,
Some(Err(e)) => {
warn!("Error while deserializing message payload: {:?}", e);
""

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional or do you want to deal with this in the same way as for JSON deserialization errors below?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wasn't intentional. I've fixed to use continue. Thank you for catching this.

/// Display diff when payload is being changed
fn display_diff_and_plan(
dlq_message: &DlqMessage,
old_payload: &String,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd normally not take an actual &String unless you really need the functionality of string. Prefer &str or even something like AsRef<str>.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, right. I will change to &str. Thank you.

Copy link

@GuillemIscla GuillemIscla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting to see Kafka in Rust 👍

@tedkimdev tedkimdev merged commit 3e40ff0 into main Oct 29, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants