Skip to content

Commit 469ae5e

Browse files
author
Aaron
committed
Connections
1 parent 89522d0 commit 469ae5e

File tree

7 files changed

+613
-2
lines changed

7 files changed

+613
-2
lines changed
Lines changed: 392 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,392 @@
1+
package org.iot.dsa.driver;
2+
3+
import org.iot.dsa.node.DSBool;
4+
import org.iot.dsa.node.DSIStatus;
5+
import org.iot.dsa.node.DSInfo;
6+
import org.iot.dsa.node.DSNode;
7+
import org.iot.dsa.node.DSStatus;
8+
import org.iot.dsa.node.DSString;
9+
import org.iot.dsa.time.DSDateTime;
10+
import org.iot.dsa.util.DSException;
11+
12+
/**
13+
* Abstract representation of a connection. Maintains status, provides callbacks to manage
14+
* a connection lifecycle and notifies children of state changes.
15+
* <p>
16+
*
17+
* <b>Running</b>
18+
* The run method manages calling connect, disconnect and ping(). The subclass is responsible
19+
* having a thread call this method, or to manage that lifecycle another way.
20+
* <p>
21+
*
22+
* <b>Pinging</b>
23+
* ping() is only called by the run method. It is only called if the time since the last OK or
24+
* last ping (whichever is later) exceeds the ping interval. Implementations should call connOk
25+
* whenever there are successful communications to avoid unnecessarily calling ping.
26+
* <p>
27+
*
28+
* <b>DSIConnected</b>
29+
* By default, DSConnection will notify instances of DSIConnected when the connection transitions
30+
* to connected and disconnected. Subclasses could choose to notify at other times
31+
*
32+
* @author Aaron Hansen
33+
*/
34+
public abstract class DSConnection extends DSNode implements DSIStatus {
35+
36+
///////////////////////////////////////////////////////////////////////////
37+
// Class Fields
38+
///////////////////////////////////////////////////////////////////////////
39+
40+
static final String ENABLED = "Enabled";
41+
static final String FAIL_CAUSE = "Fail Cause";
42+
static final String LAST_OK = "Last OK";
43+
static final String LAST_FAIL = "Last Fail";
44+
static final String STATE = "Connection State";
45+
static final String STATUS = "Status";
46+
47+
///////////////////////////////////////////////////////////////////////////
48+
// Instance Fields
49+
///////////////////////////////////////////////////////////////////////////
50+
51+
private DSInfo enabled = getInfo(ENABLED);
52+
private DSInfo failCause = getInfo(FAIL_CAUSE);
53+
private DSInfo lastFail = getInfo(LAST_FAIL);
54+
private DSInfo lastOk = getInfo(LAST_OK);
55+
private long lastPing;
56+
private DSInfo state = getInfo(STATE);
57+
private DSInfo status = getInfo(STATUS);
58+
59+
///////////////////////////////////////////////////////////////////////////
60+
// Constructors
61+
///////////////////////////////////////////////////////////////////////////
62+
63+
///////////////////////////////////////////////////////////////////////////
64+
// Public Methods
65+
///////////////////////////////////////////////////////////////////////////
66+
67+
public DSConnectionState getConnectionState() {
68+
return (DSConnectionState) state.getObject();
69+
}
70+
71+
/**
72+
* The last time connOk was called.
73+
*/
74+
public DSDateTime getLastOk() {
75+
return (DSDateTime) lastOk.getObject();
76+
}
77+
78+
public DSStatus getStatus() {
79+
return (DSStatus) status.getObject();
80+
}
81+
82+
public boolean isConnected() {
83+
return getConnectionState().isConnected();
84+
}
85+
86+
public boolean isEnabled() {
87+
return enabled.getElement().toBoolean();
88+
}
89+
90+
/**
91+
* Call this to automatically manage the connection lifecycle, it will not return
92+
* until the node is stopped.
93+
*/
94+
public synchronized void run() {
95+
long retryMs = 1000;
96+
long lastPing = 0;
97+
while (isRunning()) {
98+
if (isConnected()) {
99+
retryMs = 1000;
100+
if (!isEnabled()) {
101+
disconnect();
102+
} else {
103+
try {
104+
long ivl = getPingInterval();
105+
long last = Math.max(getLastOk().timeInMillis(), lastPing);
106+
long now = System.currentTimeMillis();
107+
long duration = now - last;
108+
if (duration >= ivl) {
109+
lastPing = now;
110+
try {
111+
ping();
112+
} catch (Throwable t) {
113+
error(error() ? getPath() : null, t);
114+
connDown(DSException.makeMessage(t));
115+
}
116+
} else {
117+
wait(ivl - duration);
118+
}
119+
} catch (Exception x) {
120+
debug(debug() ? getPath() : null, x);
121+
}
122+
}
123+
} else {
124+
if (isEnabled() && !getConnectionState().isEngaged()) {
125+
connect();
126+
} else {
127+
try {
128+
wait(retryMs);
129+
} catch (Exception x) {
130+
debug(debug() ? getPath() : null, x);
131+
}
132+
retryMs = Math.max(60000, retryMs + 5000);
133+
}
134+
}
135+
}
136+
}
137+
138+
///////////////////////////////////////////////////////////////////////////
139+
// Protected Methods
140+
///////////////////////////////////////////////////////////////////////////
141+
142+
/**
143+
* You should call configOk, configFault, or throw an exception.
144+
*/
145+
protected abstract void checkConfig();
146+
147+
/**
148+
* Puts the connection into the fault state and optionally sets the error message.
149+
*
150+
* @param msg Optional
151+
*/
152+
protected void configFault(String msg) {
153+
put(lastFail, DSDateTime.currentTime());
154+
if (msg != null) {
155+
if (!failCause.getElement().toString().equals(msg)) {
156+
put(failCause, DSString.valueOf(msg));
157+
}
158+
}
159+
if (!getStatus().isFault()) {
160+
put(status, getStatus().add(DSStatus.FAULT));
161+
}
162+
}
163+
164+
/**
165+
* Removes fault state.
166+
*/
167+
protected void configOk() {
168+
if (getStatus().isFault()) {
169+
put(status, getStatus().remove(DSStatus.FAULT));
170+
}
171+
}
172+
173+
/**
174+
* Puts the connection into the down state, optionally sets the reason and notifies
175+
* the subtree if the connection actually transitions to down.
176+
*
177+
* @param reason Optional
178+
*/
179+
protected void connDown(String reason) {
180+
put(lastFail, DSDateTime.currentTime());
181+
if (reason != null) {
182+
if (!failCause.getElement().toString().equals(reason)) {
183+
put(failCause, DSString.valueOf(reason));
184+
}
185+
}
186+
boolean notify = false;
187+
if (!getStatus().isDown()) {
188+
notify = true;
189+
put(status, getStatus().add(DSStatus.DOWN));
190+
}
191+
if (!getConnectionState().isDisconnected()) {
192+
notify = true;
193+
put(state, DSConnectionState.DISCONNECTED);
194+
}
195+
if (notify) {
196+
try {
197+
onDisconnected();
198+
} catch (Exception x) {
199+
error(error() ? getPath() : null, x);
200+
}
201+
}
202+
}
203+
204+
/**
205+
* Update the last ok timestamp, will remove the down status if present and notifies the
206+
* subtree if the connection actually transitions to ok.
207+
*/
208+
protected void connOk() {
209+
put(lastOk, DSDateTime.currentTime());
210+
boolean notify = false;
211+
if (getStatus().isDown()) {
212+
notify = true;
213+
put(status, getStatus().remove(DSStatus.DOWN));
214+
}
215+
if (!getConnectionState().isConnected()) {
216+
notify = true;
217+
put(state, DSConnectionState.CONNECTED);
218+
}
219+
if (notify) {
220+
try {
221+
onConnected();
222+
} catch (Exception x) {
223+
error(error() ? getPath() : null, x);
224+
}
225+
}
226+
}
227+
228+
/**
229+
* Will attempt to connect only if the current state is disconnected.
230+
*/
231+
protected void connect() {
232+
if (!getConnectionState().isDisconnected()) {
233+
return;
234+
}
235+
put(state, DSConnectionState.CONNECTING);
236+
try {
237+
checkConfig();
238+
try {
239+
if (isOperational()) {
240+
onConnect();
241+
}
242+
} catch (Throwable e) {
243+
error(error() ? getPath() : null, e);
244+
connDown(DSException.makeMessage(e));
245+
}
246+
} catch (Throwable x) {
247+
error(error() ? getPath() : null, x);
248+
configFault(DSException.makeMessage(x));
249+
}
250+
}
251+
252+
@Override
253+
protected void declareDefaults() {
254+
declareDefault(ENABLED, DSBool.TRUE);
255+
declareDefault(STATUS, DSStatus.down).setReadOnly(true).setTransient(true);
256+
declareDefault(STATE, DSConnectionState.DISCONNECTED).setReadOnly(true).setTransient(true);
257+
declareDefault(FAIL_CAUSE, DSString.EMPTY).setReadOnly(true).setTransient(true);
258+
declareDefault(LAST_OK, DSDateTime.NULL).setReadOnly(true);
259+
declareDefault(LAST_FAIL, DSDateTime.NULL).setReadOnly(true);
260+
}
261+
262+
/**
263+
* Will attempt to disconnected only if the current state is connected.
264+
*/
265+
protected void disconnect() {
266+
if (!getConnectionState().isConnected()) {
267+
return;
268+
}
269+
put(state, DSConnectionState.CONNECTING);
270+
try {
271+
onDisconnect();
272+
} catch (Throwable x) {
273+
warn(warn() ? getPath() : null, x);
274+
}
275+
}
276+
277+
/**
278+
* Ping interval in milliseconds (default is 60000). If the time since the last call
279+
* to connOk exceeds this, the ping method will be called. Implementations should
280+
* call connOk whenever there have been successful communications to minimize pinging.
281+
*
282+
* @return 60000
283+
*/
284+
protected long getPingInterval() {
285+
return 60000;
286+
}
287+
288+
protected boolean isConfigOk() {
289+
return !getStatus().isFault();
290+
}
291+
292+
/**
293+
* True if running, enabled and config is ok.
294+
*/
295+
protected boolean isOperational() {
296+
return isRunning() && isConfigOk() && isEnabled();
297+
}
298+
299+
/**
300+
* Calls DSIConnected.onChange on all implementations in the subtree. Stops at
301+
* instances of DSConnection, but if they implement DSIConnected, they will receive
302+
* the callback. By default, this is only called by onConnected and onDisconnected.
303+
*/
304+
protected void notifyDescendents() {
305+
notifyDescendents(this);
306+
}
307+
308+
protected void onChildChanged(DSInfo info) {
309+
if (info == enabled) {
310+
synchronized (this) {
311+
notify();
312+
}
313+
}
314+
}
315+
316+
/**
317+
* You must call connOk or connDown, it can be async after this method has returned.
318+
* You can throw an exception from this method instead of calling connDown.
319+
* This will only be called if configuration is ok.
320+
*/
321+
protected abstract void onConnect();
322+
323+
/**
324+
* Override point, called by connOkDown(). By default, this notifies all DSIConnected
325+
* objects in the subtree. Overrides should probably call super.onConnected unless they
326+
* have a very good reason not to.
327+
*/
328+
protected void onConnected() {
329+
notifyDescendents(this);
330+
}
331+
332+
/**
333+
* You must call connDown, it can be async after this method has returned. You can throw
334+
* an exception from this method instead of calling connDown.
335+
*/
336+
protected abstract void onDisconnect();
337+
338+
/**
339+
* Override point, called by connDown(). By default, this notifies all DSIConnected objects
340+
* in the subtree of the state change. Overrides should probably call super.onDisconnected
341+
* untless they have a very good reason not to.
342+
*/
343+
protected void onDisconnected() {
344+
notifyDescendents(this);
345+
}
346+
347+
/**
348+
* Calls onDisconnected().
349+
*/
350+
@Override
351+
protected void onStable() {
352+
onDisconnected();
353+
}
354+
355+
/**
356+
* Override point, called by the run method. Implementations should call verify the connection
357+
* is still valid and call connOk or connDown, but those can be async and after this method
358+
* returns. Throwing an exception will be treated as a connDown. By default, this only calls
359+
* connOk().
360+
*/
361+
protected void ping() {
362+
connOk();
363+
}
364+
365+
///////////////////////////////////////////////////////////////////////////
366+
// Package / Private Methods
367+
///////////////////////////////////////////////////////////////////////////
368+
369+
/**
370+
* Calls DSIConnected.onChange on all implementations in the subtree. Stops at
371+
* instances of DSConnection, but if they implement DSIConnected, they will receive
372+
* the callback.
373+
*/
374+
private void notifyDescendents(DSNode node) {
375+
DSInfo info = getFirstInfo();
376+
while (info != null) {
377+
if (info.is(DSIConnected.class)) {
378+
try {
379+
((DSIConnected) info.getObject()).onChange(this);
380+
} catch (Throwable t) {
381+
error(error() ? info.getPath(null) : null, t);
382+
}
383+
}
384+
if (info.isNode() && !info.is(DSConnection.class)) {
385+
notifyDescendents(info.getNode());
386+
}
387+
info = info.next();
388+
}
389+
}
390+
391+
392+
}

0 commit comments

Comments
 (0)