17
17
import java .util .concurrent .ExecutorService ;
18
18
import java .util .concurrent .Executors ;
19
19
import java .util .concurrent .TimeUnit ;
20
+ import java .util .concurrent .locks .Condition ;
21
+ import java .util .concurrent .locks .Lock ;
22
+ import java .util .concurrent .locks .ReentrantLock ;
20
23
21
24
import org .tinystruct .AbstractApplication ;
22
25
import org .tinystruct .ApplicationException ;
26
+ import org .tinystruct .ApplicationRuntimeException ;
23
27
import org .tinystruct .data .component .Builder ;
24
28
import org .tinystruct .system .ApplicationManager ;
25
29
@@ -31,14 +35,17 @@ public class talk extends AbstractApplication {
31
35
protected final Map <String , Queue <Builder >> list = new ConcurrentHashMap <String , Queue <Builder >>();
32
36
protected final Map <String , List <String >> sessions = new ConcurrentHashMap <String , List <String >>();
33
37
private ExecutorService service ;
38
+ private Lock lock = new ReentrantLock ();
39
+ private Condition consumer = lock .newCondition ();
40
+ private Condition producer = lock .newCondition ();
34
41
35
42
@ Override
36
43
public void init () {
37
44
this .setAction ("talk/update" , "update" );
38
45
this .setAction ("talk/save" , "save" );
39
46
this .setAction ("talk/version" , "version" );
40
47
this .setAction ("talk/testing" , "testing" );
41
-
48
+
42
49
if (this .service != null ) {
43
50
Runtime .getRuntime ().addShutdownHook (new Thread (new Runnable () {
44
51
@ Override
@@ -131,17 +138,17 @@ public final String update(final String sessionId) throws ApplicationException,
131
138
Queue <Builder > messages = this .list .get (sessionId );
132
139
// If there is a new message, then return it directly
133
140
if ((message = messages .poll ()) != null ) return message .toString ();
134
-
135
- synchronized (talk .class ) {
136
- while ((message = messages .poll ()) == null ) {
137
- try {
138
- talk .class .wait (TIMEOUT );
139
- } catch (InterruptedException e ) {
140
- throw new ApplicationException (e .getMessage (), e );
141
- }
141
+ lock .lock ();
142
+ while ((message = messages .poll ()) == null ) {
143
+ try {
144
+ consumer .await (TIMEOUT , TimeUnit .MICROSECONDS );
145
+ } catch (InterruptedException e ) {
146
+ throw new ApplicationException (e .getMessage (), e );
142
147
}
143
- return message .toString ();
144
148
}
149
+ producer .signalAll ();
150
+ lock .unlock ();
151
+ return message .toString ();
145
152
}
146
153
147
154
/**
@@ -168,10 +175,15 @@ private final void copy(Object meetingCode, Builder builder) {
168
175
while (iterator .hasNext ()) {
169
176
Entry <String , Queue <Builder >> list = iterator .next ();
170
177
if (_sessions .contains (list .getKey ())) {
171
- synchronized (talk .class ) {
172
- list .getValue ().add (builder );
173
- talk .class .notifyAll ();
178
+ lock .lock ();
179
+ try {
180
+ producer .await ();
181
+ } catch (InterruptedException e ) {
182
+ throw new ApplicationRuntimeException (e .getMessage (), e );
174
183
}
184
+ list .getValue ().add (builder );
185
+ consumer .signalAll ();
186
+ lock .unlock ();
175
187
}
176
188
}
177
189
}
0 commit comments