1// Copyright 2018 The Bazel Authors. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package bucketize 16 17import ( 18 "context" 19 "fmt" 20 "strings" 21 "sync" 22 23 "src/tools/ak/res/res" 24) 25 26type contextKey int 27 28const ( 29 ctxErr contextKey = 0 30) 31 32// errorf returns a formatted error with any context sensitive information prefixed to the error 33func errorf(ctx context.Context, fmts string, a ...interface{}) error { 34 if s, ok := ctx.Value(ctxErr).(string); ok { 35 return fmt.Errorf(strings.Join([]string{s, fmts}, ""), a...) 36 } 37 return fmt.Errorf(fmts, a...) 38} 39 40// prefixErr returns a context which adds a prefix to error messages. 
41func prefixErr(ctx context.Context, add string) context.Context { 42 if s, ok := ctx.Value(ctxErr).(string); ok { 43 return context.WithValue(ctx, ctxErr, strings.Join([]string{s, add}, "")) 44 } 45 return context.WithValue(ctx, ctxErr, add) 46} 47 48func separatePathInfosByValues(ctx context.Context, pis []*res.PathInfo) (<-chan *res.PathInfo, <-chan *res.PathInfo) { 49 valuesPIC := make(chan *res.PathInfo) 50 nonValuesPIC := make(chan *res.PathInfo) 51 go func() { 52 defer close(valuesPIC) 53 defer close(nonValuesPIC) 54 for _, pi := range pis { 55 if pi.Type.Kind() == res.Value || pi.Type.Kind() == res.Both && strings.HasPrefix(pi.TypeDir, "values") { 56 select { 57 case valuesPIC <- pi: 58 case <-ctx.Done(): 59 return 60 } 61 } else { 62 select { 63 case nonValuesPIC <- pi: 64 case <-ctx.Done(): 65 return 66 } 67 } 68 } 69 }() 70 return valuesPIC, nonValuesPIC 71} 72 73func mergeValuesResourceStreams(ctx context.Context, vrCs []<-chan *res.ValuesResource) <-chan *res.ValuesResource { 74 vrC := make(chan *res.ValuesResource) 75 var wg sync.WaitGroup 76 wg.Add(len(vrCs)) 77 output := func(c <-chan *res.ValuesResource) { 78 defer wg.Done() 79 for vr := range c { 80 select { 81 case vrC <- vr: 82 case <-ctx.Done(): 83 return 84 } 85 } 86 } 87 for _, c := range vrCs { 88 go output(c) 89 } 90 go func() { 91 wg.Wait() 92 close(vrC) 93 }() 94 return vrC 95} 96 97func mergeResourcesAttributeStreams(ctx context.Context, raCs []<-chan *ResourcesAttribute) <-chan *ResourcesAttribute { 98 raC := make(chan *ResourcesAttribute) 99 var wg sync.WaitGroup 100 wg.Add(len(raCs)) 101 output := func(c <-chan *ResourcesAttribute) { 102 defer wg.Done() 103 for ra := range c { 104 select { 105 case raC <- ra: 106 case <-ctx.Done(): 107 return 108 } 109 } 110 } 111 for _, c := range raCs { 112 go output(c) 113 } 114 go func() { 115 wg.Wait() 116 close(raC) 117 }() 118 return raC 119} 120 121// mergeErrStreams fans in multiple error streams into a single stream. 
// The merged stream is closed once all forwarding goroutines have finished. A
// forwarder finishes when its input stream is closed, or when ctx is done
// while it is waiting to hand an error to the merged stream.
func mergeErrStreams(ctx context.Context, errCs []<-chan error) <-chan error {
	merged := make(chan error)
	var forwarders sync.WaitGroup
	for _, in := range errCs {
		forwarders.Add(1)
		go func(in <-chan error) {
			defer forwarders.Done()
			for err := range in {
				select {
				case <-ctx.Done():
					return
				case merged <- err:
				}
			}
		}(in)
	}
	// Close merged only after every forwarder has stopped sending to it.
	go func() {
		forwarders.Wait()
		close(merged)
	}()
	return merged
}

// sendErr attempts to send err on errC. It returns true if the error was
// sent, and false if it gave up because ctx is done.
func sendErr(ctx context.Context, errC chan<- error, err error) bool {
	select {
	case errC <- err:
		return true
	case <-ctx.Done():
		return false
	}
}