// Copyright 2014 The Cockroach Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or // implied. See the License for the specific language governing // permissions and limitations under the License. // // // // Copyright 2017-2019 Lei Ni (nilei81@gmail.com) and other Dragonboat authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License.
// Stopper coordinates the lifetime of a group of worker goroutines: it
// starts them, broadcasts a stop signal to them and waits for them to exit.
// It is adapted from an early version of the stopper struct found in
// CockroachDB's codebase.
type Stopper struct {
	// mu is held while a worker is being registered and while the stop
	// signal is being issued by Stop.
	mu sync.Mutex
	// shouldStopC is closed to tell every worker that it should return.
	shouldStopC chan struct{}
	// wg counts the worker goroutines that have not returned yet.
	wg sync.WaitGroup
	// debug turns on logging of worker goroutine start and stop events.
	debug bool
}
// NewStopper return a new Stopper instance. funcNewStopper() *Stopper { s := &Stopper{ shouldStopC: make(chanstruct{}), debug: envutil.GetBoolEnvVarOrDefault("LEAKTEST", false), // 用于决定是否打印调试信息 }
return s }
// RunWorker creates a new goroutine and invoke the f func in that new // worker goroutine. func(s *Stopper)RunWorker(f func()) { s.mu.Lock() defer s.mu.Unlock() if s.stopped() { // 已经被关闭了,不再添加新的子 goroutine return } s.runWorker(f, "") }
func(s *Stopper)runWorker(f func(), namestring) { s.wg.Add(1) // 等待执行,数量 ++ var gid uint64 gofunc() { // runWorker 内部会根据传入的 f 创建 goroutine if s.debug { gid = lang.GetGIDForDebugOnly() log.Printf("goroutine %d started, name %s", gid, name) } f() s.wg.Done() // 执行完毕,数量 -- if s.debug { log.Printf("goroutine %d stopped, name %s", gid, name) } }() }
// ShouldStop returns a chan struct{} used for indicating whether the // Stop() function has been called on Stopper. func(s *Stopper)ShouldStop()chanstruct{} { return s.shouldStopC }
// Stop signals all managed worker goroutines to stop and wait for them // to actually stop. // 停止所有子 goroutine,阻塞等待子 goroutine 退出 func(s *Stopper)Stop() { s.mu.Lock() defer s.mu.Unlock() close(s.shouldStopC) s.wg.Wait() // 阻塞直到所有子 goroutine 退出 }
// Close closes the internal shouldStopc chan struct{} to signal all // worker goroutines that they should stop. // 停止所有子 goroutine,不阻塞 func(s *Stopper)Close() { close(s.shouldStopC) }
// Wait waits on the internal sync.WaitGroup. It only return when all // managed worker goroutines are ready to return and called // sync.WaitGroup.Done() on the internal sync.WaitGroup. // 等所有子 goroutine 结束 func(s *Stopper)Wait() { s.wg.Wait() } // stopped 返回 stoper 是否已经被调用 stop 了 func(s *Stopper)stopped()bool { select { case <-s.shouldStopC: returntrue default: } returnfalse }