1// Copyright 2022 The Bazel Authors. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// Package respipe contains utilities for running pipelines on android resources. 16package respipe 17 18import ( 19 "context" 20 "sync" 21 22 rdpb "src/tools/ak/res/proto/res_data_go_proto" 23 "src/tools/ak/res/res" 24) 25 26// MergePathInfoStreams fans in multiple PathInfo streams into a single stream. 27func MergePathInfoStreams(ctx context.Context, piCs []<-chan *res.PathInfo) <-chan *res.PathInfo { 28 piC := make(chan *res.PathInfo) 29 var wg sync.WaitGroup 30 wg.Add(len(piCs)) 31 output := func(c <-chan *res.PathInfo) { 32 defer wg.Done() 33 for r := range c { 34 select { 35 case piC <- r: 36 case <-ctx.Done(): 37 return 38 } 39 } 40 } 41 for _, rc := range piCs { 42 go output(rc) 43 } 44 go func() { 45 wg.Wait() 46 close(piC) 47 }() 48 return piC 49} 50 51// MergeResStreams fans in multiple Resource streams into a single stream. 52func MergeResStreams(ctx context.Context, resCs []<-chan *rdpb.Resource) <-chan *rdpb.Resource { 53 resC := make(chan *rdpb.Resource) 54 var wg sync.WaitGroup 55 wg.Add(len(resCs)) 56 output := func(c <-chan *rdpb.Resource) { 57 defer wg.Done() 58 for r := range c { 59 select { 60 case resC <- r: 61 case <-ctx.Done(): 62 return 63 } 64 } 65 } 66 for _, rc := range resCs { 67 go output(rc) 68 } 69 go func() { 70 wg.Wait() 71 close(resC) 72 }() 73 return resC 74} 75 76// MergeErrStreams fans in multiple error streams into a single stream. 77func MergeErrStreams(ctx context.Context, errCs []<-chan error) <-chan error { 78 errC := make(chan error) 79 var wg sync.WaitGroup 80 wg.Add(len(errCs)) 81 output := func(c <-chan error) { 82 defer wg.Done() 83 for e := range c { 84 select { 85 case errC <- e: 86 case <-ctx.Done(): 87 return 88 } 89 } 90 } 91 for _, rc := range errCs { 92 go output(rc) 93 } 94 go func() { 95 wg.Wait() 96 close(errC) 97 }() 98 return errC 99} 100 101// SendErr attempts to send the provided error to the provided chan, however is the context is canceled, it will return false. 102func SendErr(ctx context.Context, errC chan<- error, err error) bool { 103 select { 104 case <-ctx.Done(): 105 return false 106 case errC <- err: 107 return true 108 } 109} 110 111// SendRes attempts to send the provided resource to the provided chan, however is the context is canceled, it will return false. 112func SendRes(ctx context.Context, resC chan<- *rdpb.Resource, r *rdpb.Resource) bool { 113 select { 114 case <-ctx.Done(): 115 return false 116 case resC <- r: 117 return true 118 } 119} 120