diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java index 8f40adf1a7a55..460def012859f 100644 --- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java @@ -709,7 +709,14 @@ private Optional> createPushRequest(ClientTelemetrySubscription local } CompressionType compressionType = ClientTelemetryUtils.preferredCompressionType(localSubscription.acceptedCompressionTypes()); - ByteBuffer buffer = ClientTelemetryUtils.compress(payload, compressionType); + ByteBuffer compressedPayload; + try { + compressedPayload = ClientTelemetryUtils.compress(payload, compressionType); + } catch (Throwable e) { + log.debug("Failed to compress telemetry payload for compression: {}, sending uncompressed data", compressionType); + compressedPayload = ByteBuffer.wrap(payload); + compressionType = CompressionType.NONE; + } AbstractRequest.Builder requestBuilder = new PushTelemetryRequest.Builder( new PushTelemetryRequestData() @@ -717,7 +724,7 @@ private Optional> createPushRequest(ClientTelemetrySubscription local .setSubscriptionId(localSubscription.subscriptionId()) .setTerminating(terminating) .setCompressionType(compressionType.id) - .setMetrics(Utils.readBytes(buffer)), true); + .setMetrics(Utils.readBytes(compressedPayload)), true); return Optional.of(requestBuilder); } diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java index 33e2604479dfa..3840588531fff 100644 --- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java +++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java @@ -177,7 +177,7 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re if (metrics != null && metrics.length > 0) { try { receiverPlugin.exportMetrics(requestContext, request); - } catch (Exception exception) { + dd } catch (Throwable exception) { clientInstance.lastKnownError(Errors.INVALID_RECORD); return request.errorResponse(0, Errors.INVALID_RECORD); }