1*288bf522SAndroid Build Coastguard Worker /*
2*288bf522SAndroid Build Coastguard Worker * Copyright (C) 2015 The Android Open Source Project
3*288bf522SAndroid Build Coastguard Worker *
4*288bf522SAndroid Build Coastguard Worker * Licensed under the Apache License, Version 2.0 (the "License");
5*288bf522SAndroid Build Coastguard Worker * you may not use this file except in compliance with the License.
6*288bf522SAndroid Build Coastguard Worker * You may obtain a copy of the License at
7*288bf522SAndroid Build Coastguard Worker *
8*288bf522SAndroid Build Coastguard Worker * http://www.apache.org/licenses/LICENSE-2.0
9*288bf522SAndroid Build Coastguard Worker *
10*288bf522SAndroid Build Coastguard Worker * Unless required by applicable law or agreed to in writing, software
11*288bf522SAndroid Build Coastguard Worker * distributed under the License is distributed on an "AS IS" BASIS,
12*288bf522SAndroid Build Coastguard Worker * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*288bf522SAndroid Build Coastguard Worker * See the License for the specific language governing permissions and
14*288bf522SAndroid Build Coastguard Worker * limitations under the License.
15*288bf522SAndroid Build Coastguard Worker */
16*288bf522SAndroid Build Coastguard Worker
17*288bf522SAndroid Build Coastguard Worker #include <future>
18*288bf522SAndroid Build Coastguard Worker #include "fec_private.h"
19*288bf522SAndroid Build Coastguard Worker
20*288bf522SAndroid Build Coastguard Worker struct process_info {
21*288bf522SAndroid Build Coastguard Worker int id;
22*288bf522SAndroid Build Coastguard Worker fec_handle* f;
23*288bf522SAndroid Build Coastguard Worker uint8_t* buf;
24*288bf522SAndroid Build Coastguard Worker size_t count;
25*288bf522SAndroid Build Coastguard Worker uint64_t offset;
26*288bf522SAndroid Build Coastguard Worker read_func func;
27*288bf522SAndroid Build Coastguard Worker ssize_t rc;
28*288bf522SAndroid Build Coastguard Worker size_t errors;
29*288bf522SAndroid Build Coastguard Worker };
30*288bf522SAndroid Build Coastguard Worker
31*288bf522SAndroid Build Coastguard Worker /* thread function */
__process(process_info * p)32*288bf522SAndroid Build Coastguard Worker static process_info* __process(process_info* p) {
33*288bf522SAndroid Build Coastguard Worker debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset, p->offset + p->count);
34*288bf522SAndroid Build Coastguard Worker
35*288bf522SAndroid Build Coastguard Worker p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
36*288bf522SAndroid Build Coastguard Worker return p;
37*288bf522SAndroid Build Coastguard Worker }
38*288bf522SAndroid Build Coastguard Worker
39*288bf522SAndroid Build Coastguard Worker /* launches a maximum number of threads to process a read */
process(fec_handle * f,uint8_t * buf,size_t count,uint64_t offset,read_func func)40*288bf522SAndroid Build Coastguard Worker ssize_t process(fec_handle* f, uint8_t* buf, size_t count, uint64_t offset, read_func func) {
41*288bf522SAndroid Build Coastguard Worker check(f);
42*288bf522SAndroid Build Coastguard Worker check(buf);
43*288bf522SAndroid Build Coastguard Worker check(func);
44*288bf522SAndroid Build Coastguard Worker
45*288bf522SAndroid Build Coastguard Worker if (count == 0) {
46*288bf522SAndroid Build Coastguard Worker return 0;
47*288bf522SAndroid Build Coastguard Worker }
48*288bf522SAndroid Build Coastguard Worker
49*288bf522SAndroid Build Coastguard Worker int threads = sysconf(_SC_NPROCESSORS_ONLN);
50*288bf522SAndroid Build Coastguard Worker
51*288bf522SAndroid Build Coastguard Worker if (threads < WORK_MIN_THREADS) {
52*288bf522SAndroid Build Coastguard Worker threads = WORK_MIN_THREADS;
53*288bf522SAndroid Build Coastguard Worker } else if (threads > WORK_MAX_THREADS) {
54*288bf522SAndroid Build Coastguard Worker threads = WORK_MAX_THREADS;
55*288bf522SAndroid Build Coastguard Worker }
56*288bf522SAndroid Build Coastguard Worker
57*288bf522SAndroid Build Coastguard Worker uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
58*288bf522SAndroid Build Coastguard Worker size_t blocks = fec_div_round_up(offset + count - start, FEC_BLOCKSIZE);
59*288bf522SAndroid Build Coastguard Worker
60*288bf522SAndroid Build Coastguard Worker /* start at most one thread per block we're accessing */
61*288bf522SAndroid Build Coastguard Worker if ((size_t)threads > blocks) {
62*288bf522SAndroid Build Coastguard Worker threads = (int)blocks;
63*288bf522SAndroid Build Coastguard Worker }
64*288bf522SAndroid Build Coastguard Worker
65*288bf522SAndroid Build Coastguard Worker size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
66*288bf522SAndroid Build Coastguard Worker size_t left = count;
67*288bf522SAndroid Build Coastguard Worker uint64_t pos = offset;
68*288bf522SAndroid Build Coastguard Worker uint64_t end = start + count_per_thread;
69*288bf522SAndroid Build Coastguard Worker
70*288bf522SAndroid Build Coastguard Worker debug("max %d threads, %zu bytes per thread (total %zu spanning %zu blocks)", threads,
71*288bf522SAndroid Build Coastguard Worker count_per_thread, count, blocks);
72*288bf522SAndroid Build Coastguard Worker
73*288bf522SAndroid Build Coastguard Worker std::vector<std::future<process_info*>> handles;
74*288bf522SAndroid Build Coastguard Worker process_info info[threads];
75*288bf522SAndroid Build Coastguard Worker ssize_t rc = 0;
76*288bf522SAndroid Build Coastguard Worker
77*288bf522SAndroid Build Coastguard Worker /* start threads to process queue */
78*288bf522SAndroid Build Coastguard Worker for (int i = 0; i < threads && left > 0; ++i) {
79*288bf522SAndroid Build Coastguard Worker info[i].id = i;
80*288bf522SAndroid Build Coastguard Worker info[i].f = f;
81*288bf522SAndroid Build Coastguard Worker info[i].buf = &buf[pos - offset];
82*288bf522SAndroid Build Coastguard Worker info[i].count = (size_t)(end - pos);
83*288bf522SAndroid Build Coastguard Worker info[i].offset = pos;
84*288bf522SAndroid Build Coastguard Worker info[i].func = func;
85*288bf522SAndroid Build Coastguard Worker info[i].rc = -1;
86*288bf522SAndroid Build Coastguard Worker info[i].errors = 0;
87*288bf522SAndroid Build Coastguard Worker
88*288bf522SAndroid Build Coastguard Worker if (info[i].count > left) {
89*288bf522SAndroid Build Coastguard Worker info[i].count = left;
90*288bf522SAndroid Build Coastguard Worker }
91*288bf522SAndroid Build Coastguard Worker
92*288bf522SAndroid Build Coastguard Worker handles.push_back(std::async(std::launch::async, __process, &info[i]));
93*288bf522SAndroid Build Coastguard Worker
94*288bf522SAndroid Build Coastguard Worker pos = end;
95*288bf522SAndroid Build Coastguard Worker end += count_per_thread;
96*288bf522SAndroid Build Coastguard Worker left -= info[i].count;
97*288bf522SAndroid Build Coastguard Worker }
98*288bf522SAndroid Build Coastguard Worker
99*288bf522SAndroid Build Coastguard Worker ssize_t nread = 0;
100*288bf522SAndroid Build Coastguard Worker
101*288bf522SAndroid Build Coastguard Worker /* wait for all threads to complete */
102*288bf522SAndroid Build Coastguard Worker for (auto&& future : handles) {
103*288bf522SAndroid Build Coastguard Worker process_info* p = future.get();
104*288bf522SAndroid Build Coastguard Worker if (!p || p->rc == -1) {
105*288bf522SAndroid Build Coastguard Worker rc = -1;
106*288bf522SAndroid Build Coastguard Worker } else {
107*288bf522SAndroid Build Coastguard Worker nread += p->rc;
108*288bf522SAndroid Build Coastguard Worker f->errors += p->errors;
109*288bf522SAndroid Build Coastguard Worker }
110*288bf522SAndroid Build Coastguard Worker }
111*288bf522SAndroid Build Coastguard Worker
112*288bf522SAndroid Build Coastguard Worker if (left > 0 || rc == -1) {
113*288bf522SAndroid Build Coastguard Worker errno = EIO;
114*288bf522SAndroid Build Coastguard Worker return -1;
115*288bf522SAndroid Build Coastguard Worker }
116*288bf522SAndroid Build Coastguard Worker
117*288bf522SAndroid Build Coastguard Worker return nread;
118*288bf522SAndroid Build Coastguard Worker }
119