@aliasliyu4
2018-08-13T15:23:08.000000Z
字数 5970
阅读 3846
下文是github上go-hystrix对与其功能的阐述,其脱胎于java版本的Hystrix库,主要目的是为了解决分布式系统中对于错误的保护,这一点从其熔断的定义也可以明白。
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.
I think the Hystrix patterns of programmer-defined fallbacks and adaptive health monitoring are good for any distributed system. Go routines and channels are great concurrency primitives, but don't directly help our application stay available during failures.
hystrix-go aims to allow Go programmers to easily build applications with similar execution semantics of the Java-based Hystrix library.
type CommandConfig struct {Timeout int `json:"timeout"` // 超时时间定义MaxConcurrentRequests int `json:"max_concurrent_requests"` // 最大并发请求数RequestVolumeThreshold int `json:"request_volume_threshold"` // 跳闸的最小请求数(不健康的断路器)SleepWindow int `json:"sleep_window"` // 跳闸之后可以重试的时间ErrorPercentThreshold int `json:"error_percent_threshold"` // 请求出错比}
// CircuitBreaker is created for each ExecutorPool to track whether requests// should be attempted, or rejected if the Health of the circuit is too low.type CircuitBreaker struct { // 断路器-定义Name string // 名字open bool // 开启与否,关闭"open"=true,开启"open" = falseforceOpen bool // 强制开启mutex *sync.RWMutex // 读写锁(unblock reading, block writer)openedOrLastTestedTime int64 // 断路器被打开或者最近一次尝试的时间,尝试指断路器打开之后,系统探测是否可以发送请求。executorPool *executorPool // 执行池metrics *metricExchange // 监控断路器}var (circuitBreakersMutex *sync.RWMutex // 断路器锁circuitBreakers map[string]*CircuitBreaker // 注册断路器,所有的断路器都保存在这里)func init() {circuitBreakersMutex = &sync.RWMutex{} // 初始化断路器锁circuitBreakers = make(map[string]*CircuitBreaker) // 初始化断路器}
// GetCircuit returns the circuit for the given command and whether this call created it.func GetCircuit(name string) (*CircuitBreaker, bool, error) {circuitBreakersMutex.RLock()_, ok := circuitBreakers[name]if !ok {circuitBreakersMutex.RUnlock()circuitBreakersMutex.Lock()defer circuitBreakersMutex.Unlock() // 注意这里同时加了两次🔒且第二把锁是互斥锁,其中一个goroutine hold住并且赋值,锁释放。其他goroutine从内存中获取断路器if cb, ok := circuitBreakers[name]; ok { // double check,防止其他的goroutine修改了全局变量circuitBreakersreturn cb, false, nil}circuitBreakers[name] = newCircuitBreaker(name)} else {defer circuitBreakersMutex.RUnlock()}return circuitBreakers[name], !ok, nil}
type executorPool struct { // 执行池Name string // 名字Metrics *poolMetrics // 执行池监控Max int // 最大的并发请求数量Tickets chan *struct{} // 票证}// 开启一个新的执行池func newExecutorPool(name string) *executorPool {p := &executorPool{}p.Name = name // 名字p.Metrics = newPoolMetrics(name)p.Max = getSettings(name).MaxConcurrentRequests // 从配置中获取最大的并发请求数量,如果配置中没有,则从默认配置中获取p.Tickets = make(chan *struct{}, p.Max) // 初始化buffer chanfor i := 0; i < p.Max; i++ {p.Tickets <- &struct{}{}}return p}func (p *executorPool) Return(ticket *struct{}) {if ticket == nil {return}p.Metrics.Updates <- poolMetricsUpdate{activeCount: p.ActiveCount(),}p.Tickets <- ticket}func (p *executorPool) ActiveCount() int {return p.Max - len(p.Tickets)}
func Go(name string, run runFunc, fallback fallbackFunc) chan error {runC := func(ctx context.Context) error { // 匿名run带ctxreturn run()}var fallbackC fallbackFuncCif fallback != nil {fallbackC = func(ctx context.Context, err error) error { // 匿名fallback带ctxreturn fallback(err)}}return GoC(context.Background(), name, runC, fallbackC)}
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {cmd := &command{ // command执行者run: run, // runfallback: fallback, // fallbackstart: time.Now(), // 开始时间errChan: make(chan error, 1), // 错误finished: make(chan bool, 1), // 是否完成}// dont have methods with explicit params and returns// let data come in and out naturally, like with any closure// explicit error return to give place for us to kill switch the operation (fallback)circuit, _, err := GetCircuit(name) // 获取断路器if err != nil {cmd.errChan <- errreturn cmd.errChan}cmd.circuit = circuitticketCond := sync.NewCond(cmd) // cond条件,具体使用见下文参考ticketChecked := falsereturnTicket := func() { //cmd.Lock()// Avoid releasing before a ticket is acquired.for !ticketChecked {ticketCond.Wait() // 相当于select{}}cmd.circuit.executorPool.Return(cmd.ticket) // 将ticket放回池子中cmd.Unlock()}returnOnce := &sync.Once{} // 确保被multi goroutine执行一次reportAllEvent := func() { // events采集,后续dashboard使用err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)if err != nil {log.Printf(err.Error())}}// g1, 检测断路器不允许通过,尝试fallback,将中途遇到的event上报。go func() {defer func() { cmd.finished <- true }()if !cmd.circuit.AllowRequest() {cmd.Lock()// It's safe for another goroutine to go ahead releasing a nil ticket.ticketChecked = trueticketCond.Signal()cmd.Unlock()returnOnce.Do(func() {returnTicket()cmd.errorWithFallback(ctx, ErrCircuitOpen)reportAllEvent()})return}cmd.Lock()select {case cmd.ticket = <-circuit.executorPool.Tickets: // 从池子中取出ticketticketChecked = trueticketCond.Signal() // 通知cond.Wait()cmd.Unlock()default:ticketChecked = trueticketCond.Signal()cmd.Unlock()returnOnce.Do(func() {returnTicket()cmd.errorWithFallback(ctx, ErrMaxConcurrency) // 并发过高reportAllEvent()})return}runStart := time.Now()runErr := run(ctx)returnOnce.Do(func() {defer reportAllEvent()cmd.runDuration = time.Since(runStart) // 运行时间returnTicket() // 把ticket返回去if runErr != nil {cmd.errorWithFallback(ctx, runErr) // fall back!return}cmd.reportEvent("success") // 执行成功})}()go func() {timer := time.NewTimer(getSettings(name).Timeout)defer timer.Stop()select {case <-cmd.finished: // 结束select// returnOnce has been executed in another goroutinecase <-ctx.Done(): // 处理ctx错误returnOnce.Do(func() {returnTicket()cmd.errorWithFallback(ctx, ctx.Err())reportAllEvent()})returncase <-timer.C: // 处理超时returnOnce.Do(func() {returnTicket()cmd.errorWithFallback(ctx, ErrTimeout)reportAllEvent()})return}}()return cmd.errChan // 错误返回至上层}
func Do(name string, run runFunc, fallback fallbackFunc) error {runC := func(ctx context.Context) error {return run()}var fallbackC fallbackFuncCif fallback != nil {fallbackC = func(ctx context.Context, err error) error {return fallback(err)}}return DoC(context.Background(), name, runC, fallbackC) // 具体逻辑如上GoC}
参考: