Skip to content

Commit 8ea9191

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.0.x_34751' into 1.10_release_4.0.x
2 parents d13b07a + b606657 commit 8ea9191

File tree

2 files changed

+15
-13
lines changed

2 files changed

+15
-13
lines changed

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
import com.dtstack.flink.sql.util.PluginUtil;
2222
import com.dtstack.flink.sql.util.ReflectionUtils;
23+
import org.apache.commons.codec.digest.DigestUtils;
2324
import org.apache.commons.lang3.StringUtils;
2425
import org.slf4j.Logger;
2526
import org.slf4j.LoggerFactory;
2627

28+
import java.io.FileInputStream;
2729
import java.lang.reflect.InvocationTargetException;
2830
import java.lang.reflect.Method;
2931
import java.net.URL;
@@ -71,9 +73,19 @@ private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
7173
});
7274
}
7375

74-
private static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
76+
public static DtClassLoader retrieveClassLoad(List<URL> jarUrls) {
7577
jarUrls.sort(Comparator.comparing(URL::toString));
76-
String jarUrlkey = StringUtils.join(jarUrls, "_");
78+
79+
List<String> jarMd5s = new ArrayList<>(jarUrls.size());
80+
for (URL jarUrl : jarUrls) {
81+
try (FileInputStream inputStream = new FileInputStream(jarUrl.getPath())){
82+
String jarMd5 = DigestUtils.md5Hex(inputStream);
83+
jarMd5s.add(jarMd5);
84+
} catch (Exception e) {
85+
throw new RuntimeException("Exceptions appears when read file:" + e);
86+
}
87+
}
88+
String jarUrlkey = StringUtils.join(jarMd5s, "_");
7789
return pluginClassLoader.computeIfAbsent(jarUrlkey, k -> {
7890
try {
7991
URL[] urls = jarUrls.toArray(new URL[jarUrls.size()]);

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -257,20 +257,10 @@ private static void sqlTranslation(String localSqlPluginPath,
257257
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
258258
throws IllegalAccessException, InvocationTargetException {
259259
// udf和tableEnv须由同一个类加载器加载
260-
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
261260
ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
262-
URLClassLoader classLoader = null;
261+
URLClassLoader classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) currentClassLoader);
263262
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
264263
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
265-
// 构建plan的情况下,udf和tableEnv不需要是同一个类加载器
266-
if (getPlan) {
267-
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) currentClassLoader);
268-
}
269-
270-
//classloader
271-
if (classLoader == null) {
272-
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);
273-
}
274264
FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
275265
}
276266
}

0 commit comments

Comments
 (0)