diff --git a/asyc_call.go b/asyc_call.go index 6c97999..de95660 100644 --- a/asyc_call.go +++ b/asyc_call.go @@ -13,15 +13,18 @@ var ErrCancelled = errors.New("task has been cancelled") type TaskResult struct { Result interface{} Err error + CostMs int64 } // ResultStub is returned immediately after calling AsynExecutor // It is the stub to get the result. type ResultStub struct { tResult *TaskResult - retCh chan TaskResult + retCh <-chan TaskResult timeoutMs int64 ctx context.Context + // for some async framework, which returns channel erro + errChan <-chan error } // GetResult is to get the asynchronous task's result @@ -41,6 +44,8 @@ func (rs *ResultStub) GetResult() TaskResult { case <-rs.ctx.Done(): tResult.Err = ErrCancelled rs.tResult = &tResult + case tResult.Err = <-rs.errChan: + rs.tResult = &tResult } return *rs.tResult } diff --git a/asyn_call_test.go b/asyn_call_test.go index b4a18b4..b36381a 100644 --- a/asyn_call_test.go +++ b/asyn_call_test.go @@ -10,7 +10,7 @@ import ( var task = func(ctx context.Context) TaskResult { time.Sleep(time.Second * 3) - return TaskResult{"OK", nil} + return TaskResult{Result: "OK", Err: nil} } func TestHappyPath(t *testing.T) { diff --git a/async_hystrix.go b/async_hystrix.go new file mode 100644 index 0000000..25ce699 --- /dev/null +++ b/async_hystrix.go @@ -0,0 +1,22 @@ +package concurrency + +import ( + "context" + + "github.com/afex/hystrix-go/hystrix" +) + +func AsynHystrix(ctx context.Context, task Task, name string, timeoutMs int64) ResultStub { + retCh := make(chan TaskResult, 1) + + runC := func(ctx context.Context) error { + retCh <- task(ctx) + return nil + } + errChan := hystrix.GoC(ctx, name, runC, nil) + + return ResultStub{retCh: retCh, + timeoutMs: timeoutMs, + ctx: ctx, + errChan: errChan} +} diff --git a/async_hystrix_test.go b/async_hystrix_test.go new file mode 100644 index 0000000..65f662e --- /dev/null +++ b/async_hystrix_test.go @@ -0,0 +1,103 @@ +package concurrency + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/afex/hystrix-go/hystrix" +) + +var testTask = func(ctx context.Context) TaskResult { + time.Sleep(time.Millisecond * 300) + return TaskResult{Result: "OK", Err: nil, CostMs: 200} +} +var hystrixName string + +func BuildHystrix(timeout int) { + hystrixName = "async_hystrix" + hystrix.ConfigureCommand(hystrixName, hystrix.CommandConfig{ + Timeout: timeout, + MaxConcurrentRequests: 2, + RequestVolumeThreshold: 2, + SleepWindow: timeout * 2, + ErrorPercentThreshold: 5, + }) +} +func TestHystrixHappyPath(t *testing.T) { + BuildHystrix(1000) + startTime := time.Now() + + retStub := AsynHystrix(context.TODO(), testTask, hystrixName, 1000) + if time.Since(startTime).Milliseconds() > 2 { + t.Error("It is not asynchronous call.") + } + fmt.Println("nonblocking") + ret := retStub.GetResult() + if ret.Err != nil { + t.Error(ret.Err) + } + if time.Since(startTime).Microseconds() < 3000 { + t.Error("It is not unexpected execution time.") + } + fmt.Println(ret.Result.(string), ret.Err) +} + +func TestHystrixTimeoutPath(t *testing.T) { + BuildHystrix(100) + startTime := time.Now() + retStub := AsynHystrix(context.TODO(), testTask, hystrixName, 100) + if time.Since(startTime).Milliseconds() > 2 { + t.Error("It is not asynchronous call.") + } + fmt.Println("nonblocking") + ret := retStub.GetResult() + if ret.Err == nil { + t.Error("It is not unexpected execution time.") + } + if !errors.Is(ret.Err, ErrTimeout) { + t.Error("It is unexpected error", ret.Err) + } + fmt.Println(ret.Result, ret.Err) + + // test hystrix timeout + retStub = AsynHystrix(context.TODO(), testTask, hystrixName, 1000) + ret = retStub.GetResult() + if ret.Err == nil { + t.Error("It is not unexpected execution time.") + } + if !errors.Is(ret.Err, hystrix.ErrTimeout) { + t.Error("It is unexpected error", ret.Err) + } +} + +func TestHystrixConcurrencyPath(t *testing.T) { + BuildHystrix(1000) + // test hystrix max cocurrency + stubs := []ResultStub{} + stubs = append(stubs, AsynHystrix(context.TODO(), testTask, hystrixName, 1000)) + time.Sleep(time.Millisecond * 100) + stubs = append(stubs, AsynHystrix(context.TODO(), testTask, hystrixName, 1000)) + time.Sleep(time.Millisecond * 100) + stubs = append(stubs, AsynHystrix(context.TODO(), testTask, hystrixName, 1000)) + + ret := stubs[0].GetResult() + if ret.Err != nil { + t.Error("It is unexpected execution time.", ret.Err) + } + + ret = stubs[1].GetResult() + if ret.Err != nil { + t.Error("It is unexpected execution time.", ret.Err) + } + + ret = stubs[2].GetResult() + if ret.Err == nil { + t.Error("It is not unexpected execution time.") + } + if !errors.Is(ret.Err, hystrix.ErrMaxConcurrency) { + t.Error("It is unexpected error", ret.Err) + } +} diff --git a/go.mod b/go.mod index 248a57c..66b4eed 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module github.com/easierway/concurrency_utils go 1.13 + +require ( + github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 + github.com/smartystreets/goconvey v1.7.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8715897 --- /dev/null +++ b/go.sum @@ -0,0 +1,15 @@ +github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 h1:rFw4nCn9iMW+Vajsk51NtYIcwSTkXr+JGrMd36kTDJw= +github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= +github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= +github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= diff --git a/multi_tasks_aggration_test.go b/multi_tasks_aggration_test.go index 17108f2..bcb70e2 100644 --- a/multi_tasks_aggration_test.go +++ b/multi_tasks_aggration_test.go @@ -10,21 +10,21 @@ import ( var task1 Task = func(ctx context.Context) TaskResult { time.Sleep(time.Second * 2) return TaskResult{ - "Task1", nil, + Result: "Task1", Err: nil, } } var task2 Task = func(ctx context.Context) TaskResult { time.Sleep(time.Second * 1) return TaskResult{ - "Task2", nil, + Result: "Task2", Err: nil, } } var task3 Task = func(ctx context.Context) TaskResult { time.Sleep(time.Second * 3) return TaskResult{ - "Task3", nil, + Result: "Task3", Err: nil, } } diff --git a/multi_tasks_aggregation.go b/multi_tasks_aggregation.go index c60d0c2..34849f3 100644 --- a/multi_tasks_aggregation.go +++ b/multi_tasks_aggregation.go @@ -41,9 +41,9 @@ func (stub *AnyResultStub) GetResultWhenAnyTaskReturns() TaskResult { case ret := <-stub.retCh: stub.firstReturn = &ret case <-stub.timer.C: - stub.firstReturn = &TaskResult{nil, ErrTimeout} + stub.firstReturn = &TaskResult{Result: nil, Err: ErrTimeout} case <-stub.ctx.Done(): - stub.firstReturn = &TaskResult{nil, ErrCancelled} + stub.firstReturn = &TaskResult{Result: nil, Err: ErrCancelled} } return *stub.firstReturn