xref: /aosp_15_r20/test/dittosuite/src/multiprocessing.cpp (revision 6fa2df46f119dce7527f5beb2814eca0e6f886ac)
1*6fa2df46SAndroid Build Coastguard Worker // Copyright (C) 2023 The Android Open Source Project
2*6fa2df46SAndroid Build Coastguard Worker //
3*6fa2df46SAndroid Build Coastguard Worker // Licensed under the Apache License, Version 2.0 (the "License");
4*6fa2df46SAndroid Build Coastguard Worker // you may not use this file except in compliance with the License.
5*6fa2df46SAndroid Build Coastguard Worker // You may obtain a copy of the License at
6*6fa2df46SAndroid Build Coastguard Worker //
7*6fa2df46SAndroid Build Coastguard Worker //      http://www.apache.org/licenses/LICENSE-2.0
8*6fa2df46SAndroid Build Coastguard Worker //
9*6fa2df46SAndroid Build Coastguard Worker // Unless required by applicable law or agreed to in writing, software
10*6fa2df46SAndroid Build Coastguard Worker // distributed under the License is distributed on an "AS IS" BASIS,
11*6fa2df46SAndroid Build Coastguard Worker // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12*6fa2df46SAndroid Build Coastguard Worker // See the License for the specific language governing permissions and
13*6fa2df46SAndroid Build Coastguard Worker // limitations under the License.
14*6fa2df46SAndroid Build Coastguard Worker 
15*6fa2df46SAndroid Build Coastguard Worker #include <ditto/multiprocessing.h>
16*6fa2df46SAndroid Build Coastguard Worker 
17*6fa2df46SAndroid Build Coastguard Worker #include <ditto/logger.h>
18*6fa2df46SAndroid Build Coastguard Worker 
19*6fa2df46SAndroid Build Coastguard Worker #include <sys/mman.h>
20*6fa2df46SAndroid Build Coastguard Worker #include <sys/prctl.h>
21*6fa2df46SAndroid Build Coastguard Worker #include <sys/types.h>
22*6fa2df46SAndroid Build Coastguard Worker #include <unistd.h>
23*6fa2df46SAndroid Build Coastguard Worker 
24*6fa2df46SAndroid Build Coastguard Worker namespace dittosuite {
25*6fa2df46SAndroid Build Coastguard Worker 
Multiprocessing(const Instruction::Params & params,std::vector<std::unique_ptr<Instruction>> instructions,std::vector<MultithreadingParams> thread_params)26*6fa2df46SAndroid Build Coastguard Worker Multiprocessing::Multiprocessing(const Instruction::Params& params,
27*6fa2df46SAndroid Build Coastguard Worker                                  std::vector<std::unique_ptr<Instruction>> instructions,
28*6fa2df46SAndroid Build Coastguard Worker                                  std::vector<MultithreadingParams> thread_params)
29*6fa2df46SAndroid Build Coastguard Worker     : Instruction(kName, params),
30*6fa2df46SAndroid Build Coastguard Worker       instructions_(std::move(instructions)),
31*6fa2df46SAndroid Build Coastguard Worker       thread_params_(std::move(thread_params)) {}
32*6fa2df46SAndroid Build Coastguard Worker 
SetUpSingle()33*6fa2df46SAndroid Build Coastguard Worker void Multiprocessing::SetUpSingle() {
34*6fa2df46SAndroid Build Coastguard Worker   pthread_barrierattr_t barrier_attr;
35*6fa2df46SAndroid Build Coastguard Worker   pthread_barrierattr_init(&barrier_attr);
36*6fa2df46SAndroid Build Coastguard Worker   pthread_barrierattr_setpshared(&barrier_attr, PTHREAD_PROCESS_SHARED);
37*6fa2df46SAndroid Build Coastguard Worker   barrier_execution_ = static_cast<pthread_barrier_t*>(mmap(nullptr, sizeof(*barrier_execution_),
38*6fa2df46SAndroid Build Coastguard Worker                                                             PROT_READ | PROT_WRITE,
39*6fa2df46SAndroid Build Coastguard Worker                                                             MAP_SHARED | MAP_ANONYMOUS, -1, 0));
40*6fa2df46SAndroid Build Coastguard Worker   if (barrier_execution_ == MAP_FAILED) {
41*6fa2df46SAndroid Build Coastguard Worker     LOGF("mmap() failed");
42*6fa2df46SAndroid Build Coastguard Worker   }
43*6fa2df46SAndroid Build Coastguard Worker   pthread_barrier_init(barrier_execution_, &barrier_attr, instructions_.size());
44*6fa2df46SAndroid Build Coastguard Worker 
45*6fa2df46SAndroid Build Coastguard Worker   barrier_execution_end_ = static_cast<pthread_barrier_t*>(
46*6fa2df46SAndroid Build Coastguard Worker       mmap(nullptr, sizeof(*barrier_execution_end_), PROT_READ | PROT_WRITE,
47*6fa2df46SAndroid Build Coastguard Worker            MAP_SHARED | MAP_ANONYMOUS, -1, 0));
48*6fa2df46SAndroid Build Coastguard Worker   if (barrier_execution_end_ == MAP_FAILED) {
49*6fa2df46SAndroid Build Coastguard Worker     LOGF("mmap() failed");
50*6fa2df46SAndroid Build Coastguard Worker   }
51*6fa2df46SAndroid Build Coastguard Worker   pthread_barrier_init(barrier_execution_end_, &barrier_attr, instructions_.size());
52*6fa2df46SAndroid Build Coastguard Worker 
53*6fa2df46SAndroid Build Coastguard Worker   initialization_mutex_ = static_cast<pthread_mutex_t*>(
54*6fa2df46SAndroid Build Coastguard Worker       mmap(nullptr, sizeof(*initialization_mutex_), PROT_READ | PROT_WRITE,
55*6fa2df46SAndroid Build Coastguard Worker            MAP_SHARED | MAP_ANONYMOUS, -1, 0));
56*6fa2df46SAndroid Build Coastguard Worker   if (initialization_mutex_ == MAP_FAILED) {
57*6fa2df46SAndroid Build Coastguard Worker     LOGF("mmap() failed");
58*6fa2df46SAndroid Build Coastguard Worker   }
59*6fa2df46SAndroid Build Coastguard Worker   pthread_mutexattr_t mutex_attr;
60*6fa2df46SAndroid Build Coastguard Worker   pthread_mutexattr_init(&mutex_attr);
61*6fa2df46SAndroid Build Coastguard Worker   pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
62*6fa2df46SAndroid Build Coastguard Worker   pthread_mutex_init(initialization_mutex_, &mutex_attr);
63*6fa2df46SAndroid Build Coastguard Worker 
64*6fa2df46SAndroid Build Coastguard Worker   pipe_fds_.resize(instructions_.size());
65*6fa2df46SAndroid Build Coastguard Worker   for (unsigned int i = 0; i < instructions_.size(); i++) {
66*6fa2df46SAndroid Build Coastguard Worker     if (pipe(pipe_fds_[i].data()) == -1) {
67*6fa2df46SAndroid Build Coastguard Worker       LOGF("Pipe Failed");
68*6fa2df46SAndroid Build Coastguard Worker     } else {
69*6fa2df46SAndroid Build Coastguard Worker       LOGD("Created pipe for: " + std::to_string(i));
70*6fa2df46SAndroid Build Coastguard Worker     }
71*6fa2df46SAndroid Build Coastguard Worker   }
72*6fa2df46SAndroid Build Coastguard Worker 
73*6fa2df46SAndroid Build Coastguard Worker   instruction_id_ = 0;
74*6fa2df46SAndroid Build Coastguard Worker   for (unsigned int i = 0; i < instructions_.size(); i++) {
75*6fa2df46SAndroid Build Coastguard Worker     // LOGD("Not the last... Can fork");
76*6fa2df46SAndroid Build Coastguard Worker     pid_t child_pid_ = fork();
77*6fa2df46SAndroid Build Coastguard Worker     if (child_pid_) {
78*6fa2df46SAndroid Build Coastguard Worker       // I'm the manager, will continue spawning processes
79*6fa2df46SAndroid Build Coastguard Worker       is_manager_ = true;
80*6fa2df46SAndroid Build Coastguard Worker       continue;
81*6fa2df46SAndroid Build Coastguard Worker     }
82*6fa2df46SAndroid Build Coastguard Worker     is_manager_ = false;
83*6fa2df46SAndroid Build Coastguard Worker     instruction_id_ = i;
84*6fa2df46SAndroid Build Coastguard Worker     LOGD("Child process created: " + std::to_string(instruction_id_) +
85*6fa2df46SAndroid Build Coastguard Worker          " pid: " + std::to_string(getpid()));
86*6fa2df46SAndroid Build Coastguard Worker 
87*6fa2df46SAndroid Build Coastguard Worker     break;
88*6fa2df46SAndroid Build Coastguard Worker   }
89*6fa2df46SAndroid Build Coastguard Worker 
90*6fa2df46SAndroid Build Coastguard Worker   // Close the write pipes that do not belong to this process
91*6fa2df46SAndroid Build Coastguard Worker   for (unsigned int p = 0; p < instructions_.size(); p++) {
92*6fa2df46SAndroid Build Coastguard Worker     if (is_manager_ || p != instruction_id_) {
93*6fa2df46SAndroid Build Coastguard Worker       close(pipe_fds_[p][1]);
94*6fa2df46SAndroid Build Coastguard Worker     }
95*6fa2df46SAndroid Build Coastguard Worker   }
96*6fa2df46SAndroid Build Coastguard Worker 
97*6fa2df46SAndroid Build Coastguard Worker   if (!is_manager_) {
98*6fa2df46SAndroid Build Coastguard Worker     pthread_mutex_lock(initialization_mutex_);
99*6fa2df46SAndroid Build Coastguard Worker     LOGD("Trying to set the name for instruction: " + std::to_string(instruction_id_) +
100*6fa2df46SAndroid Build Coastguard Worker          "; process: " + std::to_string(getpid()) +
101*6fa2df46SAndroid Build Coastguard Worker          "; new name: " + thread_params_[instruction_id_].name_);
102*6fa2df46SAndroid Build Coastguard Worker     setproctitle(argc_, argv_, (thread_params_[instruction_id_].name_.c_str()));
103*6fa2df46SAndroid Build Coastguard Worker 
104*6fa2df46SAndroid Build Coastguard Worker     if (thread_params_[instruction_id_].sched_attr_.IsSet()) {
105*6fa2df46SAndroid Build Coastguard Worker       thread_params_[instruction_id_].sched_attr_.Set();
106*6fa2df46SAndroid Build Coastguard Worker     }
107*6fa2df46SAndroid Build Coastguard Worker     if (thread_params_[instruction_id_].sched_affinity_.IsSet()) {
108*6fa2df46SAndroid Build Coastguard Worker       thread_params_[instruction_id_].sched_affinity_.Set();
109*6fa2df46SAndroid Build Coastguard Worker     }
110*6fa2df46SAndroid Build Coastguard Worker 
111*6fa2df46SAndroid Build Coastguard Worker     LOGD("Process initializing instruction: " + std::to_string(instruction_id_) +
112*6fa2df46SAndroid Build Coastguard Worker          " pid: " + std::to_string(getpid()));
113*6fa2df46SAndroid Build Coastguard Worker     instructions_[instruction_id_]->SetUp();
114*6fa2df46SAndroid Build Coastguard Worker     LOGD("Process initializing done, unlocking: " + std::to_string(instruction_id_) +
115*6fa2df46SAndroid Build Coastguard Worker          " pid: " + std::to_string(getpid()));
116*6fa2df46SAndroid Build Coastguard Worker     pthread_mutex_unlock(initialization_mutex_);
117*6fa2df46SAndroid Build Coastguard Worker   }
118*6fa2df46SAndroid Build Coastguard Worker 
119*6fa2df46SAndroid Build Coastguard Worker   Instruction::SetUpSingle();
120*6fa2df46SAndroid Build Coastguard Worker }
121*6fa2df46SAndroid Build Coastguard Worker 
RunSingle()122*6fa2df46SAndroid Build Coastguard Worker void Multiprocessing::RunSingle() {
123*6fa2df46SAndroid Build Coastguard Worker   if (!is_manager_) {
124*6fa2df46SAndroid Build Coastguard Worker     LOGD("Waiting for the barrier... " + std::to_string(getpid()));
125*6fa2df46SAndroid Build Coastguard Worker     pthread_barrier_wait(barrier_execution_);
126*6fa2df46SAndroid Build Coastguard Worker     LOGD("Barrier passed! Executing: " + std::to_string(getpid()));
127*6fa2df46SAndroid Build Coastguard Worker     instructions_[instruction_id_]->Run();
128*6fa2df46SAndroid Build Coastguard Worker     LOGD("Waiting for all the processes to finish... " + std::to_string(getpid()));
129*6fa2df46SAndroid Build Coastguard Worker     pthread_barrier_wait(barrier_execution_end_);
130*6fa2df46SAndroid Build Coastguard Worker     LOGD("All the processes finished... " + std::to_string(getpid()));
131*6fa2df46SAndroid Build Coastguard Worker   }
132*6fa2df46SAndroid Build Coastguard Worker }
133*6fa2df46SAndroid Build Coastguard Worker 
TearDownSingle(bool is_last)134*6fa2df46SAndroid Build Coastguard Worker void Multiprocessing::TearDownSingle(bool is_last) {
135*6fa2df46SAndroid Build Coastguard Worker   Instruction::TearDownSingle(is_last);
136*6fa2df46SAndroid Build Coastguard Worker 
137*6fa2df46SAndroid Build Coastguard Worker   if (!is_manager_) {
138*6fa2df46SAndroid Build Coastguard Worker     instructions_[instruction_id_]->TearDown();
139*6fa2df46SAndroid Build Coastguard Worker   }
140*6fa2df46SAndroid Build Coastguard Worker }
141*6fa2df46SAndroid Build Coastguard Worker 
CollectResults(const std::string & prefix)142*6fa2df46SAndroid Build Coastguard Worker std::unique_ptr<Result> Multiprocessing::CollectResults(const std::string& prefix) {
143*6fa2df46SAndroid Build Coastguard Worker   // TODO this was copied from Multithreading and should be adapted with some
144*6fa2df46SAndroid Build Coastguard Worker   // shared memory solution.
145*6fa2df46SAndroid Build Coastguard Worker   auto result = std::make_unique<Result>(prefix + name_, repeat_);
146*6fa2df46SAndroid Build Coastguard Worker   dittosuiteproto::Result result_pb;
147*6fa2df46SAndroid Build Coastguard Worker 
148*6fa2df46SAndroid Build Coastguard Worker   LOGD("Collecting results... " + std::to_string(getpid()));
149*6fa2df46SAndroid Build Coastguard Worker 
150*6fa2df46SAndroid Build Coastguard Worker   result->AddMeasurement("duration", TimespecToDoubleNanos(time_sampler_.GetSamples()));
151*6fa2df46SAndroid Build Coastguard Worker 
152*6fa2df46SAndroid Build Coastguard Worker   if (is_manager_) {
153*6fa2df46SAndroid Build Coastguard Worker     LOGD("Parent reading... " + std::to_string(getpid()));
154*6fa2df46SAndroid Build Coastguard Worker     for (unsigned int i = 0; i < instructions_.size(); i++) {
155*6fa2df46SAndroid Build Coastguard Worker       LOGD("Parent reading from pipe: " + std::to_string(i));
156*6fa2df46SAndroid Build Coastguard Worker       std::array<char, 100> buffer;
157*6fa2df46SAndroid Build Coastguard Worker       dittosuiteproto::Result sub_result_pb;
158*6fa2df46SAndroid Build Coastguard Worker       std::string serialized;
159*6fa2df46SAndroid Build Coastguard Worker 
160*6fa2df46SAndroid Build Coastguard Worker       while (true) {
161*6fa2df46SAndroid Build Coastguard Worker         ssize_t chars_read = read(pipe_fds_[i][0], buffer.data(), buffer.size());
162*6fa2df46SAndroid Build Coastguard Worker         if (chars_read == -1) {
163*6fa2df46SAndroid Build Coastguard Worker           PLOGF("Error reading from pipe");
164*6fa2df46SAndroid Build Coastguard Worker         } else if (chars_read == 0) {
165*6fa2df46SAndroid Build Coastguard Worker           // Finished reading from pipe
166*6fa2df46SAndroid Build Coastguard Worker           LOGD("Finished reading, time to decode the PB");
167*6fa2df46SAndroid Build Coastguard Worker           if (!sub_result_pb.ParseFromString(serialized)) {
168*6fa2df46SAndroid Build Coastguard Worker             LOGF("Failed decoding PB from pipe");
169*6fa2df46SAndroid Build Coastguard Worker           }
170*6fa2df46SAndroid Build Coastguard Worker           //PrintPb(sub_result_pb);
171*6fa2df46SAndroid Build Coastguard Worker           result->AddSubResult(Result::FromPb(sub_result_pb));
172*6fa2df46SAndroid Build Coastguard Worker           break;
173*6fa2df46SAndroid Build Coastguard Worker         }
174*6fa2df46SAndroid Build Coastguard Worker         LOGD("Parent received: " + std::to_string(chars_read) + " bytes: \"" +
175*6fa2df46SAndroid Build Coastguard Worker              std::string(buffer.data()) + "\"");
176*6fa2df46SAndroid Build Coastguard Worker         serialized.append(buffer.data(), chars_read);
177*6fa2df46SAndroid Build Coastguard Worker         LOGD("PB so far: \"" + serialized + "\"");
178*6fa2df46SAndroid Build Coastguard Worker       }
179*6fa2df46SAndroid Build Coastguard Worker     }
180*6fa2df46SAndroid Build Coastguard Worker   } else {
181*6fa2df46SAndroid Build Coastguard Worker     LOGD("Child writing... " + std::to_string(getpid()));
182*6fa2df46SAndroid Build Coastguard Worker     std::string child_name = thread_params_[instruction_id_].name_;
183*6fa2df46SAndroid Build Coastguard Worker     result->AddSubResult(instructions_[instruction_id_]->CollectResults(child_name + "/"));
184*6fa2df46SAndroid Build Coastguard Worker 
185*6fa2df46SAndroid Build Coastguard Worker     result_pb = result->ToPb();
186*6fa2df46SAndroid Build Coastguard Worker 
187*6fa2df46SAndroid Build Coastguard Worker     std::string serialized;
188*6fa2df46SAndroid Build Coastguard Worker     // It is safe to pick result_pb.sub_result()[0] because it will be the "multiprocessing"
189*6fa2df46SAndroid Build Coastguard Worker     // instruction.
190*6fa2df46SAndroid Build Coastguard Worker     if (!result_pb.sub_result()[0].SerializeToString(&serialized)) {
191*6fa2df46SAndroid Build Coastguard Worker       LOGF("Failed decoding PB from pipe");
192*6fa2df46SAndroid Build Coastguard Worker     }
193*6fa2df46SAndroid Build Coastguard Worker     ssize_t chars_sent = 0;
194*6fa2df46SAndroid Build Coastguard Worker     while (chars_sent < static_cast<ssize_t>(serialized.size())) {
195*6fa2df46SAndroid Build Coastguard Worker       ssize_t chars_written = write(pipe_fds_[instruction_id_][1], &serialized.data()[chars_sent],
196*6fa2df46SAndroid Build Coastguard Worker                                     serialized.size() - chars_sent);
197*6fa2df46SAndroid Build Coastguard Worker       if (chars_written == -1) {
198*6fa2df46SAndroid Build Coastguard Worker         PLOGF("Error writing to pipe");
199*6fa2df46SAndroid Build Coastguard Worker       }
200*6fa2df46SAndroid Build Coastguard Worker       chars_sent += chars_written;
201*6fa2df46SAndroid Build Coastguard Worker     }
202*6fa2df46SAndroid Build Coastguard Worker     LOGD("Child closing pipe: " + std::to_string(getpid()));
203*6fa2df46SAndroid Build Coastguard Worker     close(pipe_fds_[instruction_id_][1]);
204*6fa2df46SAndroid Build Coastguard Worker   }
205*6fa2df46SAndroid Build Coastguard Worker 
206*6fa2df46SAndroid Build Coastguard Worker   if (!is_manager_) {
207*6fa2df46SAndroid Build Coastguard Worker     // Stop every child
208*6fa2df46SAndroid Build Coastguard Worker     LOGD("Child stopping: " + std::to_string(getpid()));
209*6fa2df46SAndroid Build Coastguard Worker     exit(0);
210*6fa2df46SAndroid Build Coastguard Worker   } else {
211*6fa2df46SAndroid Build Coastguard Worker     pthread_mutex_destroy(initialization_mutex_);
212*6fa2df46SAndroid Build Coastguard Worker     pthread_barrier_destroy(barrier_execution_);
213*6fa2df46SAndroid Build Coastguard Worker     pthread_barrier_destroy(barrier_execution_end_);
214*6fa2df46SAndroid Build Coastguard Worker     LOGD("Parent finished: " + std::to_string(getpid()));
215*6fa2df46SAndroid Build Coastguard Worker   }
216*6fa2df46SAndroid Build Coastguard Worker   return result;
217*6fa2df46SAndroid Build Coastguard Worker }
218*6fa2df46SAndroid Build Coastguard Worker 
219*6fa2df46SAndroid Build Coastguard Worker }  // namespace dittosuite
220