Skip to content

Commit c28f0c1

Browse files
add : eventmesh-retry/eventmesh-retry-kafka
1 parent 6a490e2 commit c28f0c1

File tree

5 files changed

+137
-0
lines changed

5 files changed

+137
-0
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
dependencies {
20+
implementation project(":eventmesh-storage-plugin:eventmesh-storage-api")
21+
implementation project(":eventmesh-storage-plugin:eventmesh-storage-kafka")
22+
implementation project(":eventmesh-retry:eventmesh-retry-api")
23+
implementation project(":eventmesh-common")
24+
25+
implementation 'io.cloudevents:cloudevents-core'
26+
implementation 'io.cloudevents:cloudevents-json-jackson'
27+
28+
compileOnly 'org.projectlombok:lombok'
29+
annotationProcessor 'org.projectlombok:lombok'
30+
31+
testImplementation 'org.junit.jupiter:junit-jupiter'
32+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
kafka_version=3.7.1
18+
pluginType=retry
19+
pluginName=kafka
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.eventmesh.retry.kafka;
19+
20+
import org.apache.eventmesh.api.SendCallback;
21+
import org.apache.eventmesh.api.SendResult;
22+
import org.apache.eventmesh.api.exception.OnExceptionContext;
23+
import org.apache.eventmesh.api.producer.Producer;
24+
import org.apache.eventmesh.common.protocol.http.common.ProtocolKey;
25+
import org.apache.eventmesh.retry.api.conf.RetryConfiguration;
26+
import org.apache.eventmesh.retry.api.strategy.RetryStrategy;
27+
import java.util.Objects;
28+
import io.cloudevents.CloudEvent;
29+
import io.cloudevents.core.builder.CloudEventBuilder;
30+
import lombok.SneakyThrows;
31+
import lombok.extern.slf4j.Slf4j;
32+
33+
@Slf4j
34+
public class KafkaRetryStrategyImpl implements RetryStrategy {
35+
36+
@Override
37+
public void retry(RetryConfiguration configuration) {
38+
sendMessageBack(configuration);
39+
}
40+
41+
@SneakyThrows
42+
private void sendMessageBack(final RetryConfiguration configuration) {
43+
CloudEvent event = configuration.getEvent();
44+
String topic = configuration.getTopic();
45+
String consumerGroupName = configuration.getConsumerGroupName();
46+
47+
String bizSeqNo = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey())).toString();
48+
String uniqueId = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey())).toString();
49+
CloudEvent retryEvent = CloudEventBuilder.from(event)
50+
// .withExtension(ProtocolKey.TOPIC, topic)
51+
.withSubject(topic)
52+
.build();
53+
Producer producer = configuration.getProducer();
54+
producer.publish(retryEvent, new SendCallback() {
55+
56+
@Override
57+
public void onSuccess(SendResult sendResult) {
58+
log.info("consumer:{} consume success,, bizSeqno:{}, uniqueId:{}",
59+
consumerGroupName, bizSeqNo, uniqueId);
60+
}
61+
62+
@Override
63+
public void onException(OnExceptionContext context) {
64+
log.warn("consumer:{} consume fail, sendMessageBack, bizSeqno:{}, uniqueId:{}",
65+
consumerGroupName, bizSeqNo, uniqueId, context.getException());
66+
}
67+
});
68+
}
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
kafka=org.apache.eventmesh.retry.kafka.KafkaRetryStrategyImpl

settings.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ include 'eventmesh-trace-plugin:eventmesh-trace-jaeger'
120120
include 'eventmesh-retry'
121121
include 'eventmesh-retry:eventmesh-retry-api'
122122
include 'eventmesh-retry:eventmesh-retry-rocketmq'
123+
include 'eventmesh-retry:eventmesh-retry-kafka'
123124
include 'eventmesh-runtime-v2'
124125
include 'eventmesh-admin-server'
125126
include 'eventmesh-registry'

0 commit comments

Comments
 (0)