xref: /aosp_15_r20/external/bazelbuild-rules_android/src/tools/ak/res/respipe/streams.go (revision 9e965d6fece27a77de5377433c2f7e6999b8cc0b)
1// Copyright 2022 The Bazel Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//    http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Package respipe contains utilities for running pipelines on android resources.
16package respipe
17
18import (
19	"context"
20	"sync"
21
22	rdpb "src/tools/ak/res/proto/res_data_go_proto"
23	"src/tools/ak/res/res"
24)
25
26// MergePathInfoStreams fans in multiple PathInfo streams into a single stream.
27func MergePathInfoStreams(ctx context.Context, piCs []<-chan *res.PathInfo) <-chan *res.PathInfo {
28	piC := make(chan *res.PathInfo)
29	var wg sync.WaitGroup
30	wg.Add(len(piCs))
31	output := func(c <-chan *res.PathInfo) {
32		defer wg.Done()
33		for r := range c {
34			select {
35			case piC <- r:
36			case <-ctx.Done():
37				return
38			}
39		}
40	}
41	for _, rc := range piCs {
42		go output(rc)
43	}
44	go func() {
45		wg.Wait()
46		close(piC)
47	}()
48	return piC
49}
50
51// MergeResStreams fans in multiple Resource streams into a single stream.
52func MergeResStreams(ctx context.Context, resCs []<-chan *rdpb.Resource) <-chan *rdpb.Resource {
53	resC := make(chan *rdpb.Resource)
54	var wg sync.WaitGroup
55	wg.Add(len(resCs))
56	output := func(c <-chan *rdpb.Resource) {
57		defer wg.Done()
58		for r := range c {
59			select {
60			case resC <- r:
61			case <-ctx.Done():
62				return
63			}
64		}
65	}
66	for _, rc := range resCs {
67		go output(rc)
68	}
69	go func() {
70		wg.Wait()
71		close(resC)
72	}()
73	return resC
74}
75
76// MergeErrStreams fans in multiple error streams into a single stream.
77func MergeErrStreams(ctx context.Context, errCs []<-chan error) <-chan error {
78	errC := make(chan error)
79	var wg sync.WaitGroup
80	wg.Add(len(errCs))
81	output := func(c <-chan error) {
82		defer wg.Done()
83		for e := range c {
84			select {
85			case errC <- e:
86			case <-ctx.Done():
87				return
88			}
89		}
90	}
91	for _, rc := range errCs {
92		go output(rc)
93	}
94	go func() {
95		wg.Wait()
96		close(errC)
97	}()
98	return errC
99}
100
101// SendErr attempts to send the provided error to the provided chan, however is the context is canceled, it will return false.
102func SendErr(ctx context.Context, errC chan<- error, err error) bool {
103	select {
104	case <-ctx.Done():
105		return false
106	case errC <- err:
107		return true
108	}
109}
110
111// SendRes attempts to send the provided resource to the provided chan, however is the context is canceled, it will return false.
112func SendRes(ctx context.Context, resC chan<- *rdpb.Resource, r *rdpb.Resource) bool {
113	select {
114	case <-ctx.Done():
115		return false
116	case resC <- r:
117		return true
118	}
119}
120