1*6fa2df46SAndroid Build Coastguard Worker // Copyright (C) 2023 The Android Open Source Project
2*6fa2df46SAndroid Build Coastguard Worker //
3*6fa2df46SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
4*6fa2df46SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
5*6fa2df46SAndroid Build Coastguard Worker // You may obtain a copy of the License at
6*6fa2df46SAndroid Build Coastguard Worker //
7*6fa2df46SAndroid Build Coastguard Worker // http://www.apache.org/licenses/LICENSE-2.0
8*6fa2df46SAndroid Build Coastguard Worker //
9*6fa2df46SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*6fa2df46SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
11*6fa2df46SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*6fa2df46SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
13*6fa2df46SAndroid Build Coastguard Worker // limitations under the License.
14*6fa2df46SAndroid Build Coastguard Worker
15*6fa2df46SAndroid Build Coastguard Worker #include <ditto/multiprocessing.h>
16*6fa2df46SAndroid Build Coastguard Worker
17*6fa2df46SAndroid Build Coastguard Worker #include <ditto/logger.h>
18*6fa2df46SAndroid Build Coastguard Worker
19*6fa2df46SAndroid Build Coastguard Worker #include <sys/mman.h>
20*6fa2df46SAndroid Build Coastguard Worker #include <sys/prctl.h>
21*6fa2df46SAndroid Build Coastguard Worker #include <sys/types.h>
22*6fa2df46SAndroid Build Coastguard Worker #include <unistd.h>
23*6fa2df46SAndroid Build Coastguard Worker
24*6fa2df46SAndroid Build Coastguard Worker namespace dittosuite {
25*6fa2df46SAndroid Build Coastguard Worker
Multiprocessing(const Instruction::Params & params,std::vector<std::unique_ptr<Instruction>> instructions,std::vector<MultithreadingParams> thread_params)26*6fa2df46SAndroid Build Coastguard Worker Multiprocessing::Multiprocessing(const Instruction::Params& params,
27*6fa2df46SAndroid Build Coastguard Worker std::vector<std::unique_ptr<Instruction>> instructions,
28*6fa2df46SAndroid Build Coastguard Worker std::vector<MultithreadingParams> thread_params)
29*6fa2df46SAndroid Build Coastguard Worker : Instruction(kName, params),
30*6fa2df46SAndroid Build Coastguard Worker instructions_(std::move(instructions)),
31*6fa2df46SAndroid Build Coastguard Worker thread_params_(std::move(thread_params)) {}
32*6fa2df46SAndroid Build Coastguard Worker
SetUpSingle()33*6fa2df46SAndroid Build Coastguard Worker void Multiprocessing::SetUpSingle() {
34*6fa2df46SAndroid Build Coastguard Worker pthread_barrierattr_t barrier_attr;
35*6fa2df46SAndroid Build Coastguard Worker pthread_barrierattr_init(&barrier_attr);
36*6fa2df46SAndroid Build Coastguard Worker pthread_barrierattr_setpshared(&barrier_attr, PTHREAD_PROCESS_SHARED);
37*6fa2df46SAndroid Build Coastguard Worker barrier_execution_ = static_cast<pthread_barrier_t*>(mmap(nullptr, sizeof(*barrier_execution_),
38*6fa2df46SAndroid Build Coastguard Worker PROT_READ | PROT_WRITE,
39*6fa2df46SAndroid Build Coastguard Worker MAP_SHARED | MAP_ANONYMOUS, -1, 0));
40*6fa2df46SAndroid Build Coastguard Worker if (barrier_execution_ == MAP_FAILED) {
41*6fa2df46SAndroid Build Coastguard Worker LOGF("mmap() failed");
42*6fa2df46SAndroid Build Coastguard Worker }
43*6fa2df46SAndroid Build Coastguard Worker pthread_barrier_init(barrier_execution_, &barrier_attr, instructions_.size());
44*6fa2df46SAndroid Build Coastguard Worker
45*6fa2df46SAndroid Build Coastguard Worker barrier_execution_end_ = static_cast<pthread_barrier_t*>(
46*6fa2df46SAndroid Build Coastguard Worker mmap(nullptr, sizeof(*barrier_execution_end_), PROT_READ | PROT_WRITE,
47*6fa2df46SAndroid Build Coastguard Worker MAP_SHARED | MAP_ANONYMOUS, -1, 0));
48*6fa2df46SAndroid Build Coastguard Worker if (barrier_execution_end_ == MAP_FAILED) {
49*6fa2df46SAndroid Build Coastguard Worker LOGF("mmap() failed");
50*6fa2df46SAndroid Build Coastguard Worker }
51*6fa2df46SAndroid Build Coastguard Worker pthread_barrier_init(barrier_execution_end_, &barrier_attr, instructions_.size());
52*6fa2df46SAndroid Build Coastguard Worker
53*6fa2df46SAndroid Build Coastguard Worker initialization_mutex_ = static_cast<pthread_mutex_t*>(
54*6fa2df46SAndroid Build Coastguard Worker mmap(nullptr, sizeof(*initialization_mutex_), PROT_READ | PROT_WRITE,
55*6fa2df46SAndroid Build Coastguard Worker MAP_SHARED | MAP_ANONYMOUS, -1, 0));
56*6fa2df46SAndroid Build Coastguard Worker if (initialization_mutex_ == MAP_FAILED) {
57*6fa2df46SAndroid Build Coastguard Worker LOGF("mmap() failed");
58*6fa2df46SAndroid Build Coastguard Worker }
59*6fa2df46SAndroid Build Coastguard Worker pthread_mutexattr_t mutex_attr;
60*6fa2df46SAndroid Build Coastguard Worker pthread_mutexattr_init(&mutex_attr);
61*6fa2df46SAndroid Build Coastguard Worker pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
62*6fa2df46SAndroid Build Coastguard Worker pthread_mutex_init(initialization_mutex_, &mutex_attr);
63*6fa2df46SAndroid Build Coastguard Worker
64*6fa2df46SAndroid Build Coastguard Worker pipe_fds_.resize(instructions_.size());
65*6fa2df46SAndroid Build Coastguard Worker for (unsigned int i = 0; i < instructions_.size(); i++) {
66*6fa2df46SAndroid Build Coastguard Worker if (pipe(pipe_fds_[i].data()) == -1) {
67*6fa2df46SAndroid Build Coastguard Worker LOGF("Pipe Failed");
68*6fa2df46SAndroid Build Coastguard Worker } else {
69*6fa2df46SAndroid Build Coastguard Worker LOGD("Created pipe for: " + std::to_string(i));
70*6fa2df46SAndroid Build Coastguard Worker }
71*6fa2df46SAndroid Build Coastguard Worker }
72*6fa2df46SAndroid Build Coastguard Worker
73*6fa2df46SAndroid Build Coastguard Worker instruction_id_ = 0;
74*6fa2df46SAndroid Build Coastguard Worker for (unsigned int i = 0; i < instructions_.size(); i++) {
75*6fa2df46SAndroid Build Coastguard Worker // LOGD("Not the last... Can fork");
76*6fa2df46SAndroid Build Coastguard Worker pid_t child_pid_ = fork();
77*6fa2df46SAndroid Build Coastguard Worker if (child_pid_) {
78*6fa2df46SAndroid Build Coastguard Worker // I'm the manager, will continue spawning processes
79*6fa2df46SAndroid Build Coastguard Worker is_manager_ = true;
80*6fa2df46SAndroid Build Coastguard Worker continue;
81*6fa2df46SAndroid Build Coastguard Worker }
82*6fa2df46SAndroid Build Coastguard Worker is_manager_ = false;
83*6fa2df46SAndroid Build Coastguard Worker instruction_id_ = i;
84*6fa2df46SAndroid Build Coastguard Worker LOGD("Child process created: " + std::to_string(instruction_id_) +
85*6fa2df46SAndroid Build Coastguard Worker " pid: " + std::to_string(getpid()));
86*6fa2df46SAndroid Build Coastguard Worker
87*6fa2df46SAndroid Build Coastguard Worker break;
88*6fa2df46SAndroid Build Coastguard Worker }
89*6fa2df46SAndroid Build Coastguard Worker
90*6fa2df46SAndroid Build Coastguard Worker // Close the write pipes that do not belong to this process
91*6fa2df46SAndroid Build Coastguard Worker for (unsigned int p = 0; p < instructions_.size(); p++) {
92*6fa2df46SAndroid Build Coastguard Worker if (is_manager_ || p != instruction_id_) {
93*6fa2df46SAndroid Build Coastguard Worker close(pipe_fds_[p][1]);
94*6fa2df46SAndroid Build Coastguard Worker }
95*6fa2df46SAndroid Build Coastguard Worker }
96*6fa2df46SAndroid Build Coastguard Worker
97*6fa2df46SAndroid Build Coastguard Worker if (!is_manager_) {
98*6fa2df46SAndroid Build Coastguard Worker pthread_mutex_lock(initialization_mutex_);
99*6fa2df46SAndroid Build Coastguard Worker LOGD("Trying to set the name for instruction: " + std::to_string(instruction_id_) +
100*6fa2df46SAndroid Build Coastguard Worker "; process: " + std::to_string(getpid()) +
101*6fa2df46SAndroid Build Coastguard Worker "; new name: " + thread_params_[instruction_id_].name_);
102*6fa2df46SAndroid Build Coastguard Worker setproctitle(argc_, argv_, (thread_params_[instruction_id_].name_.c_str()));
103*6fa2df46SAndroid Build Coastguard Worker
104*6fa2df46SAndroid Build Coastguard Worker if (thread_params_[instruction_id_].sched_attr_.IsSet()) {
105*6fa2df46SAndroid Build Coastguard Worker thread_params_[instruction_id_].sched_attr_.Set();
106*6fa2df46SAndroid Build Coastguard Worker }
107*6fa2df46SAndroid Build Coastguard Worker if (thread_params_[instruction_id_].sched_affinity_.IsSet()) {
108*6fa2df46SAndroid Build Coastguard Worker thread_params_[instruction_id_].sched_affinity_.Set();
109*6fa2df46SAndroid Build Coastguard Worker }
110*6fa2df46SAndroid Build Coastguard Worker
111*6fa2df46SAndroid Build Coastguard Worker LOGD("Process initializing instruction: " + std::to_string(instruction_id_) +
112*6fa2df46SAndroid Build Coastguard Worker " pid: " + std::to_string(getpid()));
113*6fa2df46SAndroid Build Coastguard Worker instructions_[instruction_id_]->SetUp();
114*6fa2df46SAndroid Build Coastguard Worker LOGD("Process initializing done, unlocking: " + std::to_string(instruction_id_) +
115*6fa2df46SAndroid Build Coastguard Worker " pid: " + std::to_string(getpid()));
116*6fa2df46SAndroid Build Coastguard Worker pthread_mutex_unlock(initialization_mutex_);
117*6fa2df46SAndroid Build Coastguard Worker }
118*6fa2df46SAndroid Build Coastguard Worker
119*6fa2df46SAndroid Build Coastguard Worker Instruction::SetUpSingle();
120*6fa2df46SAndroid Build Coastguard Worker }
121*6fa2df46SAndroid Build Coastguard Worker
RunSingle()122*6fa2df46SAndroid Build Coastguard Worker void Multiprocessing::RunSingle() {
123*6fa2df46SAndroid Build Coastguard Worker if (!is_manager_) {
124*6fa2df46SAndroid Build Coastguard Worker LOGD("Waiting for the barrier... " + std::to_string(getpid()));
125*6fa2df46SAndroid Build Coastguard Worker pthread_barrier_wait(barrier_execution_);
126*6fa2df46SAndroid Build Coastguard Worker LOGD("Barrier passed! Executing: " + std::to_string(getpid()));
127*6fa2df46SAndroid Build Coastguard Worker instructions_[instruction_id_]->Run();
128*6fa2df46SAndroid Build Coastguard Worker LOGD("Waiting for all the processes to finish... " + std::to_string(getpid()));
129*6fa2df46SAndroid Build Coastguard Worker pthread_barrier_wait(barrier_execution_end_);
130*6fa2df46SAndroid Build Coastguard Worker LOGD("All the processes finished... " + std::to_string(getpid()));
131*6fa2df46SAndroid Build Coastguard Worker }
132*6fa2df46SAndroid Build Coastguard Worker }
133*6fa2df46SAndroid Build Coastguard Worker
TearDownSingle(bool is_last)134*6fa2df46SAndroid Build Coastguard Worker void Multiprocessing::TearDownSingle(bool is_last) {
135*6fa2df46SAndroid Build Coastguard Worker Instruction::TearDownSingle(is_last);
136*6fa2df46SAndroid Build Coastguard Worker
137*6fa2df46SAndroid Build Coastguard Worker if (!is_manager_) {
138*6fa2df46SAndroid Build Coastguard Worker instructions_[instruction_id_]->TearDown();
139*6fa2df46SAndroid Build Coastguard Worker }
140*6fa2df46SAndroid Build Coastguard Worker }
141*6fa2df46SAndroid Build Coastguard Worker
CollectResults(const std::string & prefix)142*6fa2df46SAndroid Build Coastguard Worker std::unique_ptr<Result> Multiprocessing::CollectResults(const std::string& prefix) {
143*6fa2df46SAndroid Build Coastguard Worker // TODO this was copied from Multithreading and should be adapted with some
144*6fa2df46SAndroid Build Coastguard Worker // shared memory solution.
145*6fa2df46SAndroid Build Coastguard Worker auto result = std::make_unique<Result>(prefix + name_, repeat_);
146*6fa2df46SAndroid Build Coastguard Worker dittosuiteproto::Result result_pb;
147*6fa2df46SAndroid Build Coastguard Worker
148*6fa2df46SAndroid Build Coastguard Worker LOGD("Collecting results... " + std::to_string(getpid()));
149*6fa2df46SAndroid Build Coastguard Worker
150*6fa2df46SAndroid Build Coastguard Worker result->AddMeasurement("duration", TimespecToDoubleNanos(time_sampler_.GetSamples()));
151*6fa2df46SAndroid Build Coastguard Worker
152*6fa2df46SAndroid Build Coastguard Worker if (is_manager_) {
153*6fa2df46SAndroid Build Coastguard Worker LOGD("Parent reading... " + std::to_string(getpid()));
154*6fa2df46SAndroid Build Coastguard Worker for (unsigned int i = 0; i < instructions_.size(); i++) {
155*6fa2df46SAndroid Build Coastguard Worker LOGD("Parent reading from pipe: " + std::to_string(i));
156*6fa2df46SAndroid Build Coastguard Worker std::array<char, 100> buffer;
157*6fa2df46SAndroid Build Coastguard Worker dittosuiteproto::Result sub_result_pb;
158*6fa2df46SAndroid Build Coastguard Worker std::string serialized;
159*6fa2df46SAndroid Build Coastguard Worker
160*6fa2df46SAndroid Build Coastguard Worker while (true) {
161*6fa2df46SAndroid Build Coastguard Worker ssize_t chars_read = read(pipe_fds_[i][0], buffer.data(), buffer.size());
162*6fa2df46SAndroid Build Coastguard Worker if (chars_read == -1) {
163*6fa2df46SAndroid Build Coastguard Worker PLOGF("Error reading from pipe");
164*6fa2df46SAndroid Build Coastguard Worker } else if (chars_read == 0) {
165*6fa2df46SAndroid Build Coastguard Worker // Finished reading from pipe
166*6fa2df46SAndroid Build Coastguard Worker LOGD("Finished reading, time to decode the PB");
167*6fa2df46SAndroid Build Coastguard Worker if (!sub_result_pb.ParseFromString(serialized)) {
168*6fa2df46SAndroid Build Coastguard Worker LOGF("Failed decoding PB from pipe");
169*6fa2df46SAndroid Build Coastguard Worker }
170*6fa2df46SAndroid Build Coastguard Worker //PrintPb(sub_result_pb);
171*6fa2df46SAndroid Build Coastguard Worker result->AddSubResult(Result::FromPb(sub_result_pb));
172*6fa2df46SAndroid Build Coastguard Worker break;
173*6fa2df46SAndroid Build Coastguard Worker }
174*6fa2df46SAndroid Build Coastguard Worker LOGD("Parent received: " + std::to_string(chars_read) + " bytes: \"" +
175*6fa2df46SAndroid Build Coastguard Worker std::string(buffer.data()) + "\"");
176*6fa2df46SAndroid Build Coastguard Worker serialized.append(buffer.data(), chars_read);
177*6fa2df46SAndroid Build Coastguard Worker LOGD("PB so far: \"" + serialized + "\"");
178*6fa2df46SAndroid Build Coastguard Worker }
179*6fa2df46SAndroid Build Coastguard Worker }
180*6fa2df46SAndroid Build Coastguard Worker } else {
181*6fa2df46SAndroid Build Coastguard Worker LOGD("Child writing... " + std::to_string(getpid()));
182*6fa2df46SAndroid Build Coastguard Worker std::string child_name = thread_params_[instruction_id_].name_;
183*6fa2df46SAndroid Build Coastguard Worker result->AddSubResult(instructions_[instruction_id_]->CollectResults(child_name + "/"));
184*6fa2df46SAndroid Build Coastguard Worker
185*6fa2df46SAndroid Build Coastguard Worker result_pb = result->ToPb();
186*6fa2df46SAndroid Build Coastguard Worker
187*6fa2df46SAndroid Build Coastguard Worker std::string serialized;
188*6fa2df46SAndroid Build Coastguard Worker // It is safe to pick result_pb.sub_result()[0] because it will be the "multiprocessing"
189*6fa2df46SAndroid Build Coastguard Worker // instruction.
190*6fa2df46SAndroid Build Coastguard Worker if (!result_pb.sub_result()[0].SerializeToString(&serialized)) {
191*6fa2df46SAndroid Build Coastguard Worker LOGF("Failed decoding PB from pipe");
192*6fa2df46SAndroid Build Coastguard Worker }
193*6fa2df46SAndroid Build Coastguard Worker ssize_t chars_sent = 0;
194*6fa2df46SAndroid Build Coastguard Worker while (chars_sent < static_cast<ssize_t>(serialized.size())) {
195*6fa2df46SAndroid Build Coastguard Worker ssize_t chars_written = write(pipe_fds_[instruction_id_][1], &serialized.data()[chars_sent],
196*6fa2df46SAndroid Build Coastguard Worker serialized.size() - chars_sent);
197*6fa2df46SAndroid Build Coastguard Worker if (chars_written == -1) {
198*6fa2df46SAndroid Build Coastguard Worker PLOGF("Error writing to pipe");
199*6fa2df46SAndroid Build Coastguard Worker }
200*6fa2df46SAndroid Build Coastguard Worker chars_sent += chars_written;
201*6fa2df46SAndroid Build Coastguard Worker }
202*6fa2df46SAndroid Build Coastguard Worker LOGD("Child closing pipe: " + std::to_string(getpid()));
203*6fa2df46SAndroid Build Coastguard Worker close(pipe_fds_[instruction_id_][1]);
204*6fa2df46SAndroid Build Coastguard Worker }
205*6fa2df46SAndroid Build Coastguard Worker
206*6fa2df46SAndroid Build Coastguard Worker if (!is_manager_) {
207*6fa2df46SAndroid Build Coastguard Worker // Stop every child
208*6fa2df46SAndroid Build Coastguard Worker LOGD("Child stopping: " + std::to_string(getpid()));
209*6fa2df46SAndroid Build Coastguard Worker exit(0);
210*6fa2df46SAndroid Build Coastguard Worker } else {
211*6fa2df46SAndroid Build Coastguard Worker pthread_mutex_destroy(initialization_mutex_);
212*6fa2df46SAndroid Build Coastguard Worker pthread_barrier_destroy(barrier_execution_);
213*6fa2df46SAndroid Build Coastguard Worker pthread_barrier_destroy(barrier_execution_end_);
214*6fa2df46SAndroid Build Coastguard Worker LOGD("Parent finished: " + std::to_string(getpid()));
215*6fa2df46SAndroid Build Coastguard Worker }
216*6fa2df46SAndroid Build Coastguard Worker return result;
217*6fa2df46SAndroid Build Coastguard Worker }
218*6fa2df46SAndroid Build Coastguard Worker
219*6fa2df46SAndroid Build Coastguard Worker } // namespace dittosuite
220