funcmain() { // 初始化context var wg sync.WaitGroup ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel()
// 添加数据 generator := func(data string, stream chan any) { for { select { case <-ctx.Done(): return case stream <- data: } } } infiniteApples := make(chan any) go generator("apple", infiniteApples)
infiniteBananas := make(chan any) go generator("banana", infiniteBananas)
infiniteOranges := make(chan any) go generator("orange", infiniteOranges)
wg.Add(1) go func1(ctx, &wg, infiniteApples) func2 := genericFunc func3 := genericFunc wg.Add(1) go func2(ctx, &wg, infiniteBananas) wg.Add(1) go func3(ctx, &wg, infiniteOranges)
wg.Wait() }
// func1 drains streams with three concurrent workers and prints every value
// received. It derives its own 3-second timeout from ctx, waits for all
// workers to finish, and calls s.Done on return.
func func1(ctx context.Context, s *sync.WaitGroup, streams <-chan any) {
	defer s.Done()

	// Derive a private context from the parent context: it ends after
	// 3 seconds or as soon as the parent ends, whichever comes first.
	localCtx, stop := context.WithTimeout(ctx, 3*time.Second)
	defer stop()

	// Start this function's own worker goroutines.
	const workerCount = 3
	var workers sync.WaitGroup
	workers.Add(workerCount)
	for n := 0; n < workerCount; n++ {
		go func() {
			defer workers.Done()
			for {
				select {
				case item, open := <-streams:
					if open {
						fmt.Println(item)
						continue
					}
					fmt.Println("stream closed")
					return
				case <-localCtx.Done():
					return
				}
			}
		}()
	}

	workers.Wait()
}
// genericFunc reuses the caller's context as-is: it prints every value
// received from apples until ctx is done or apples is closed, and calls
// s.Done on return.
func genericFunc(ctx context.Context, s *sync.WaitGroup, apples chan any) {
	defer s.Done()

	done := ctx.Done()
	for {
		select {
		case item, open := <-apples:
			if open {
				fmt.Println(item)
				continue
			}
			fmt.Println("stream closed")
			return
		case <-done:
			return
		}
	}
}