Skip to content

Commit fcc0e66

Browse files
IWF-397: Add channel sizes data for RPC (#280)
1 parent 712bae9 commit fcc0e66

File tree

7 files changed

+128
-11
lines changed

7 files changed

+128
-11
lines changed

iwf-idl

src/main/java/io/iworkflow/core/WorkerService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ public WorkflowWorkerRpcResponse handleWorkflowWorkerRpc(final WorkflowWorkerRpc
7070
final Map<String, SearchAttributeValueType> searchAttrsTypeMap = registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType());
7171
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(searchAttrsTypeMap, req.getSearchAttributes());
7272
final CommunicationImpl communication = new CommunicationImpl(
73+
req.getInternalChannelInfos(),
74+
req.getSignalChannelInfos(),
7375
registry.getInternalChannelTypeStore(req.getWorkflowType()),
76+
registry.getSignalChannelTypeStore(req.getWorkflowType()),
7477
workerOptions.getObjectEncoder(),
7578
true
7679
);
@@ -169,7 +172,10 @@ public WorkflowStateWaitUntilResponse handleWorkflowStateWaitUntil(final Workflo
169172
final Map<String, SearchAttributeValueType> searchAttrsTypeMap = registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType());
170173
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(searchAttrsTypeMap, req.getSearchAttributes());
171174
final CommunicationImpl communication = new CommunicationImpl(
175+
new HashMap<>(),
176+
new HashMap<>(),
172177
registry.getInternalChannelTypeStore(req.getWorkflowType()),
178+
registry.getSignalChannelTypeStore(req.getWorkflowType()),
173179
workerOptions.getObjectEncoder(),
174180
false
175181
);
@@ -234,7 +240,10 @@ public WorkflowStateExecuteResponse handleWorkflowStateExecute(final WorkflowSta
234240
final Map<String, SearchAttributeValueType> saTypeMap = registry.getSearchAttributeKeyToTypeMap(req.getWorkflowType());
235241
final SearchAttributeRWImpl searchAttributeRW = new SearchAttributeRWImpl(saTypeMap, req.getSearchAttributes());
236242
final CommunicationImpl communication = new CommunicationImpl(
243+
new HashMap<>(),
244+
new HashMap<>(),
237245
registry.getInternalChannelTypeStore(req.getWorkflowType()),
246+
registry.getSignalChannelTypeStore(req.getWorkflowType()),
238247
workerOptions.getObjectEncoder(),
239248
false
240249
);

src/main/java/io/iworkflow/core/communication/Communication.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,22 @@
44

55
public interface Communication {
66

7+
/**
8+
* Get the size of the internal channel(including the one being sent in the buffer)
9+
* NOTE: currently only supported in RPC
10+
* @param channelName the channel name to get size
11+
* @return the size of the internal channel
12+
*/
13+
int getInternalChannelSize(final String channelName);
14+
15+
/**
16+
* Get the size of the signal channel(including the one being sent in the buffer)
17+
* NOTE: currently only supported in RPC
18+
* @param channelName the channel name to get size
19+
* @return the size of the signal channel
20+
*/
21+
int getSignalChannelSize(final String channelName);
22+
723
/**
824
* Publish a value to an internal Channel
925
*

src/main/java/io/iworkflow/core/communication/CommunicationImpl.java

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.iworkflow.core.StateMovement;
55
import io.iworkflow.core.TypeStore;
66
import io.iworkflow.core.WorkflowDefinitionException;
7+
import io.iworkflow.gen.models.ChannelInfo;
78
import io.iworkflow.gen.models.EncodedObject;
89

910
import java.util.ArrayList;
@@ -14,7 +15,12 @@
1415

1516
public class CommunicationImpl implements Communication {
1617

17-
final TypeStore typeStore;
18+
final TypeStore internalChannelTypeStore;
19+
final TypeStore signalChannelTypeStore;
20+
21+
final Map<String, ChannelInfo> internalChannelInfos;
22+
23+
final Map<String, ChannelInfo> signalChannelInfos;
1824
final Map<String, List<EncodedObject>> toPublish = new HashMap<>();
1925

2026
final List<StateMovement> stateMovements = new ArrayList<>();
@@ -23,23 +29,63 @@ public class CommunicationImpl implements Communication {
2329
final ObjectEncoder objectEncoder;
2430

2531
public CommunicationImpl(
26-
final TypeStore typeStore,
32+
final Map<String, ChannelInfo> internalChannelInfos,
33+
final Map<String, ChannelInfo> signalChannelInfos,
34+
final TypeStore internalChannelTypeStore,
35+
final TypeStore signalChannelTypeStore,
2736
final ObjectEncoder objectEncoder,
2837
final boolean allowTriggerStateMovements) {
29-
this.typeStore = typeStore;
38+
this.internalChannelInfos = internalChannelInfos;
39+
this.signalChannelInfos = signalChannelInfos;
40+
this.internalChannelTypeStore = internalChannelTypeStore;
41+
this.signalChannelTypeStore = signalChannelTypeStore;
3042
this.objectEncoder = objectEncoder;
3143
this.allowTriggerStateMovements = allowTriggerStateMovements;
3244
}
3345

46+
@Override
47+
public int getInternalChannelSize(final String channelName) {
48+
checkInternalChannelNameValid(channelName, null);
49+
int size = 0;
50+
if(internalChannelInfos.containsKey(channelName)){
51+
size += internalChannelInfos.get(channelName).getSize();
52+
}
53+
if(toPublish.containsKey(channelName)){
54+
size += toPublish.get(channelName).size();
55+
}
56+
return size;
57+
}
58+
59+
@Override
60+
public int getSignalChannelSize(final String channelName) {
61+
checkSignalChannelNameValid(channelName, null);
62+
if(signalChannelInfos.containsKey(channelName)){
63+
return signalChannelInfos.get(channelName).getSize();
64+
}
65+
return 0;
66+
}
67+
3468
@Override
3569
public void publishInternalChannel(final String channelName, final Object value) {
36-
final Class<?> type = typeStore.getType(channelName);
70+
checkInternalChannelNameValid(channelName, value);
71+
final List<EncodedObject> publish = toPublish.computeIfAbsent(channelName, s -> new ArrayList<>());
72+
publish.add(objectEncoder.encode(value));
73+
}
74+
75+
private void checkInternalChannelNameValid(final String channelName, final Object value) {
76+
final Class<?> type = internalChannelTypeStore.getType(channelName);
3777

3878
if (value != null && !type.isInstance(value)) {
3979
throw new WorkflowDefinitionException(String.format("InternalChannel value is not of type %s", type.getName()));
4080
}
41-
final List<EncodedObject> publish = toPublish.computeIfAbsent(channelName, s -> new ArrayList<>());
42-
publish.add(objectEncoder.encode(value));
81+
}
82+
83+
private void checkSignalChannelNameValid(final String channelName, final Object value) {
84+
final Class<?> type = signalChannelTypeStore.getType(channelName);
85+
86+
if (value != null && !type.isInstance(value)) {
87+
throw new WorkflowDefinitionException(String.format("SignalChannel value is not of type %s", type.getName()));
88+
}
4389
}
4490

4591
@Override

src/test/java/io/iworkflow/integ/RpcTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.iworkflow.core.ImmutableWorkflowOptions;
99
import io.iworkflow.gen.models.*;
1010
import io.iworkflow.integ.persistence.BasicPersistenceWorkflow;
11+
import io.iworkflow.integ.rpc.DeadEndStateWorkflow;
1112
import io.iworkflow.integ.rpc.NoStateWorkflow;
1213
import io.iworkflow.integ.rpc.RpcWorkflow;
1314
import io.iworkflow.integ.rpc.RpcWorkflowState2;
@@ -313,4 +314,23 @@ public void testRpcError() throws InterruptedException {
313314
client.stopWorkflow(wfId, null);
314315
}
315316

317+
318+
@Test
319+
public void testSignalChannelSizeInfo(){
320+
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
321+
final String wfId = "testSignalChannelSizeInfo" + System.currentTimeMillis() / 1000;
322+
client.startWorkflow(
323+
DeadEndStateWorkflow.class, wfId, 10);
324+
final DeadEndStateWorkflow rpcStub = client.newRpcStub(DeadEndStateWorkflow.class, wfId, "");
325+
client.invokeRPC(rpcStub::sendAndGetInternalChannelSize);
326+
final Integer size1 = client.invokeRPC(rpcStub::sendAndGetInternalChannelSize);
327+
Assertions.assertEquals(2, size1);
328+
329+
client.signalWorkflow(DeadEndStateWorkflow.class, wfId, DeadEndStateWorkflow.IDLE_SIGNAL_CHANNEL, null);
330+
client.signalWorkflow(DeadEndStateWorkflow.class, wfId, DeadEndStateWorkflow.IDLE_SIGNAL_CHANNEL, null);
331+
client.signalWorkflow(DeadEndStateWorkflow.class, wfId, DeadEndStateWorkflow.IDLE_SIGNAL_CHANNEL, null);
332+
final Integer size2 = client.invokeRPC(rpcStub::getSignalChannelSize);
333+
Assertions.assertEquals(3, size2);
334+
335+
}
316336
}

src/test/java/io/iworkflow/integ/rpc/DeadEndStateWorkflow.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import io.iworkflow.core.WorkflowState;
1010
import io.iworkflow.core.command.CommandResults;
1111
import io.iworkflow.core.communication.Communication;
12+
import io.iworkflow.core.communication.CommunicationMethodDef;
13+
import io.iworkflow.core.communication.InternalChannelDef;
14+
import io.iworkflow.core.communication.SignalChannelDef;
1215
import io.iworkflow.core.persistence.Persistence;
1316
import org.springframework.stereotype.Component;
1417

@@ -19,6 +22,17 @@
1922

2023
@Component
2124
public class DeadEndStateWorkflow implements ObjectWorkflow {
25+
26+
public static final String IDLE_INTERNAL_CHANNEL = "ideal-internal-channel";
27+
public static final String IDLE_SIGNAL_CHANNEL = "ideal-signal-channel";
28+
@Override
29+
public List<CommunicationMethodDef> getCommunicationSchema() {
30+
return Arrays.asList(
31+
InternalChannelDef.create(Void.class, IDLE_INTERNAL_CHANNEL),
32+
SignalChannelDef.create(Void.class, IDLE_SIGNAL_CHANNEL)
33+
);
34+
}
35+
2236
@Override
2337
public List<StateDef> getWorkflowStates() {
2438
return Arrays.asList(
@@ -27,6 +41,16 @@ public List<StateDef> getWorkflowStates() {
2741
);
2842
}
2943

44+
@RPC
45+
public int getSignalChannelSize(Context context, Persistence persistence, Communication communication) {
46+
return communication.getSignalChannelSize(IDLE_SIGNAL_CHANNEL);
47+
}
48+
49+
@RPC
50+
public int sendAndGetInternalChannelSize(Context context, Persistence persistence, Communication communication) {
51+
communication.publishInternalChannel(IDLE_INTERNAL_CHANNEL, null);
52+
return communication.getInternalChannelSize(IDLE_INTERNAL_CHANNEL);
53+
}
3054
@RPC
3155
public Long testRpcFunc1(Context context, String input, Persistence persistence, Communication communication) {
3256
if (context.getWorkflowId().isEmpty() || context.getWorkflowRunId().isEmpty() ||

src/test/java/io/iworkflow/spring/controller/ApiController.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,21 @@ private ResponseEntity<?> processWorkerException(final String workflowId, final
6868
PrintWriter pw = new PrintWriter(sw);
6969
e.printStackTrace(pw);
7070
String sStackTrace = sw.toString(); // stack trace as a string
71-
if(sStackTrace.length()>500){
71+
if(sStackTrace.length()>1000){
7272
// make sure NOT exceeding 4K limit in Temporal https://github.com/indeedeng/iwf/issues/272
73-
sStackTrace = sStackTrace.substring(0, 500) + "...(TRUNCATED)";
73+
sStackTrace = sStackTrace.substring(0, 1000) + "...(TRUNCATED)";
7474
}
7575
String msg = e.getMessage();
7676
if(msg == null){
7777
System.out.println("empty message for exception "+sw.toString());
7878
msg = "empty message for exception "+e;
7979
}
80-
if (msg.length() > 50) {
81-
msg = msg.substring(0, 50) + "...(TRUNCATED)";
80+
if (msg.length() > 500) {
81+
msg = msg.substring(0, 500) + "...(TRUNCATED)";
8282
}
8383

84+
System.out.println("Exception in "+methodType+" for workflow "+workflowId+" : "+msg + "\n STACKTRACE: \n"+sStackTrace);
85+
8486
final WorkerErrorResponse errResp = new WorkerErrorResponse()
8587
.detail(msg+"\n STACKTRACE: \n"+sStackTrace)
8688
.errorType(e.getClass().getName());

0 commit comments

Comments
 (0)