Skip to content

Commit f1833a5

Browse files
author
chaocai
committed
Added methods for multi-tasks processing
AnyTaskReturn
1 parent 603b698 commit f1833a5

File tree

2 files changed

+286
-0
lines changed

2 files changed

+286
-0
lines changed

multi_tasks_aggration_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package concurrency
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
)
9+
10+
var task1 Task = func(ctx context.Context) TaskResult {
11+
time.Sleep(time.Second * 2)
12+
return TaskResult{
13+
"Task1", nil,
14+
}
15+
}
16+
17+
var task2 Task = func(ctx context.Context) TaskResult {
18+
time.Sleep(time.Second * 1)
19+
return TaskResult{
20+
"Task2", nil,
21+
}
22+
}
23+
24+
var task3 Task = func(ctx context.Context) TaskResult {
25+
time.Sleep(time.Second * 3)
26+
return TaskResult{
27+
"Task3", nil,
28+
}
29+
}
30+
31+
// Tests for AnyOneReturn
32+
func TestHappyCaseOfAnyReturn(t *testing.T) {
33+
startTime := time.Now()
34+
retStub := AsyncExecutorForFirstReturn(context.TODO(), 3500, task1, task2, task3)
35+
36+
if time.Since(startTime).Milliseconds() > 2 {
37+
t.Error("It is not asynchronous call.")
38+
}
39+
fmt.Println("nonblocking")
40+
if retStub.GetResultWhenAnyTaskReturns().Err != nil {
41+
t.Error("Unexpected error occurred.")
42+
}
43+
str, _ := retStub.GetResultWhenAnyTaskReturns().Result.(string)
44+
if str != "Task2" {
45+
t.Error("unexpected return value.")
46+
return
47+
}
48+
if time.Since(startTime).Milliseconds() > 1050 {
49+
t.Error("Time spent is unexpected.")
50+
return
51+
}
52+
fmt.Println(retStub.GetResultWhenAnyTaskReturns().Result.(string),
53+
retStub.GetResultWhenAnyTaskReturns().Err)
54+
}
55+
56+
func TestTimeoutCaseOfAnyReturn(t *testing.T) {
57+
startTime := time.Now()
58+
retStub := AsyncExecutorForFirstReturn(context.TODO(), 100, task1, task2, task3)
59+
60+
if time.Since(startTime).Milliseconds() > 2 {
61+
t.Error("It is not asynchronous call.")
62+
}
63+
fmt.Println("nonblocking")
64+
err := retStub.GetResultWhenAnyTaskReturns().Err
65+
if err == nil || err != ErrTimeout {
66+
t.Error("Timeout error is expected.")
67+
return
68+
}
69+
if time.Since(startTime).Milliseconds() > 110 {
70+
t.Error("Time spent is unexpected.")
71+
return
72+
}
73+
}
74+
75+
func TestCancelCaseOfAnyReturn(t *testing.T) {
76+
startTime := time.Now()
77+
ctx, cacelFn := context.WithCancel(context.Background())
78+
retStub := AsyncExecutorForFirstReturn(ctx, 100, task1, task2, task3)
79+
80+
if time.Since(startTime).Milliseconds() > 2 {
81+
t.Error("It is not asynchronous call.")
82+
}
83+
fmt.Println("nonblocking")
84+
cacelFn()
85+
err := retStub.GetResultWhenAnyTaskReturns().Err
86+
if err == nil || err != ErrCancelled {
87+
t.Error("Cancelled error is expected.")
88+
return
89+
}
90+
if time.Since(startTime).Milliseconds() > 3 {
91+
t.Error("Time spent is unexpected.")
92+
return
93+
}
94+
fmt.Println(retStub.GetResultWhenAnyTaskReturns())
95+
}
96+
97+
// Tests for AllReturn
98+
func TestHappyCaseOfAllReturn(t *testing.T) {
99+
startTime := time.Now()
100+
retStub := AsyncExecutorForAllReturn(context.TODO(), 3500, task1, task2, task3)
101+
102+
if time.Since(startTime).Milliseconds() > 2 {
103+
t.Error("It is not asynchronous call.")
104+
}
105+
fmt.Println("nonblocking")
106+
if retStub.GetResultsWhenAllTasksReturn().Err != nil {
107+
t.Error("Unexpected error occurred.")
108+
return
109+
}
110+
taskRet := *retStub.GetResultsWhenAllTasksReturn().Results
111+
if taskRet[0] == nil || taskRet[1] == nil || taskRet[2] == nil {
112+
t.Error("Unexpected results.")
113+
return
114+
}
115+
rets := *retStub.GetResultsWhenAllTasksReturn().Results
116+
fmt.Println(rets[0], rets[1], rets[2])
117+
if time.Since(startTime).Milliseconds() > 3050 {
118+
t.Error("Time spent is unexpected.")
119+
return
120+
}
121+
}
122+
123+
func TestTimeoutCaseOfAllReturn(t *testing.T) {
124+
startTime := time.Now()
125+
retStub := AsyncExecutorForAllReturn(context.TODO(), 2100, task1, task2, task3)
126+
127+
if time.Since(startTime).Milliseconds() > 2 {
128+
t.Error("It is not asynchronous call.")
129+
}
130+
fmt.Println("nonblocking")
131+
rets := *retStub.GetResultsWhenAllTasksReturn().Results
132+
if rets[0] == nil || rets[1] == nil || rets[2] != nil {
133+
t.Error("unexpected results", rets[0], rets[1], rets[2])
134+
return
135+
}
136+
fmt.Println(rets[0], rets[1], rets[2])
137+
err := retStub.GetResultsWhenAllTasksReturn().Err
138+
fmt.Println(err)
139+
if err == nil || err != ErrTimeout {
140+
t.Error("Timeout error is expected.")
141+
return
142+
}
143+
if time.Since(startTime).Milliseconds() > 2200 {
144+
t.Error("Time spent is unexpected.")
145+
return
146+
}
147+
}
148+
149+
func TestCancelCaseOfAllReturn(t *testing.T) {
150+
startTime := time.Now()
151+
ctx, cacelFn := context.WithCancel(context.Background())
152+
retStub := AsyncExecutorForAllReturn(ctx, 100, task1, task2, task3)
153+
154+
if time.Since(startTime).Milliseconds() > 2 {
155+
t.Error("It is not asynchronous call.")
156+
}
157+
fmt.Println("nonblocking")
158+
cacelFn()
159+
err := retStub.GetResultsWhenAllTasksReturn().Err
160+
if err == nil || err != ErrCancelled {
161+
t.Error("Cancelled error is expected.")
162+
return
163+
}
164+
if time.Since(startTime).Milliseconds() > 3 {
165+
t.Error("Time spent is unexpected.")
166+
return
167+
}
168+
fmt.Println(retStub.GetResultsWhenAllTasksReturn())
169+
rets := *retStub.GetResultsWhenAllTasksReturn().Results
170+
if rets[0] != nil || rets[1] != nil || rets[2] != nil {
171+
t.Error("unexpected results", rets[0], rets[1], rets[2])
172+
return
173+
}
174+
fmt.Println(rets[0], rets[1], rets[2])
175+
}

multi_tasks_aggregation.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package concurrency
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// MultiTaskResults is the results when calling GetResultsWhenAllTasksReturn
9+
type MultiTaskResults struct {
10+
// Results is the results of the tasks
11+
// could be some of the task results
12+
Results *[]*TaskResult
13+
// Err is the errors (including errors about timeout and Cancelled) of the process,
14+
Err error
15+
}
16+
17+
type AllResultStub struct {
18+
tTaskResults *[]*TaskResult
19+
retCh chan int
20+
timeoutMs int64
21+
ctx context.Context
22+
allReturns *MultiTaskResults
23+
numOfTasks int
24+
timer *time.Timer
25+
}
26+
27+
type AnyResultStub struct {
28+
retCh chan TaskResult
29+
timeoutMs int64
30+
ctx context.Context
31+
firstReturn *TaskResult
32+
timer *time.Timer
33+
}
34+
35+
func (stub *AnyResultStub) GetResultWhenAnyTaskReturns() TaskResult {
36+
if stub.firstReturn != nil {
37+
return *stub.firstReturn
38+
}
39+
stub.timer = time.NewTimer(time.Millisecond * time.Duration(stub.timeoutMs))
40+
select {
41+
case ret := <-stub.retCh:
42+
stub.firstReturn = &ret
43+
case <-stub.timer.C:
44+
stub.firstReturn = &TaskResult{nil, ErrTimeout}
45+
case <-stub.ctx.Done():
46+
stub.firstReturn = &TaskResult{nil, ErrCancelled}
47+
}
48+
return *stub.firstReturn
49+
50+
}
51+
52+
func (stub *AllResultStub) GetResultsWhenAllTasksReturn() MultiTaskResults {
53+
numOfTasks := stub.numOfTasks
54+
if stub.allReturns != nil {
55+
return *stub.allReturns
56+
}
57+
stub.timer = time.NewTimer(time.Millisecond * time.Duration(stub.timeoutMs))
58+
stub.allReturns = &MultiTaskResults{
59+
Results: stub.tTaskResults,
60+
}
61+
for i := 0; i < numOfTasks; i++ {
62+
select {
63+
case <-stub.retCh:
64+
case <-stub.timer.C:
65+
stub.allReturns.Err = ErrTimeout
66+
case <-stub.ctx.Done():
67+
stub.allReturns.Err = ErrCancelled
68+
}
69+
}
70+
return *stub.allReturns
71+
}
72+
73+
func AsyncExecutorForFirstReturn(ctx context.Context, timeoutMs int64, tasks ...Task) AnyResultStub {
74+
numOfTasks := len(tasks)
75+
resultsCh := make(chan TaskResult, numOfTasks)
76+
stub := AnyResultStub{
77+
retCh: resultsCh,
78+
timeoutMs: timeoutMs,
79+
ctx: ctx,
80+
}
81+
for i := 0; i < numOfTasks; i++ {
82+
go func(id int) {
83+
ret := tasks[id](ctx)
84+
resultsCh <- ret
85+
86+
}(i)
87+
}
88+
return stub
89+
}
90+
91+
func AsyncExecutorForAllReturn(ctx context.Context, timeoutMs int64, tasks ...Task) AllResultStub {
92+
numOfTasks := len(tasks)
93+
resultsCh := make(chan int, numOfTasks)
94+
results := make([]*TaskResult, numOfTasks)
95+
stub := AllResultStub{
96+
tTaskResults: &results,
97+
retCh: resultsCh,
98+
timeoutMs: timeoutMs,
99+
ctx: ctx,
100+
numOfTasks: numOfTasks,
101+
}
102+
for i := 0; i < numOfTasks; i++ {
103+
go func(id int) {
104+
ret := tasks[id](ctx)
105+
results[id] = &ret
106+
resultsCh <- id
107+
108+
}(i)
109+
}
110+
return stub
111+
}

0 commit comments

Comments
 (0)