Skip to content

Commit 74d0cef

Browse files
author
yutao.sun
committed
add hystrix
1 parent f1833a5 commit 74d0cef

File tree

8 files changed

+158
-7
lines changed

8 files changed

+158
-7
lines changed

asyc_call.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,19 @@ var ErrCancelled = errors.New("task has been cancelled")
1313
type TaskResult struct {
1414
Result interface{}
1515
Err error
16+
// for the need for record perf metric
17+
CostMs int64
1618
}
1719

1820
// ResultStub is returned immediately after calling AsynExecutor
1921
// It is the stub to get the result.
2022
type ResultStub struct {
2123
tResult *TaskResult
22-
retCh chan TaskResult
24+
retCh <-chan TaskResult
2325
timeoutMs int64
2426
ctx context.Context
27+
// for some async framework, which returns channel erro
28+
errChan chan error
2529
}
2630

2731
// GetResult is to get the asynchronous task's result
@@ -41,6 +45,8 @@ func (rs *ResultStub) GetResult() TaskResult {
4145
case <-rs.ctx.Done():
4246
tResult.Err = ErrCancelled
4347
rs.tResult = &tResult
48+
case tResult.Err = <-rs.errChan:
49+
rs.tResult = &tResult
4450
}
4551
return *rs.tResult
4652
}

asyn_call_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
var task = func(ctx context.Context) TaskResult {
1212
time.Sleep(time.Second * 3)
13-
return TaskResult{"OK", nil}
13+
return TaskResult{Result: "OK", Err: nil}
1414
}
1515

1616
func TestHappyPath(t *testing.T) {

async_hystrix.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package concurrency
2+
3+
import (
4+
"context"
5+
6+
"github.com/afex/hystrix-go/hystrix"
7+
)
8+
9+
func AsynHystrix(ctx context.Context, task Task, name string, timeoutMs int64) ResultStub {
10+
retCh := make(chan TaskResult, 1)
11+
12+
runC := func(ctx context.Context) error {
13+
retCh <- task(ctx)
14+
return nil
15+
}
16+
errChan := hystrix.GoC(ctx, name, runC, nil)
17+
18+
return ResultStub{retCh: retCh,
19+
timeoutMs: timeoutMs,
20+
ctx: ctx,
21+
errChan: errChan}
22+
}

async_hystrix_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package concurrency
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"testing"
8+
"time"
9+
10+
"github.com/afex/hystrix-go/hystrix"
11+
)
12+
13+
var testTask = func(ctx context.Context) TaskResult {
14+
time.Sleep(time.Millisecond * 300)
15+
return TaskResult{Result: "OK", Err: nil, CostMs: 200}
16+
}
17+
var hystrixName string
18+
19+
func BuildHystrix(timeout int) {
20+
hystrixName = "async_hystrix"
21+
hystrix.ConfigureCommand(hystrixName, hystrix.CommandConfig{
22+
Timeout: timeout,
23+
MaxConcurrentRequests: 2,
24+
RequestVolumeThreshold: 2,
25+
SleepWindow: timeout * 2,
26+
ErrorPercentThreshold: 5,
27+
})
28+
}
29+
func TestHystrixHappyPath(t *testing.T) {
30+
BuildHystrix(1000)
31+
startTime := time.Now()
32+
33+
retStub := AsynHystrix(context.TODO(), testTask, hystrixName, 1000)
34+
if time.Since(startTime).Milliseconds() > 2 {
35+
t.Error("It is not asynchronous call.")
36+
}
37+
fmt.Println("nonblocking")
38+
ret := retStub.GetResult()
39+
if ret.Err != nil {
40+
t.Error(ret.Err)
41+
}
42+
if time.Since(startTime).Microseconds() < 3000 {
43+
t.Error("It is not unexpected execution time.")
44+
}
45+
fmt.Println(ret.Result.(string), ret.Err)
46+
}
47+
48+
func TestHystrixTimeoutPath(t *testing.T) {
49+
BuildHystrix(100)
50+
startTime := time.Now()
51+
retStub := AsynHystrix(context.TODO(), testTask, hystrixName, 100)
52+
if time.Since(startTime).Milliseconds() > 2 {
53+
t.Error("It is not asynchronous call.")
54+
}
55+
fmt.Println("nonblocking")
56+
ret := retStub.GetResult()
57+
if ret.Err == nil {
58+
t.Error("It is not unexpected execution time.")
59+
}
60+
if !errors.Is(ret.Err, ErrTimeout) {
61+
t.Error("It is unexpected error", ret.Err)
62+
}
63+
fmt.Println(ret.Result, ret.Err)
64+
65+
// test hystrix timeout
66+
retStub = AsynHystrix(context.TODO(), testTask, hystrixName, 1000)
67+
ret = retStub.GetResult()
68+
if ret.Err == nil {
69+
t.Error("It is not unexpected execution time.")
70+
}
71+
if !errors.Is(ret.Err, hystrix.ErrTimeout) {
72+
t.Error("It is unexpected error", ret.Err)
73+
}
74+
}
75+
76+
func TestHystrixConcurrencyPath(t *testing.T) {
77+
BuildHystrix(1000)
78+
// test hystrix max cocurrency
79+
stubs := []ResultStub{}
80+
stubs = append(stubs, AsynHystrix(context.TODO(), testTask, hystrixName, 1000))
81+
time.Sleep(time.Millisecond * 100)
82+
stubs = append(stubs, AsynHystrix(context.TODO(), testTask, hystrixName, 1000))
83+
time.Sleep(time.Millisecond * 100)
84+
stubs = append(stubs, AsynHystrix(context.TODO(), testTask, hystrixName, 1000))
85+
86+
ret := stubs[0].GetResult()
87+
if ret.Err != nil {
88+
t.Error("It is unexpected execution time.", ret.Err)
89+
}
90+
91+
ret = stubs[1].GetResult()
92+
if ret.Err != nil {
93+
t.Error("It is unexpected execution time.", ret.Err)
94+
}
95+
96+
ret = stubs[2].GetResult()
97+
if ret.Err == nil {
98+
t.Error("It is not unexpected execution time.")
99+
}
100+
if !errors.Is(ret.Err, hystrix.ErrMaxConcurrency) {
101+
t.Error("It is unexpected error", ret.Err)
102+
}
103+
}

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
11
module github.com/easierway/concurrency_utils
22

33
go 1.13
4+
5+
require (
6+
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
7+
github.com/smartystreets/goconvey v1.7.2 // indirect
8+
)

go.sum

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw=
2+
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
3+
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
4+
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
5+
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
6+
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
7+
github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs=
8+
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
9+
github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg=
10+
github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM=
11+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
12+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
13+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
14+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
15+
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=

multi_tasks_aggration_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,21 @@ import (
1010
var task1 Task = func(ctx context.Context) TaskResult {
1111
time.Sleep(time.Second * 2)
1212
return TaskResult{
13-
"Task1", nil,
13+
Result: "Task1", Err: nil,
1414
}
1515
}
1616

1717
var task2 Task = func(ctx context.Context) TaskResult {
1818
time.Sleep(time.Second * 1)
1919
return TaskResult{
20-
"Task2", nil,
20+
Result: "Task2", Err: nil,
2121
}
2222
}
2323

2424
var task3 Task = func(ctx context.Context) TaskResult {
2525
time.Sleep(time.Second * 3)
2626
return TaskResult{
27-
"Task3", nil,
27+
Result: "Task3", Err: nil,
2828
}
2929
}
3030

multi_tasks_aggregation.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ func (stub *AnyResultStub) GetResultWhenAnyTaskReturns() TaskResult {
4141
case ret := <-stub.retCh:
4242
stub.firstReturn = &ret
4343
case <-stub.timer.C:
44-
stub.firstReturn = &TaskResult{nil, ErrTimeout}
44+
stub.firstReturn = &TaskResult{Result: nil, Err: ErrTimeout}
4545
case <-stub.ctx.Done():
46-
stub.firstReturn = &TaskResult{nil, ErrCancelled}
46+
stub.firstReturn = &TaskResult{Result: nil, Err: ErrCancelled}
4747
}
4848
return *stub.firstReturn
4949

0 commit comments

Comments
 (0)