Skip to content

Commit edd5b19

Browse files
committed
[feat-1434][kafka]增加单测
1 parent 29f84ce commit edd5b19

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package com.dtstack.flink.sql.source.kafka.deserializer;
19+
20+
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import org.apache.kafka.common.serialization.Deserializer;
23+
import org.junit.Assert;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.mockito.InjectMocks;
27+
import org.mockito.Mock;
28+
import org.mockito.MockitoAnnotations;
29+
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
33+
import static org.mockito.Mockito.*;
34+
35+
public class DtKafkaDeserializerTest {
36+
@InjectMocks
37+
DtKafkaDeserializer<byte[]> dtKafkaDeserializer;
38+
39+
@Before
40+
public void setUp() {
41+
dtKafkaDeserializer = new DtKafkaDeserializer<>();
42+
}
43+
44+
@Test
45+
public void testConfigure() {
46+
Map<String, String> map = new HashMap<>();
47+
String deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
48+
map.put(KafkaSourceTableInfo.DT_KEY_DESERIALIZER, deserializer);
49+
map.put(KafkaSourceTableInfo.DT_VALUE_DESERIALIZER, deserializer);
50+
dtKafkaDeserializer.configure(map, true);
51+
dtKafkaDeserializer.configure(map, false);
52+
String nonexistentDeserializer = "xxx.xxx.xxx.serialization.XxxDeserializer";
53+
map.put(KafkaSourceTableInfo.DT_KEY_DESERIALIZER, nonexistentDeserializer);
54+
try {
55+
dtKafkaDeserializer.configure(map, true);
56+
}catch (Exception e){
57+
Assert.assertEquals("Can't create instance: " + nonexistentDeserializer, e.getMessage());
58+
}
59+
}
60+
61+
@Test
62+
public void testDeserialize() {
63+
Map<String, String> map = new HashMap<>();
64+
String deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
65+
map.put(KafkaSourceTableInfo.DT_KEY_DESERIALIZER, deserializer);
66+
dtKafkaDeserializer.configure(map, true);
67+
68+
byte[] result = dtKafkaDeserializer.deserialize("topic", null, new byte[]{(byte) 0});
69+
Assert.assertArrayEquals(new byte[]{(byte) 0}, result);
70+
}
71+
72+
@Test
73+
public void testDeserialize2() {
74+
Map<String, String> map = new HashMap<>();
75+
String deserializer = "org.apache.kafka.common.serialization.StringDeserializer";
76+
map.put(KafkaSourceTableInfo.DT_KEY_DESERIALIZER, deserializer);
77+
dtKafkaDeserializer.configure(map, true);
78+
byte[] result = dtKafkaDeserializer.deserialize("topic", new byte[]{(byte) 0});
79+
Assert.assertArrayEquals(new byte[]{(byte) 0}, result);
80+
}
81+
82+
@Test
83+
public void testClose() {
84+
Map<String, String> map = new HashMap<>();
85+
String deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
86+
map.put(KafkaSourceTableInfo.DT_KEY_DESERIALIZER, deserializer);
87+
dtKafkaDeserializer.configure(map, true);
88+
dtKafkaDeserializer.close();
89+
}
90+
}

0 commit comments

Comments
 (0)