1 /*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "AidlBufferPoolCli"
18 //#define LOG_NDEBUG 0
19
20 #include <thread>
21 #include <aidlcommonsupport/NativeHandle.h>
22 #include <utils/Log.h>
23 #include "BufferPoolClient.h"
24 #include "Accessor.h"
25 #include "Connection.h"
26
27 namespace aidl::android::hardware::media::bufferpool2::implementation {
28
29 using aidl::android::hardware::media::bufferpool2::IConnection;
30 using aidl::android::hardware::media::bufferpool2::ResultStatus;
31 using FetchInfo = aidl::android::hardware::media::bufferpool2::IConnection::FetchInfo;
32 using FetchResult = aidl::android::hardware::media::bufferpool2::IConnection::FetchResult;
33
// Grace period (ms) added to a non-zero sender timestamp to form the deadline
// that receive() hands to postReceive().
static constexpr int64_t kReceiveTimeoutMs = 2000; // 2s
// Upper bound on the attempts postReceive() makes to post TRANSFER_FROM.
static constexpr int kPostMaxRetry = 3;
// Lifetime (ms) of a cached buffer after its last release; also the interval
// after which evictCaches() runs a regular eviction pass.
static constexpr int kCacheTtlMs = 1000;
// An eviction pass is forced once more than this many unreferenced buffers
// are cached.
static constexpr size_t kMaxCachedBufferCount = 64;
// During an eviction pass, unreferenced buffers keep being dropped while the
// cached count is above this value.
static constexpr size_t kCachedBufferCountTarget = kMaxCachedBufferCount - 16;
39
40 class BufferPoolClient::Impl
41 : public std::enable_shared_from_this<BufferPoolClient::Impl> {
42 public:
43 explicit Impl(const std::shared_ptr<Accessor> &accessor,
44 const std::shared_ptr<IObserver> &observer);
45
46 explicit Impl(const std::shared_ptr<IAccessor> &accessor,
47 const std::shared_ptr<IObserver> &observer);
48
isValid()49 bool isValid() {
50 return mValid;
51 }
52
isLocal()53 bool isLocal() {
54 return mValid && mLocal;
55 }
56
getConnectionId()57 ConnectionId getConnectionId() {
58 return mConnectionId;
59 }
60
getAccessor()61 std::shared_ptr<IAccessor> &getAccessor() {
62 return mAccessor;
63 }
64
65 bool isActive(int64_t *lastTransactionMs, bool clearCache);
66
67 void receiveInvalidation(uint32_t msgID);
68
69 BufferPoolStatus flush();
70
71 BufferPoolStatus allocate(const std::vector<uint8_t> ¶ms,
72 native_handle_t **handle,
73 std::shared_ptr<BufferPoolData> *buffer);
74
75 BufferPoolStatus receive(
76 TransactionId transactionId, BufferId bufferId,
77 int64_t timestampMs,
78 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);
79
80 void postBufferRelease(BufferId bufferId);
81
82 bool postSend(
83 BufferId bufferId, ConnectionId receiver,
84 TransactionId *transactionId, int64_t *timestampMs);
85 private:
86
87 bool postReceive(
88 BufferId bufferId, TransactionId transactionId,
89 int64_t timestampMs);
90
91 bool postReceiveResult(
92 BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
93
94 void trySyncFromRemote();
95
96 bool syncReleased(uint32_t msgId = 0);
97
98 void evictCaches(bool clearCache = false);
99
100 void invalidateBuffer(BufferId id);
101
102 void invalidateRange(BufferId from, BufferId to);
103
104 BufferPoolStatus allocateBufferHandle(
105 const std::vector<uint8_t>& params, BufferId *bufferId,
106 native_handle_t **handle);
107
108 BufferPoolStatus fetchBufferHandle(
109 TransactionId transactionId, BufferId bufferId,
110 native_handle_t **handle);
111
112 struct BlockPoolDataDtor;
113 struct ClientBuffer;
114
115 bool mLocal;
116 bool mValid;
117 std::shared_ptr<IAccessor> mAccessor;
118 std::shared_ptr<Connection> mLocalConnection;
119 std::shared_ptr<IConnection> mRemoteConnection;
120 uint32_t mSeqId;
121 ConnectionId mConnectionId;
122 int64_t mLastEvictCacheMs;
123 std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
124
125 // CachedBuffers
126 struct BufferCache {
127 std::mutex mLock;
128 bool mCreating;
129 std::condition_variable mCreateCv;
130 std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
131 int mActive;
132 int64_t mLastChangeMs;
133
BufferCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache134 BufferCache() : mCreating(false), mActive(0),
135 mLastChangeMs(::android::elapsedRealtime()) {}
136
incActive_laidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache137 void incActive_l() {
138 ++mActive;
139 mLastChangeMs = ::android::elapsedRealtime();
140 }
141
decActive_laidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache142 void decActive_l() {
143 --mActive;
144 mLastChangeMs = ::android::elapsedRealtime();
145 }
146
cachedBufferCountaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache147 int cachedBufferCount() const {
148 return mBuffers.size() - mActive;
149 }
150 } mCache;
151
152 // FMQ - release notifier
153 struct ReleaseCache {
154 std::mutex mLock;
155 std::list<BufferId> mReleasingIds;
156 std::list<BufferId> mReleasedIds;
157 uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
158 bool mInvalidateAck;
159 std::unique_ptr<BufferStatusChannel> mStatusChannel;
160
ReleaseCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ReleaseCache161 ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
162 } mReleasing;
163
164 // This lock is held during synchronization from remote side.
165 // In order to minimize remote calls and locking duration, this lock is held
166 // by best effort approach using try_lock().
167 std::mutex mRemoteSyncLock;
168 };
169
170 struct BufferPoolClient::Impl::BlockPoolDataDtor {
BlockPoolDataDtoraidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BlockPoolDataDtor171 BlockPoolDataDtor(const std::shared_ptr<BufferPoolClient::Impl> &impl)
172 : mImpl(impl) {}
173
operator ()aidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BlockPoolDataDtor174 void operator()(BufferPoolData *buffer) {
175 BufferId id = buffer->mId;
176 delete buffer;
177
178 auto impl = mImpl.lock();
179 if (impl && impl->isValid()) {
180 impl->postBufferRelease(id);
181 }
182 }
183 const std::weak_ptr<BufferPoolClient::Impl> mImpl;
184 };
185
186 struct BufferPoolClient::Impl::ClientBuffer {
187 private:
188 int64_t mExpireMs;
189 bool mHasCache;
190 ConnectionId mConnectionId;
191 BufferId mId;
192 native_handle_t *mHandle;
193 std::weak_ptr<BufferPoolData> mCache;
194
updateExpireaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer195 void updateExpire() {
196 mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
197 }
198
199 public:
ClientBufferaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer200 ClientBuffer(
201 ConnectionId connectionId, BufferId id, native_handle_t *handle)
202 : mHasCache(false), mConnectionId(connectionId),
203 mId(id), mHandle(handle) {
204 mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
205 }
206
~ClientBufferaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer207 ~ClientBuffer() {
208 if (mHandle) {
209 native_handle_close(mHandle);
210 native_handle_delete(mHandle);
211 }
212 }
213
idaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer214 BufferId id() const {
215 return mId;
216 }
217
expireaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer218 bool expire() const {
219 int64_t now = ::android::elapsedRealtime();
220 return now >= mExpireMs;
221 }
222
hasCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer223 bool hasCache() const {
224 return mHasCache;
225 }
226
fetchCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer227 std::shared_ptr<BufferPoolData> fetchCache(native_handle_t **pHandle) {
228 if (mHasCache) {
229 std::shared_ptr<BufferPoolData> cache = mCache.lock();
230 if (cache) {
231 *pHandle = mHandle;
232 }
233 return cache;
234 }
235 return nullptr;
236 }
237
createCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer238 std::shared_ptr<BufferPoolData> createCache(
239 const std::shared_ptr<BufferPoolClient::Impl> &impl,
240 native_handle_t **pHandle) {
241 if (!mHasCache) {
242 // Allocates a raw ptr in order to avoid sending #postBufferRelease
243 // from deleter, in case of native_handle_clone failure.
244 BufferPoolData *ptr = new BufferPoolData(mConnectionId, mId);
245 if (ptr) {
246 std::shared_ptr<BufferPoolData> cache(ptr, BlockPoolDataDtor(impl));
247 if (cache) {
248 mCache = cache;
249 mHasCache = true;
250 *pHandle = mHandle;
251 return cache;
252 }
253 }
254 if (ptr) {
255 delete ptr;
256 }
257 }
258 return nullptr;
259 }
260
onCacheReleaseaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer261 bool onCacheRelease() {
262 if (mHasCache) {
263 // TODO: verify mCache is not valid;
264 updateExpire();
265 mHasCache = false;
266 return true;
267 }
268 return false;
269 }
270 };
271
Impl(const std::shared_ptr<Accessor> & accessor,const std::shared_ptr<IObserver> & observer)272 BufferPoolClient::Impl::Impl(const std::shared_ptr<Accessor> &accessor,
273 const std::shared_ptr<IObserver> &observer)
274 : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
275 mLastEvictCacheMs(::android::elapsedRealtime()) {
276 StatusDescriptor statusDesc;
277 InvalidationDescriptor invDesc;
278 BufferPoolStatus status = accessor->connect(
279 observer, true,
280 &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
281 &statusDesc, &invDesc);
282 if (status == ResultStatus::OK) {
283 mReleasing.mStatusChannel =
284 std::make_unique<BufferStatusChannel>(statusDesc);
285 mInvalidationListener =
286 std::make_unique<BufferInvalidationListener>(invDesc);
287 mValid = mReleasing.mStatusChannel &&
288 mReleasing.mStatusChannel->isValid() &&
289 mInvalidationListener &&
290 mInvalidationListener->isValid();
291 }
292 }
293
Impl(const std::shared_ptr<IAccessor> & accessor,const std::shared_ptr<IObserver> & observer)294 BufferPoolClient::Impl::Impl(const std::shared_ptr<IAccessor> &accessor,
295 const std::shared_ptr<IObserver> &observer)
296 : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
297 mLastEvictCacheMs(::android::elapsedRealtime()) {
298 IAccessor::ConnectionInfo conInfo;
299 bool valid = false;
300 if (accessor && accessor->connect(observer, &conInfo).isOk()) {
301 auto channel = std::make_unique<BufferStatusChannel>(conInfo.toFmqDesc);
302 auto observer = std::make_unique<BufferInvalidationListener>(conInfo.fromFmqDesc);
303
304 if (channel && channel->isValid()
305 && observer && observer->isValid()) {
306 mRemoteConnection = conInfo.connection;
307 mConnectionId = conInfo.connectionId;
308 mReleasing.mInvalidateId = conInfo.msgId;
309 mReleasing.mStatusChannel = std::move(channel);
310 mInvalidationListener = std::move(observer);
311 valid = true;
312 }
313 }
314 mValid = valid;
315 }
316
isActive(int64_t * lastTransactionMs,bool clearCache)317 bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionMs, bool clearCache) {
318 bool active = false;
319 {
320 std::lock_guard<std::mutex> lock(mCache.mLock);
321 syncReleased();
322 evictCaches(clearCache);
323 *lastTransactionMs = mCache.mLastChangeMs;
324 active = mCache.mActive > 0;
325 }
326 if (mValid && mLocal && mLocalConnection) {
327 mLocalConnection->cleanUp(clearCache);
328 return true;
329 }
330 return active;
331 }
332
receiveInvalidation(uint32_t messageId)333 void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) {
334 std::lock_guard<std::mutex> lock(mCache.mLock);
335 syncReleased(messageId);
336 // TODO: evict cache required?
337 }
338
flush()339 BufferPoolStatus BufferPoolClient::Impl::flush() {
340 if (!mLocal || !mLocalConnection || !mValid) {
341 return ResultStatus::CRITICAL_ERROR;
342 }
343 {
344 std::unique_lock<std::mutex> lock(mCache.mLock);
345 syncReleased();
346 evictCaches();
347 return mLocalConnection->flush();
348 }
349 }
350
allocate(const std::vector<uint8_t> & params,native_handle_t ** pHandle,std::shared_ptr<BufferPoolData> * buffer)351 BufferPoolStatus BufferPoolClient::Impl::allocate(
352 const std::vector<uint8_t> ¶ms,
353 native_handle_t **pHandle,
354 std::shared_ptr<BufferPoolData> *buffer) {
355 if (!mLocal || !mLocalConnection || !mValid) {
356 return ResultStatus::CRITICAL_ERROR;
357 }
358 BufferId bufferId;
359 native_handle_t *handle = nullptr;
360 buffer->reset();
361 BufferPoolStatus status = allocateBufferHandle(params, &bufferId, &handle);
362 if (status == ResultStatus::OK) {
363 if (handle) {
364 std::unique_lock<std::mutex> lock(mCache.mLock);
365 syncReleased();
366 evictCaches();
367 auto cacheIt = mCache.mBuffers.find(bufferId);
368 if (cacheIt != mCache.mBuffers.end()) {
369 // TODO: verify it is recycled. (not having active ref)
370 mCache.mBuffers.erase(cacheIt);
371 }
372 auto clientBuffer = std::make_unique<ClientBuffer>(
373 mConnectionId, bufferId, handle);
374 if (clientBuffer) {
375 auto result = mCache.mBuffers.insert(std::make_pair(
376 bufferId, std::move(clientBuffer)));
377 if (result.second) {
378 *buffer = result.first->second->createCache(
379 shared_from_this(), pHandle);
380 if (*buffer) {
381 mCache.incActive_l();
382 }
383 }
384 }
385 }
386 if (!*buffer) {
387 ALOGV("client cache creation failure %d: %lld",
388 handle != nullptr, (long long)mConnectionId);
389 status = ResultStatus::NO_MEMORY;
390 postBufferRelease(bufferId);
391 }
392 }
393 return status;
394 }
395
receive(TransactionId transactionId,BufferId bufferId,int64_t timestampMs,native_handle_t ** pHandle,std::shared_ptr<BufferPoolData> * buffer)396 BufferPoolStatus BufferPoolClient::Impl::receive(
397 TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
398 native_handle_t **pHandle,
399 std::shared_ptr<BufferPoolData> *buffer) {
400 if (!mValid) {
401 return ResultStatus::CRITICAL_ERROR;
402 }
403 if (timestampMs != 0) {
404 timestampMs += kReceiveTimeoutMs;
405 }
406 if (!postReceive(bufferId, transactionId, timestampMs)) {
407 return ResultStatus::CRITICAL_ERROR;
408 }
409 BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
410 buffer->reset();
411 while(1) {
412 std::unique_lock<std::mutex> lock(mCache.mLock);
413 syncReleased();
414 evictCaches();
415 auto cacheIt = mCache.mBuffers.find(bufferId);
416 if (cacheIt != mCache.mBuffers.end()) {
417 if (cacheIt->second->hasCache()) {
418 *buffer = cacheIt->second->fetchCache(pHandle);
419 if (!*buffer) {
420 // check transfer time_out
421 lock.unlock();
422 std::this_thread::yield();
423 continue;
424 }
425 ALOGV("client receive from reference %lld", (long long)mConnectionId);
426 break;
427 } else {
428 *buffer = cacheIt->second->createCache(shared_from_this(), pHandle);
429 if (*buffer) {
430 mCache.incActive_l();
431 }
432 ALOGV("client receive from cache %lld", (long long)mConnectionId);
433 break;
434 }
435 } else {
436 if (!mCache.mCreating) {
437 mCache.mCreating = true;
438 lock.unlock();
439 native_handle_t* handle = nullptr;
440 status = fetchBufferHandle(transactionId, bufferId, &handle);
441 lock.lock();
442 if (status == ResultStatus::OK) {
443 if (handle) {
444 auto clientBuffer = std::make_unique<ClientBuffer>(
445 mConnectionId, bufferId, handle);
446 if (clientBuffer) {
447 auto result = mCache.mBuffers.insert(
448 std::make_pair(bufferId, std::move(
449 clientBuffer)));
450 if (result.second) {
451 *buffer = result.first->second->createCache(
452 shared_from_this(), pHandle);
453 if (*buffer) {
454 mCache.incActive_l();
455 }
456 }
457 }
458 }
459 if (!*buffer) {
460 status = ResultStatus::NO_MEMORY;
461 }
462 }
463 mCache.mCreating = false;
464 lock.unlock();
465 mCache.mCreateCv.notify_all();
466 break;
467 }
468 mCache.mCreateCv.wait(lock);
469 }
470 }
471 bool needsSync = false;
472 bool posted = postReceiveResult(bufferId, transactionId,
473 *buffer ? true : false, &needsSync);
474 ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
475 *buffer ? "ok" : "fail", posted);
476 if (mValid && mLocal && mLocalConnection) {
477 mLocalConnection->cleanUp(false);
478 }
479 if (needsSync && mRemoteConnection) {
480 trySyncFromRemote();
481 }
482 if (*buffer) {
483 if (!posted) {
484 buffer->reset();
485 return ResultStatus::CRITICAL_ERROR;
486 }
487 return ResultStatus::OK;
488 }
489 return status;
490 }
491
492
postBufferRelease(BufferId bufferId)493 void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) {
494 std::lock_guard<std::mutex> lock(mReleasing.mLock);
495 mReleasing.mReleasingIds.push_back(bufferId);
496 mReleasing.mStatusChannel->postBufferRelease(
497 mConnectionId, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
498 }
499
500 // TODO: revise ad-hoc posting data structure
postSend(BufferId bufferId,ConnectionId receiver,TransactionId * transactionId,int64_t * timestampMs)501 bool BufferPoolClient::Impl::postSend(
502 BufferId bufferId, ConnectionId receiver,
503 TransactionId *transactionId, int64_t *timestampMs) {
504 {
505 // TODO: don't need to call syncReleased every time
506 std::lock_guard<std::mutex> lock(mCache.mLock);
507 syncReleased();
508 }
509 bool ret = false;
510 bool needsSync = false;
511 {
512 std::lock_guard<std::mutex> lock(mReleasing.mLock);
513 *timestampMs = ::android::elapsedRealtime();
514 *transactionId = (mConnectionId << 32) | mSeqId++;
515 // TODO: retry, add timeout, target?
516 ret = mReleasing.mStatusChannel->postBufferStatusMessage(
517 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
518 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
519 needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
520 }
521 if (mValid && mLocal && mLocalConnection) {
522 mLocalConnection->cleanUp(false);
523 }
524 if (needsSync && mRemoteConnection) {
525 trySyncFromRemote();
526 }
527 return ret;
528 }
529
postReceive(BufferId bufferId,TransactionId transactionId,int64_t timestampMs)530 bool BufferPoolClient::Impl::postReceive(
531 BufferId bufferId, TransactionId transactionId, int64_t timestampMs) {
532 for (int i = 0; i < kPostMaxRetry; ++i) {
533 std::unique_lock<std::mutex> lock(mReleasing.mLock);
534 int64_t now = ::android::elapsedRealtime();
535 if (timestampMs == 0 || now < timestampMs) {
536 bool result = mReleasing.mStatusChannel->postBufferStatusMessage(
537 transactionId, bufferId, BufferStatus::TRANSFER_FROM,
538 mConnectionId, -1, mReleasing.mReleasingIds,
539 mReleasing.mReleasedIds);
540 if (result) {
541 return true;
542 }
543 lock.unlock();
544 std::this_thread::yield();
545 } else {
546 mReleasing.mStatusChannel->postBufferStatusMessage(
547 transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT,
548 mConnectionId, -1, mReleasing.mReleasingIds,
549 mReleasing.mReleasedIds);
550 return false;
551 }
552 }
553 return false;
554 }
555
postReceiveResult(BufferId bufferId,TransactionId transactionId,bool result,bool * needsSync)556 bool BufferPoolClient::Impl::postReceiveResult(
557 BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
558 std::lock_guard<std::mutex> lock(mReleasing.mLock);
559 // TODO: retry, add timeout
560 bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
561 transactionId, bufferId,
562 result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
563 mConnectionId, -1, mReleasing.mReleasingIds,
564 mReleasing.mReleasedIds);
565 *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
566 return ret;
567 }
568
trySyncFromRemote()569 void BufferPoolClient::Impl::trySyncFromRemote() {
570 if (mRemoteSyncLock.try_lock()) {
571 bool needsSync = false;
572 {
573 std::lock_guard<std::mutex> lock(mReleasing.mLock);
574 needsSync = mReleasing.mStatusChannel->needsSync();
575 }
576 if (needsSync) {
577 if (!mRemoteConnection->sync().isOk()) {
578 ALOGD("sync from client %lld failed: bufferpool process died.",
579 (long long)mConnectionId);
580 }
581 }
582 mRemoteSyncLock.unlock();
583 }
584 }
585
586 // should have mCache.mLock
syncReleased(uint32_t messageId)587 bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) {
588 bool cleared = false;
589 {
590 std::lock_guard<std::mutex> lock(mReleasing.mLock);
591 if (mReleasing.mReleasingIds.size() > 0) {
592 mReleasing.mStatusChannel->postBufferRelease(
593 mConnectionId, mReleasing.mReleasingIds,
594 mReleasing.mReleasedIds);
595 }
596 if (mReleasing.mReleasedIds.size() > 0) {
597 for (BufferId& id: mReleasing.mReleasedIds) {
598 ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
599 auto found = mCache.mBuffers.find(id);
600 if (found != mCache.mBuffers.end()) {
601 if (found->second->onCacheRelease()) {
602 mCache.decActive_l();
603 } else {
604 // should not happen!
605 ALOGW("client %lld cache release status inconsistent!",
606 (long long)mConnectionId);
607 }
608 } else {
609 // should not happen!
610 ALOGW("client %lld cache status inconsistent!", (long long)mConnectionId);
611 }
612 }
613 mReleasing.mReleasedIds.clear();
614 cleared = true;
615 }
616 }
617 std::vector<BufferInvalidationMessage> invalidations;
618 mInvalidationListener->getInvalidations(invalidations);
619 uint32_t lastMsgId = 0;
620 if (invalidations.size() > 0) {
621 for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
622 if (it->messageId != 0) {
623 lastMsgId = it->messageId;
624 }
625 if (it->fromBufferId == it->toBufferId) {
626 // TODO: handle fromBufferId = UINT32_MAX
627 invalidateBuffer(it->fromBufferId);
628 } else {
629 invalidateRange(it->fromBufferId, it->toBufferId);
630 }
631 }
632 }
633 {
634 std::lock_guard<std::mutex> lock(mReleasing.mLock);
635 if (lastMsgId != 0) {
636 if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
637 mReleasing.mInvalidateId = lastMsgId;
638 mReleasing.mInvalidateAck = false;
639 }
640 } else if (messageId != 0) {
641 // messages are drained.
642 if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
643 mReleasing.mInvalidateId = messageId;
644 mReleasing.mInvalidateAck = true;
645 }
646 }
647 if (!mReleasing.mInvalidateAck) {
648 // post ACK
649 mReleasing.mStatusChannel->postBufferInvalidateAck(
650 mConnectionId,
651 mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
652 ALOGV("client %lld invalidateion ack (%d) %u",
653 (long long)mConnectionId,
654 mReleasing.mInvalidateAck, mReleasing.mInvalidateId);
655 }
656 }
657 return cleared;
658 }
659
660 // should have mCache.mLock
evictCaches(bool clearCache)661 void BufferPoolClient::Impl::evictCaches(bool clearCache) {
662 int64_t now = ::android::elapsedRealtime();
663 if (now >= mLastEvictCacheMs + kCacheTtlMs ||
664 clearCache || mCache.cachedBufferCount() > kMaxCachedBufferCount) {
665 size_t evicted = 0;
666 for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
667 if (!it->second->hasCache() && (it->second->expire() ||
668 clearCache || mCache.cachedBufferCount() > kCachedBufferCountTarget)) {
669 it = mCache.mBuffers.erase(it);
670 ++evicted;
671 } else {
672 ++it;
673 }
674 }
675 ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
676 (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
677 mLastEvictCacheMs = now;
678 }
679 }
680
681 // should have mCache.mLock
invalidateBuffer(BufferId id)682 void BufferPoolClient::Impl::invalidateBuffer(BufferId id) {
683 for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) {
684 if (id == it->second->id()) {
685 if (!it->second->hasCache()) {
686 mCache.mBuffers.erase(it);
687 ALOGV("cache invalidated %lld : buffer %u",
688 (long long)mConnectionId, id);
689 } else {
690 ALOGW("Inconsistent invalidation %lld : activer buffer!! %u",
691 (long long)mConnectionId, (unsigned int)id);
692 }
693 break;
694 }
695 }
696 }
697
698 // should have mCache.mLock
invalidateRange(BufferId from,BufferId to)699 void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) {
700 size_t invalidated = 0;
701 for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
702 if (!it->second->hasCache()) {
703 BufferId bid = it->second->id();
704 if (from < to) {
705 if (from <= bid && bid < to) {
706 ++invalidated;
707 it = mCache.mBuffers.erase(it);
708 continue;
709 }
710 } else {
711 if (from <= bid || bid < to) {
712 ++invalidated;
713 it = mCache.mBuffers.erase(it);
714 continue;
715 }
716 }
717 }
718 ++it;
719 }
720 ALOGV("cache invalidated %lld : # of invalidated %zu",
721 (long long)mConnectionId, invalidated);
722 }
723
allocateBufferHandle(const std::vector<uint8_t> & params,BufferId * bufferId,native_handle_t ** handle)724 BufferPoolStatus BufferPoolClient::Impl::allocateBufferHandle(
725 const std::vector<uint8_t>& params, BufferId *bufferId,
726 native_handle_t** handle) {
727 if (mLocalConnection) {
728 const native_handle_t* allocHandle = nullptr;
729 BufferPoolStatus status = mLocalConnection->allocate(
730 params, bufferId, &allocHandle);
731 if (status == ResultStatus::OK) {
732 *handle = native_handle_clone(allocHandle);
733 }
734 ALOGV("client allocate result %lld %d : %u clone %p",
735 (long long)mConnectionId, status == ResultStatus::OK,
736 *handle ? *bufferId : 0 , *handle);
737 return status;
738 }
739 return ResultStatus::CRITICAL_ERROR;
740 }
741
fetchBufferHandle(TransactionId transactionId,BufferId bufferId,native_handle_t ** handle)742 BufferPoolStatus BufferPoolClient::Impl::fetchBufferHandle(
743 TransactionId transactionId, BufferId bufferId,
744 native_handle_t **handle) {
745 std::shared_ptr<IConnection> connection;
746 if (mLocal) {
747 connection = mLocalConnection;
748 } else {
749 connection = mRemoteConnection;
750 }
751 if (!connection) {
752 ALOGE("connection null: fetchBufferHandle()");
753 return ResultStatus::CRITICAL_ERROR;
754 }
755 std::vector<FetchInfo> infos;
756 std::vector<FetchResult> results;
757 infos.emplace_back(FetchInfo{ToAidl(transactionId), ToAidl(bufferId)});
758 ndk::ScopedAStatus status = connection->fetch(infos, &results);
759 if (!status.isOk()) {
760 BufferPoolStatus svcSpecific = status.getServiceSpecificError();
761 return svcSpecific ? svcSpecific : ResultStatus::CRITICAL_ERROR;
762 }
763 if (results[0].getTag() == FetchResult::buffer) {
764 if (results[0].get<FetchResult::buffer>().buffer.has_value()) {
765 *handle = ::android::dupFromAidl(results[0].get<FetchResult::buffer>().buffer.value());
766 } else {
767 // TODO: Support HardwareBuffer
768 ALOGW("handle nullptr");
769 *handle = nullptr;
770 }
771 return ResultStatus::OK;
772 }
773 return results[0].get<FetchResult::failure>();
774 }
775
776
BufferPoolClient(const std::shared_ptr<Accessor> & accessor,const std::shared_ptr<IObserver> & observer)777 BufferPoolClient::BufferPoolClient(const std::shared_ptr<Accessor> &accessor,
778 const std::shared_ptr<IObserver> &observer) {
779 mImpl = std::make_shared<Impl>(accessor, observer);
780 }
781
BufferPoolClient(const std::shared_ptr<IAccessor> & accessor,const std::shared_ptr<IObserver> & observer)782 BufferPoolClient::BufferPoolClient(const std::shared_ptr<IAccessor> &accessor,
783 const std::shared_ptr<IObserver> &observer) {
784 mImpl = std::make_shared<Impl>(accessor, observer);
785 }
786
~BufferPoolClient()787 BufferPoolClient::~BufferPoolClient() {
788 // TODO: how to handle orphaned buffers?
789 }
790
isValid()791 bool BufferPoolClient::isValid() {
792 return mImpl && mImpl->isValid();
793 }
794
isLocal()795 bool BufferPoolClient::isLocal() {
796 return mImpl && mImpl->isLocal();
797 }
798
isActive(int64_t * lastTransactionMs,bool clearCache)799 bool BufferPoolClient::isActive(int64_t *lastTransactionMs, bool clearCache) {
800 if (!isValid()) {
801 *lastTransactionMs = 0;
802 return false;
803 }
804 return mImpl->isActive(lastTransactionMs, clearCache);
805 }
806
getConnectionId()807 ConnectionId BufferPoolClient::getConnectionId() {
808 if (isValid()) {
809 return mImpl->getConnectionId();
810 }
811 return -1;
812 }
813
getAccessor(std::shared_ptr<IAccessor> * accessor)814 BufferPoolStatus BufferPoolClient::getAccessor(std::shared_ptr<IAccessor> *accessor) {
815 if (isValid()) {
816 *accessor = mImpl->getAccessor();
817 return ResultStatus::OK;
818 }
819 return ResultStatus::CRITICAL_ERROR;
820 }
821
receiveInvalidation(uint32_t msgId)822 void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
823 ALOGV("bufferpool2 client recv inv %u", msgId);
824 if (isValid()) {
825 mImpl->receiveInvalidation(msgId);
826 }
827 }
828
flush()829 BufferPoolStatus BufferPoolClient::flush() {
830 if (isValid()) {
831 return mImpl->flush();
832 }
833 return ResultStatus::CRITICAL_ERROR;
834 }
835
allocate(const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)836 BufferPoolStatus BufferPoolClient::allocate(
837 const std::vector<uint8_t> ¶ms,
838 native_handle_t **handle,
839 std::shared_ptr<BufferPoolData> *buffer) {
840 if (isValid()) {
841 return mImpl->allocate(params, handle, buffer);
842 }
843 return ResultStatus::CRITICAL_ERROR;
844 }
845
receive(TransactionId transactionId,BufferId bufferId,int64_t timestampMs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)846 BufferPoolStatus BufferPoolClient::receive(
847 TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
848 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
849 if (isValid()) {
850 return mImpl->receive(transactionId, bufferId, timestampMs, handle, buffer);
851 }
852 return ResultStatus::CRITICAL_ERROR;
853 }
854
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampMs)855 BufferPoolStatus BufferPoolClient::postSend(
856 ConnectionId receiverId,
857 const std::shared_ptr<BufferPoolData> &buffer,
858 TransactionId *transactionId,
859 int64_t *timestampMs) {
860 if (isValid()) {
861 bool result = mImpl->postSend(
862 buffer->mId, receiverId, transactionId, timestampMs);
863 return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
864 }
865 return ResultStatus::CRITICAL_ERROR;
866 }
867
868 } // namespace aidl::android::hardware::media::bufferpool2::implementation
869