Skip to content

Commit afa2b5f

Browse files
author
胡贵
committed
增加mysql 的 队列实现
1 parent ece7ccc commit afa2b5f

File tree

65 files changed

+1216
-292
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1216
-292
lines changed

README.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
4646
* 框架优化
4747

4848
## 调用示例
49-
* 安装 zookeeper(或redis) 和 mongo (后提供其他任务队列实现方式)
49+
* 安装 zookeeper(或redis) 和 mongo(或mysql) (后提供其他任务队列实现方式)
5050

5151
运行 job-example模块中的例子(包含API启动例子和Spring例子)
5252
分别执行 JobTrackerTest TaskTrackerTest JobClientTest
@@ -64,11 +64,20 @@ LTS 轻量级分布式任务调度框架(Light Task Schedule)
6464
jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
6565
// 设置业务日志记录
6666
// jobTracker.addConfig("job.logger", "mongo");
67-
// 任务队列用mongo
67+
68+
// 1. 任务队列用mongo
6869
jobTracker.addConfig("job.queue", "mongo");
6970
// mongo 配置
7071
jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); // 多个地址用逗号分割
7172
jobTracker.addConfig("mongo.database", "job");
73+
74+
// 2. 任务队里用mysql
75+
// jobTracker.addConfig("job.queue", "mysql");
76+
// mysql 配置
77+
// jobTracker.addConfig("jdbc.url", "jdbc:mysql://test.superboss.cc:3306/lts");
78+
// jobTracker.addConfig("jdbc.username", "root");
79+
// jobTracker.addConfig("jdbc.password", "root");
80+
7281
jobTracker.setOldDataHandler(new OldDataDeletePolicy());
7382
// 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
7483
jobTracker.addConfig("zk.client", "zkclient");

data/mongo/mongo.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@ db.createCollection("JobPo");
66
db.JobPo.ensureIndex({"jobId":1},{unique:true});
77
db.JobPo.ensureIndex({"taskTrackerNodeGroup":1, "taskId":1},{unique:true});
88
db.JobPo.ensureIndex({"taskTrackerIdentity":1});
9-
db.JobPo.ensureIndex({"triggerTime":1, "priority":1, "gmtCreate": 1});
9+
db.JobPo.ensureIndex({"triggerTime":1, "priority":1, "gmtCreated": 1});
1010
db.JobPo.ensureIndex({"isRunning":1});
1111
db.JobPo.ensureIndex({"taskTrackerNodeGroup":1, "isRunning":1, "triggerTime":1});
1212

1313
db.createCollection("JobLogPo");
1414
db.JobLogPo.ensureIndex({"jobId":1});
1515
db.JobLogPo.ensureIndex({"submitNodeGroup":1, "taskId":1});
1616
db.JobLogPo.ensureIndex({"taskTrackerIdentity":1});
17-
db.JobLogPo.ensureIndex({"gmtCreate":1});
17+
db.JobLogPo.ensureIndex({"gmtCreated":1});
1818
db.JobLogPo.ensureIndex({"priority":1});
1919
db.JobLogPo.ensureIndex({"logType":1});
2020
db.JobLogPo.ensureIndex({"timestamp":1});
2121
db.JobLogPo.ensureIndex({"taskId":1});
2222

2323
db.createCollection("JobFeedbackPo");
2424
db.JobFeedbackPo.ensureIndex({"id":1});
25-
db.JobFeedbackPo.ensureIndex({"gmtCreated":1});
25+
db.JobFeedbackPo.ensureIndex({"gmtCreatedd":1});

job-core/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,5 +73,14 @@
7373
<groupId>org.fusesource.leveldbjni</groupId>
7474
<artifactId>leveldbjni-all</artifactId>
7575
</dependency>
76+
<dependency>
77+
<groupId>commons-dbutils</groupId>
78+
<artifactId>commons-dbutils</artifactId>
79+
<scope>provided</scope>
80+
</dependency>
81+
<dependency>
82+
<groupId>com.alibaba</groupId>
83+
<artifactId>druid</artifactId>
84+
</dependency>
7685
</dependencies>
7786
</project>

job-core/src/main/java/com/lts/job/core/Application.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.lts.job.core.cluster.MasterElector;
55
import com.lts.job.core.cluster.SubscribedNodeManager;
66
import com.lts.job.core.protocol.command.CommandBodyWrapper;
7+
import com.lts.job.ec.EventCenter;
78

89
/**
910
* @author Robert HG (254963746@qq.com) on 8/17/14.
@@ -19,6 +20,16 @@ public abstract class Application {
1920
private MasterElector masterElector;
2021
// 节点通信CommandBody包装器
2122
private CommandBodyWrapper commandBodyWrapper;
23+
// 事件中心
24+
private EventCenter eventCenter;
25+
26+
public EventCenter getEventCenter() {
27+
return eventCenter;
28+
}
29+
30+
public void setEventCenter(EventCenter eventCenter) {
31+
this.eventCenter = eventCenter;
32+
}
2233

2334
public CommandBodyWrapper getCommandBodyWrapper() {
2435
return commandBodyWrapper;

job-core/src/main/java/com/lts/job/core/cluster/AbstractJobNode.java

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ final public void start() {
5757

5858
innerStart();
5959

60+
initEvent();
61+
6062
initRegistry();
6163

6264
registry.register(node);
@@ -68,6 +70,22 @@ final public void start() {
6870
}
6971
}
7072

73+
protected void initEvent(){
74+
// 监听节点 启用/禁用消息
75+
application.getEventCenter().subscribe(
76+
new String[]{EcTopic.NODE_DISABLE, EcTopic.NODE_ENABLE},
77+
new EventSubscriber(node.getIdentity(), new Observer() {
78+
@Override
79+
public void onObserved(EventInfo eventInfo) {
80+
if (EcTopic.NODE_DISABLE.equals(eventInfo.getTopic())) {
81+
nodeDisable();
82+
} else {
83+
nodeEnable();
84+
}
85+
}
86+
}));
87+
};
88+
7189
final public void stop() {
7290
try {
7391
registry.unregister(node);
@@ -94,15 +112,6 @@ private void initRegistry() {
94112
if (registry instanceof AbstractRegistry) {
95113
((AbstractRegistry) registry).setNode(node);
96114
}
97-
// 订阅的node管理
98-
SubscribedNodeManager subscribedNodeManager = new SubscribedNodeManager(application);
99-
application.setSubscribedNodeManager(subscribedNodeManager);
100-
nodeChangeListeners.add(subscribedNodeManager);
101-
// 用于master选举的监听器
102-
nodeChangeListeners.add(new MasterElectionListener(application));
103-
// 监听自己节点变化(如,当前节点被禁用了)
104-
nodeChangeListeners.add(new SelfChangeListener(config));
105-
106115
registry.subscribe(node, new NotifyListener() {
107116
private final Logger NOTIFY_LOGGER = LoggerFactory.getLogger(NotifyListener.class);
108117

@@ -145,19 +154,16 @@ protected void initConfig() {
145154

146155
LOGGER.info("当前节点配置:{}", config);
147156

148-
// 监听节点 启用/禁用消息
149-
eventCenterFactory.getEventCenter(config).subscribe(
150-
new String[]{EcTopic.NODE_DISABLE, EcTopic.NODE_ENABLE},
151-
new EventSubscriber(node.getIdentity(), new Observer() {
152-
@Override
153-
public void onObserved(EventInfo eventInfo) {
154-
if (EcTopic.NODE_DISABLE.equals(eventInfo.getTopic())) {
155-
nodeDisable();
156-
} else {
157-
nodeEnable();
158-
}
159-
}
160-
}));
157+
application.setEventCenter(eventCenterFactory.getEventCenter(config));
158+
159+
// 订阅的node管理
160+
SubscribedNodeManager subscribedNodeManager = new SubscribedNodeManager(application);
161+
application.setSubscribedNodeManager(subscribedNodeManager);
162+
nodeChangeListeners.add(subscribedNodeManager);
163+
// 用于master选举的监听器
164+
nodeChangeListeners.add(new MasterElectionListener(application));
165+
// 监听自己节点变化(如,当前节点被禁用了)
166+
nodeChangeListeners.add(new SelfChangeListener(application));
161167
}
162168

163169
protected abstract void innerStart();

job-core/src/main/java/com/lts/job/core/file/FileUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,6 @@ public static File createDirIfNotExist(String path) {
3131
}
3232
return file;
3333
}
34+
35+
3436
}

job-core/src/main/java/com/lts/job/core/listener/SelfChangeListener.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
package com.lts.job.core.listener;
22

3+
import com.lts.job.core.Application;
34
import com.lts.job.core.cluster.Config;
45
import com.lts.job.core.cluster.Node;
56
import com.lts.job.core.cluster.NodeType;
67
import com.lts.job.core.constant.EcTopic;
7-
import com.lts.job.core.extension.ExtensionLoader;
88
import com.lts.job.core.util.CollectionUtils;
99
import com.lts.job.ec.EventCenter;
10-
import com.lts.job.ec.EventCenterFactory;
1110
import com.lts.job.ec.EventInfo;
1211

1312
import java.util.List;
@@ -20,12 +19,11 @@
2019
public class SelfChangeListener implements NodeChangeListener {
2120

2221
private Config config;
23-
private EventCenter eventCenter;
24-
private EventCenterFactory eventCenterFactory = ExtensionLoader.getExtensionLoader(EventCenterFactory.class).getAdaptiveExtension();
22+
private Application application;
2523

26-
public SelfChangeListener(Config config) {
27-
this.config = config;
28-
this.eventCenter = eventCenterFactory.getEventCenter(config);
24+
public SelfChangeListener(Application application) {
25+
this.config = application.getConfig();
26+
this.application = application;
2927
}
3028

3129

@@ -36,14 +34,14 @@ private void change(Node node) {
3634
if (node.getNodeType().equals(NodeType.TASK_TRACKER)
3735
&& (node.getThreads() != config.getWorkThreads())) {
3836
config.setWorkThreads(node.getThreads());
39-
eventCenter.publishAsync(new EventInfo(EcTopic.WORK_THREAD_CHANGE));
37+
application.getEventCenter().publishAsync(new EventInfo(EcTopic.WORK_THREAD_CHANGE));
4038
}
4139

4240
// 2. 看 available 有没有改变
4341
if (node.isAvailable() != config.isAvailable()) {
4442
String topic = node.isAvailable() ? EcTopic.NODE_ENABLE : EcTopic.NODE_DISABLE;
4543
config.setAvailable(node.isAvailable());
46-
eventCenter.publishAsync(new EventInfo(topic));
44+
application.getEventCenter().publishAsync(new EventInfo(topic));
4745
}
4846
}
4947
}

job-core/src/main/java/com/lts/job/core/util/JSONUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.alibaba.fastjson.JSON;
44
import com.alibaba.fastjson.JSONArray;
55
import com.alibaba.fastjson.JSONObject;
6+
import com.alibaba.fastjson.TypeReference;
67

78
import java.lang.reflect.Type;
89

@@ -15,6 +16,13 @@ public static <T> T parse(String json, Type type) {
1516
return (T) JSONObject.parseObject(json, type);
1617
}
1718

19+
public static <T> T parse(String json, TypeReference<T> type) {
20+
if (StringUtils.isEmpty(json)) {
21+
return null;
22+
}
23+
return JSONObject.parseObject(json, type);
24+
}
25+
1826
public static String toJSONString(Object obj) {
1927
return JSONObject.toJSONString(obj);
2028
}

job-core/src/main/java/com/lts/job/ec/injvm/InjvmEventCenter.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.lts.job.ec.injvm;
22

33
import com.lts.job.core.constant.Constants;
4-
import com.lts.job.core.util.ConcurrentHashSet;
5-
import com.lts.job.core.util.JSONUtils;
64
import com.lts.job.core.logger.Logger;
75
import com.lts.job.core.logger.LoggerFactory;
6+
import com.lts.job.core.util.ConcurrentHashSet;
7+
import com.lts.job.core.util.JSONUtils;
88
import com.lts.job.ec.EventCenter;
99
import com.lts.job.ec.EventInfo;
1010
import com.lts.job.ec.EventSubscriber;
@@ -17,16 +17,17 @@
1717

1818
/**
1919
* 在一个jvm中的pub sub 简易实现
20+
*
2021
* @author Robert HG (254963746@qq.com) on 5/12/15.
2122
*/
2223
public class InjvmEventCenter implements EventCenter {
2324

2425
private static final Logger LOGGER = LoggerFactory.getLogger(EventCenter.class.getName());
2526

26-
private Map<String, Set<EventSubscriber>> ecMap =
27+
private final Map<String, Set<EventSubscriber>> ecMap =
2728
new ConcurrentHashMap<String, Set<EventSubscriber>>();
2829

29-
private ExecutorService executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 2);
30+
private final ExecutorService executor = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 2);
3031

3132
public void subscribe(String topic, EventSubscriber subscriber) {
3233
Set<EventSubscriber> subscribers = ecMap.get(topic);
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package com.lts.job.store.jdbc;
2+
3+
import com.alibaba.druid.pool.DruidDataSource;
4+
import com.lts.job.core.cluster.Config;
5+
import com.lts.job.core.logger.Logger;
6+
import com.lts.job.core.logger.LoggerFactory;
7+
import com.lts.job.core.util.StringUtils;
8+
9+
import javax.sql.DataSource;
10+
import java.lang.reflect.Method;
11+
import java.util.Map;
12+
import java.util.concurrent.ConcurrentHashMap;
13+
14+
/**
15+
* @author Robert HG (254963746@qq.com) on 10/24/14.
16+
*/
17+
public class DataSourceProvider {
18+
19+
private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProvider.class);
20+
// 同一配置, 始终保持同一个连接
21+
private static final ConcurrentHashMap<String, DataSource> DATA_STORE_MAP = new ConcurrentHashMap<String, DataSource>();
22+
23+
private static final Object lock = new Object();
24+
25+
public static DataSource getDataSource(Config config) {
26+
27+
String url = config.getParameter(URL_KEY);
28+
String username = config.getParameter(USERNAME_KEY);
29+
String password = config.getParameter(PASSWORD_KEY);
30+
31+
String cachedKey = StringUtils.concat(url, username, password);
32+
33+
DataSource dataSource = DATA_STORE_MAP.get(cachedKey);
34+
if (dataSource == null) {
35+
try {
36+
synchronized (lock) {
37+
dataSource = DATA_STORE_MAP.get(cachedKey);
38+
if (dataSource != null) {
39+
return dataSource;
40+
}
41+
dataSource = createDruidDataSource(config);
42+
43+
DATA_STORE_MAP.put(cachedKey, dataSource);
44+
}
45+
} catch (Exception e) {
46+
throw new IllegalStateException(
47+
StringUtils.format("connect datasource failed! url: {}", url), e);
48+
}
49+
}
50+
return dataSource;
51+
}
52+
53+
private static DataSource createDruidDataSource(Config config) {
54+
DruidDataSource dataSource = new DruidDataSource();
55+
Class<DruidDataSource> clazz = DruidDataSource.class;
56+
for (Map.Entry<String, Class> entry : FIELDS.entrySet()) {
57+
String field = entry.getKey();
58+
String value = config.getParameter("druid." + field);
59+
if (StringUtils.isNotEmpty(value)) {
60+
Method setMethod = null;
61+
try {
62+
setMethod = clazz.getMethod("set" + (field.substring(0, 1).toUpperCase() + field.substring(1))
63+
, entry.getValue());
64+
setMethod.invoke(dataSource, value);
65+
} catch (Exception e) {
66+
LOGGER.warn("set field[{}] failed! value is {}", field, value);
67+
}
68+
}
69+
}
70+
71+
String url = config.getParameter(URL_KEY);
72+
String username = config.getParameter(USERNAME_KEY);
73+
String password = config.getParameter(PASSWORD_KEY);
74+
75+
dataSource.setUrl(url);
76+
dataSource.setUsername(username);
77+
dataSource.setPassword(password);
78+
79+
return dataSource;
80+
}
81+
82+
private static final Map<String, Class> FIELDS = new ConcurrentHashMap<String, Class>();
83+
84+
static {
85+
// druid配置属性,see <a href="https://github.com/alibaba/druid/wiki/DruidDataSource%E9%85%8D%E7%BD%AE%E5%B1%9E%E6%80%A7%E5%88%97%E8%A1%A8">DruidDataSource配置属性列表</a>
86+
// FIELDS.put("url", String.class);
87+
// FIELDS.put("username", String.class);
88+
// FIELDS.put("password", String.class);
89+
// FIELDS.put("driverClassName", String.class);
90+
FIELDS.put("initialSize", Integer.class);
91+
FIELDS.put("maxActive", Integer.class);
92+
FIELDS.put("maxIdle", Integer.class);
93+
FIELDS.put("minIdle", Integer.class);
94+
FIELDS.put("maxWait", Integer.class);
95+
FIELDS.put("poolPreparedStatements", Boolean.class);
96+
FIELDS.put("maxOpenPreparedStatements", Integer.class);
97+
FIELDS.put("validationQuery", String.class);
98+
FIELDS.put("testOnBorrow", Boolean.class);
99+
FIELDS.put("testOnReturn", Boolean.class);
100+
FIELDS.put("testWhileIdle", Boolean.class);
101+
FIELDS.put("timeBetweenEvictionRunsMillis", Long.class);
102+
FIELDS.put("numTestsPerEvictionRun", Integer.class);
103+
FIELDS.put("minEvictableIdleTimeMillis", Long.class);
104+
FIELDS.put("exceptionSorter", String.class);
105+
FIELDS.put("filters", String.class);
106+
}
107+
108+
private static final String URL_KEY = "jdbc.url";
109+
private static final String USERNAME_KEY = "jdbc.username";
110+
private static final String PASSWORD_KEY = "jdbc.password";
111+
112+
}

0 commit comments

Comments
 (0)