xref: /aosp_15_r20/hardware/interfaces/media/bufferpool/aidl/default/BufferPoolClient.cpp (revision 4d7e907c777eeecc4c5bd7cf640a754fac206ff7)
1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "AidlBufferPoolCli"
18 //#define LOG_NDEBUG 0
19 
20 #include <thread>
21 #include <aidlcommonsupport/NativeHandle.h>
22 #include <utils/Log.h>
23 #include "BufferPoolClient.h"
24 #include "Accessor.h"
25 #include "Connection.h"
26 
27 namespace aidl::android::hardware::media::bufferpool2::implementation {
28 
29 using aidl::android::hardware::media::bufferpool2::IConnection;
30 using aidl::android::hardware::media::bufferpool2::ResultStatus;
31 using FetchInfo = aidl::android::hardware::media::bufferpool2::IConnection::FetchInfo;
32 using FetchResult = aidl::android::hardware::media::bufferpool2::IConnection::FetchResult;
33 
// Grace period (ms) added to a transfer's timestamp in receive() to form the
// deadline that postReceive() checks before posting TRANSFER_FROM.
static constexpr int64_t kReceiveTimeoutMs = 2000; // 2s

// Upper bound on the attempts postReceive() makes to post its status message.
static constexpr int kPostMaxRetry = 3;

// Lifetime (ms) of an unreferenced cached buffer; also the interval after
// which evictCaches() runs a periodic pass.
static constexpr int kCacheTtlMs = 1000;

// evictCaches() runs a pass as soon as the number of unreferenced cached
// buffers exceeds this value.
static constexpr size_t kMaxCachedBufferCount = 64;

// During an eviction pass, unreferenced buffers keep being dropped while their
// count is above this value.
static constexpr size_t kCachedBufferCountTarget = kMaxCachedBufferCount - 16;
39 
40 class BufferPoolClient::Impl
41         : public std::enable_shared_from_this<BufferPoolClient::Impl> {
42 public:
43     explicit Impl(const std::shared_ptr<Accessor> &accessor,
44                   const std::shared_ptr<IObserver> &observer);
45 
46     explicit Impl(const std::shared_ptr<IAccessor> &accessor,
47                   const std::shared_ptr<IObserver> &observer);
48 
isValid()49     bool isValid() {
50         return mValid;
51     }
52 
isLocal()53     bool isLocal() {
54         return mValid && mLocal;
55     }
56 
getConnectionId()57     ConnectionId getConnectionId() {
58         return mConnectionId;
59     }
60 
getAccessor()61     std::shared_ptr<IAccessor> &getAccessor() {
62         return mAccessor;
63     }
64 
65     bool isActive(int64_t *lastTransactionMs, bool clearCache);
66 
67     void receiveInvalidation(uint32_t msgID);
68 
69     BufferPoolStatus flush();
70 
71     BufferPoolStatus allocate(const std::vector<uint8_t> &params,
72                           native_handle_t **handle,
73                           std::shared_ptr<BufferPoolData> *buffer);
74 
75     BufferPoolStatus receive(
76             TransactionId transactionId, BufferId bufferId,
77             int64_t timestampMs,
78             native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);
79 
80     void postBufferRelease(BufferId bufferId);
81 
82     bool postSend(
83             BufferId bufferId, ConnectionId receiver,
84             TransactionId *transactionId, int64_t *timestampMs);
85 private:
86 
87     bool postReceive(
88             BufferId bufferId, TransactionId transactionId,
89             int64_t timestampMs);
90 
91     bool postReceiveResult(
92             BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
93 
94     void trySyncFromRemote();
95 
96     bool syncReleased(uint32_t msgId = 0);
97 
98     void evictCaches(bool clearCache = false);
99 
100     void invalidateBuffer(BufferId id);
101 
102     void invalidateRange(BufferId from, BufferId to);
103 
104     BufferPoolStatus allocateBufferHandle(
105             const std::vector<uint8_t>& params, BufferId *bufferId,
106             native_handle_t **handle);
107 
108     BufferPoolStatus fetchBufferHandle(
109             TransactionId transactionId, BufferId bufferId,
110             native_handle_t **handle);
111 
112     struct BlockPoolDataDtor;
113     struct ClientBuffer;
114 
115     bool mLocal;
116     bool mValid;
117     std::shared_ptr<IAccessor> mAccessor;
118     std::shared_ptr<Connection> mLocalConnection;
119     std::shared_ptr<IConnection> mRemoteConnection;
120     uint32_t mSeqId;
121     ConnectionId mConnectionId;
122     int64_t mLastEvictCacheMs;
123     std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
124 
125     // CachedBuffers
126     struct BufferCache {
127         std::mutex mLock;
128         bool mCreating;
129         std::condition_variable mCreateCv;
130         std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
131         int mActive;
132         int64_t mLastChangeMs;
133 
BufferCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache134         BufferCache() : mCreating(false), mActive(0),
135                 mLastChangeMs(::android::elapsedRealtime()) {}
136 
incActive_laidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache137         void incActive_l() {
138             ++mActive;
139             mLastChangeMs = ::android::elapsedRealtime();
140         }
141 
decActive_laidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache142         void decActive_l() {
143             --mActive;
144             mLastChangeMs = ::android::elapsedRealtime();
145         }
146 
cachedBufferCountaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BufferCache147         int cachedBufferCount() const {
148             return mBuffers.size() - mActive;
149         }
150     } mCache;
151 
152     // FMQ - release notifier
153     struct ReleaseCache {
154         std::mutex mLock;
155         std::list<BufferId> mReleasingIds;
156         std::list<BufferId> mReleasedIds;
157         uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
158         bool mInvalidateAck;
159         std::unique_ptr<BufferStatusChannel> mStatusChannel;
160 
ReleaseCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ReleaseCache161         ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
162     } mReleasing;
163 
164     // This lock is held during synchronization from remote side.
165     // In order to minimize remote calls and locking duration, this lock is held
166     // by best effort approach using try_lock().
167     std::mutex mRemoteSyncLock;
168 };
169 
170 struct BufferPoolClient::Impl::BlockPoolDataDtor {
BlockPoolDataDtoraidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BlockPoolDataDtor171     BlockPoolDataDtor(const std::shared_ptr<BufferPoolClient::Impl> &impl)
172             : mImpl(impl) {}
173 
operator ()aidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::BlockPoolDataDtor174     void operator()(BufferPoolData *buffer) {
175         BufferId id = buffer->mId;
176         delete buffer;
177 
178         auto impl = mImpl.lock();
179         if (impl && impl->isValid()) {
180             impl->postBufferRelease(id);
181         }
182     }
183     const std::weak_ptr<BufferPoolClient::Impl> mImpl;
184 };
185 
186 struct BufferPoolClient::Impl::ClientBuffer {
187 private:
188     int64_t mExpireMs;
189     bool mHasCache;
190     ConnectionId mConnectionId;
191     BufferId mId;
192     native_handle_t *mHandle;
193     std::weak_ptr<BufferPoolData> mCache;
194 
updateExpireaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer195     void updateExpire() {
196         mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
197     }
198 
199 public:
ClientBufferaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer200     ClientBuffer(
201             ConnectionId connectionId, BufferId id, native_handle_t *handle)
202             : mHasCache(false), mConnectionId(connectionId),
203               mId(id), mHandle(handle) {
204         mExpireMs = ::android::elapsedRealtime() + kCacheTtlMs;
205     }
206 
~ClientBufferaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer207     ~ClientBuffer() {
208         if (mHandle) {
209             native_handle_close(mHandle);
210             native_handle_delete(mHandle);
211         }
212     }
213 
idaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer214     BufferId id() const {
215         return mId;
216     }
217 
expireaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer218     bool expire() const {
219         int64_t now = ::android::elapsedRealtime();
220         return now >= mExpireMs;
221     }
222 
hasCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer223     bool hasCache() const {
224         return mHasCache;
225     }
226 
fetchCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer227     std::shared_ptr<BufferPoolData> fetchCache(native_handle_t **pHandle) {
228         if (mHasCache) {
229             std::shared_ptr<BufferPoolData> cache = mCache.lock();
230             if (cache) {
231                 *pHandle = mHandle;
232             }
233             return cache;
234         }
235         return nullptr;
236     }
237 
createCacheaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer238     std::shared_ptr<BufferPoolData> createCache(
239             const std::shared_ptr<BufferPoolClient::Impl> &impl,
240             native_handle_t **pHandle) {
241         if (!mHasCache) {
242             // Allocates a raw ptr in order to avoid sending #postBufferRelease
243             // from deleter, in case of native_handle_clone failure.
244             BufferPoolData *ptr = new BufferPoolData(mConnectionId, mId);
245             if (ptr) {
246                 std::shared_ptr<BufferPoolData> cache(ptr, BlockPoolDataDtor(impl));
247                 if (cache) {
248                     mCache = cache;
249                     mHasCache = true;
250                     *pHandle = mHandle;
251                     return cache;
252                 }
253             }
254             if (ptr) {
255                 delete ptr;
256             }
257         }
258         return nullptr;
259     }
260 
onCacheReleaseaidl::android::hardware::media::bufferpool2::implementation::BufferPoolClient::Impl::ClientBuffer261     bool onCacheRelease() {
262         if (mHasCache) {
263             // TODO: verify mCache is not valid;
264             updateExpire();
265             mHasCache = false;
266             return true;
267         }
268         return false;
269     }
270 };
271 
Impl(const std::shared_ptr<Accessor> & accessor,const std::shared_ptr<IObserver> & observer)272 BufferPoolClient::Impl::Impl(const std::shared_ptr<Accessor> &accessor,
273                              const std::shared_ptr<IObserver> &observer)
274     : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
275       mLastEvictCacheMs(::android::elapsedRealtime()) {
276     StatusDescriptor statusDesc;
277     InvalidationDescriptor invDesc;
278     BufferPoolStatus status = accessor->connect(
279             observer, true,
280             &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
281             &statusDesc, &invDesc);
282     if (status == ResultStatus::OK) {
283         mReleasing.mStatusChannel =
284                 std::make_unique<BufferStatusChannel>(statusDesc);
285         mInvalidationListener =
286                 std::make_unique<BufferInvalidationListener>(invDesc);
287         mValid = mReleasing.mStatusChannel &&
288                 mReleasing.mStatusChannel->isValid() &&
289                 mInvalidationListener &&
290                 mInvalidationListener->isValid();
291     }
292 }
293 
Impl(const std::shared_ptr<IAccessor> & accessor,const std::shared_ptr<IObserver> & observer)294 BufferPoolClient::Impl::Impl(const std::shared_ptr<IAccessor> &accessor,
295                              const std::shared_ptr<IObserver> &observer)
296     : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
297       mLastEvictCacheMs(::android::elapsedRealtime()) {
298     IAccessor::ConnectionInfo conInfo;
299     bool valid = false;
300     if (accessor && accessor->connect(observer, &conInfo).isOk()) {
301         auto channel = std::make_unique<BufferStatusChannel>(conInfo.toFmqDesc);
302         auto observer = std::make_unique<BufferInvalidationListener>(conInfo.fromFmqDesc);
303 
304         if (channel && channel->isValid()
305             && observer && observer->isValid()) {
306             mRemoteConnection = conInfo.connection;
307             mConnectionId = conInfo.connectionId;
308             mReleasing.mInvalidateId = conInfo.msgId;
309             mReleasing.mStatusChannel = std::move(channel);
310             mInvalidationListener = std::move(observer);
311             valid = true;
312         }
313     }
314     mValid = valid;
315 }
316 
isActive(int64_t * lastTransactionMs,bool clearCache)317 bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionMs, bool clearCache) {
318     bool active = false;
319     {
320         std::lock_guard<std::mutex> lock(mCache.mLock);
321         syncReleased();
322         evictCaches(clearCache);
323         *lastTransactionMs = mCache.mLastChangeMs;
324         active = mCache.mActive > 0;
325     }
326     if (mValid && mLocal && mLocalConnection) {
327         mLocalConnection->cleanUp(clearCache);
328         return true;
329     }
330     return active;
331 }
332 
receiveInvalidation(uint32_t messageId)333 void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) {
334     std::lock_guard<std::mutex> lock(mCache.mLock);
335     syncReleased(messageId);
336     // TODO: evict cache required?
337 }
338 
flush()339 BufferPoolStatus BufferPoolClient::Impl::flush() {
340     if (!mLocal || !mLocalConnection || !mValid) {
341         return ResultStatus::CRITICAL_ERROR;
342     }
343     {
344         std::unique_lock<std::mutex> lock(mCache.mLock);
345         syncReleased();
346         evictCaches();
347         return mLocalConnection->flush();
348     }
349 }
350 
allocate(const std::vector<uint8_t> & params,native_handle_t ** pHandle,std::shared_ptr<BufferPoolData> * buffer)351 BufferPoolStatus BufferPoolClient::Impl::allocate(
352         const std::vector<uint8_t> &params,
353         native_handle_t **pHandle,
354         std::shared_ptr<BufferPoolData> *buffer) {
355     if (!mLocal || !mLocalConnection || !mValid) {
356         return ResultStatus::CRITICAL_ERROR;
357     }
358     BufferId bufferId;
359     native_handle_t *handle = nullptr;
360     buffer->reset();
361     BufferPoolStatus status = allocateBufferHandle(params, &bufferId, &handle);
362     if (status == ResultStatus::OK) {
363         if (handle) {
364             std::unique_lock<std::mutex> lock(mCache.mLock);
365             syncReleased();
366             evictCaches();
367             auto cacheIt = mCache.mBuffers.find(bufferId);
368             if (cacheIt != mCache.mBuffers.end()) {
369                 // TODO: verify it is recycled. (not having active ref)
370                 mCache.mBuffers.erase(cacheIt);
371             }
372             auto clientBuffer = std::make_unique<ClientBuffer>(
373                     mConnectionId, bufferId, handle);
374             if (clientBuffer) {
375                 auto result = mCache.mBuffers.insert(std::make_pair(
376                         bufferId, std::move(clientBuffer)));
377                 if (result.second) {
378                     *buffer = result.first->second->createCache(
379                             shared_from_this(), pHandle);
380                     if (*buffer) {
381                         mCache.incActive_l();
382                     }
383                 }
384             }
385         }
386         if (!*buffer) {
387             ALOGV("client cache creation failure %d: %lld",
388                   handle != nullptr, (long long)mConnectionId);
389             status = ResultStatus::NO_MEMORY;
390             postBufferRelease(bufferId);
391         }
392     }
393     return status;
394 }
395 
receive(TransactionId transactionId,BufferId bufferId,int64_t timestampMs,native_handle_t ** pHandle,std::shared_ptr<BufferPoolData> * buffer)396 BufferPoolStatus BufferPoolClient::Impl::receive(
397         TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
398         native_handle_t **pHandle,
399         std::shared_ptr<BufferPoolData> *buffer) {
400     if (!mValid) {
401         return ResultStatus::CRITICAL_ERROR;
402     }
403     if (timestampMs != 0) {
404         timestampMs += kReceiveTimeoutMs;
405     }
406     if (!postReceive(bufferId, transactionId, timestampMs)) {
407         return ResultStatus::CRITICAL_ERROR;
408     }
409     BufferPoolStatus status = ResultStatus::CRITICAL_ERROR;
410     buffer->reset();
411     while(1) {
412         std::unique_lock<std::mutex> lock(mCache.mLock);
413         syncReleased();
414         evictCaches();
415         auto cacheIt = mCache.mBuffers.find(bufferId);
416         if (cacheIt != mCache.mBuffers.end()) {
417             if (cacheIt->second->hasCache()) {
418                 *buffer = cacheIt->second->fetchCache(pHandle);
419                 if (!*buffer) {
420                     // check transfer time_out
421                     lock.unlock();
422                     std::this_thread::yield();
423                     continue;
424                 }
425                 ALOGV("client receive from reference %lld", (long long)mConnectionId);
426                 break;
427             } else {
428                 *buffer = cacheIt->second->createCache(shared_from_this(), pHandle);
429                 if (*buffer) {
430                     mCache.incActive_l();
431                 }
432                 ALOGV("client receive from cache %lld", (long long)mConnectionId);
433                 break;
434             }
435         } else {
436             if (!mCache.mCreating) {
437                 mCache.mCreating = true;
438                 lock.unlock();
439                 native_handle_t* handle = nullptr;
440                 status = fetchBufferHandle(transactionId, bufferId, &handle);
441                 lock.lock();
442                 if (status == ResultStatus::OK) {
443                     if (handle) {
444                         auto clientBuffer = std::make_unique<ClientBuffer>(
445                                 mConnectionId, bufferId, handle);
446                         if (clientBuffer) {
447                             auto result = mCache.mBuffers.insert(
448                                     std::make_pair(bufferId, std::move(
449                                             clientBuffer)));
450                             if (result.second) {
451                                 *buffer = result.first->second->createCache(
452                                         shared_from_this(), pHandle);
453                                 if (*buffer) {
454                                     mCache.incActive_l();
455                                 }
456                             }
457                         }
458                     }
459                     if (!*buffer) {
460                         status = ResultStatus::NO_MEMORY;
461                     }
462                 }
463                 mCache.mCreating = false;
464                 lock.unlock();
465                 mCache.mCreateCv.notify_all();
466                 break;
467             }
468             mCache.mCreateCv.wait(lock);
469         }
470     }
471     bool needsSync = false;
472     bool posted = postReceiveResult(bufferId, transactionId,
473                                       *buffer ? true : false, &needsSync);
474     ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
475           *buffer ? "ok" : "fail", posted);
476     if (mValid && mLocal && mLocalConnection) {
477         mLocalConnection->cleanUp(false);
478     }
479     if (needsSync && mRemoteConnection) {
480         trySyncFromRemote();
481     }
482     if (*buffer) {
483         if (!posted) {
484             buffer->reset();
485             return ResultStatus::CRITICAL_ERROR;
486         }
487         return ResultStatus::OK;
488     }
489     return status;
490 }
491 
492 
postBufferRelease(BufferId bufferId)493 void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) {
494     std::lock_guard<std::mutex> lock(mReleasing.mLock);
495     mReleasing.mReleasingIds.push_back(bufferId);
496     mReleasing.mStatusChannel->postBufferRelease(
497             mConnectionId, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
498 }
499 
500 // TODO: revise ad-hoc posting data structure
postSend(BufferId bufferId,ConnectionId receiver,TransactionId * transactionId,int64_t * timestampMs)501 bool BufferPoolClient::Impl::postSend(
502         BufferId bufferId, ConnectionId receiver,
503         TransactionId *transactionId, int64_t *timestampMs) {
504     {
505         // TODO: don't need to call syncReleased every time
506         std::lock_guard<std::mutex> lock(mCache.mLock);
507         syncReleased();
508     }
509     bool ret = false;
510     bool needsSync = false;
511     {
512         std::lock_guard<std::mutex> lock(mReleasing.mLock);
513         *timestampMs = ::android::elapsedRealtime();
514         *transactionId = (mConnectionId << 32) | mSeqId++;
515         // TODO: retry, add timeout, target?
516         ret =  mReleasing.mStatusChannel->postBufferStatusMessage(
517                 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
518                 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
519         needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
520     }
521     if (mValid && mLocal && mLocalConnection) {
522         mLocalConnection->cleanUp(false);
523     }
524     if (needsSync && mRemoteConnection) {
525         trySyncFromRemote();
526     }
527     return ret;
528 }
529 
postReceive(BufferId bufferId,TransactionId transactionId,int64_t timestampMs)530 bool BufferPoolClient::Impl::postReceive(
531         BufferId bufferId, TransactionId transactionId, int64_t timestampMs) {
532     for (int i = 0; i < kPostMaxRetry; ++i) {
533         std::unique_lock<std::mutex> lock(mReleasing.mLock);
534         int64_t now = ::android::elapsedRealtime();
535         if (timestampMs == 0 || now < timestampMs) {
536             bool result = mReleasing.mStatusChannel->postBufferStatusMessage(
537                     transactionId, bufferId, BufferStatus::TRANSFER_FROM,
538                     mConnectionId, -1, mReleasing.mReleasingIds,
539                     mReleasing.mReleasedIds);
540             if (result) {
541                 return true;
542             }
543             lock.unlock();
544             std::this_thread::yield();
545         } else {
546             mReleasing.mStatusChannel->postBufferStatusMessage(
547                     transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT,
548                     mConnectionId, -1, mReleasing.mReleasingIds,
549                     mReleasing.mReleasedIds);
550             return false;
551         }
552     }
553     return false;
554 }
555 
postReceiveResult(BufferId bufferId,TransactionId transactionId,bool result,bool * needsSync)556 bool BufferPoolClient::Impl::postReceiveResult(
557         BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
558     std::lock_guard<std::mutex> lock(mReleasing.mLock);
559     // TODO: retry, add timeout
560     bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
561             transactionId, bufferId,
562             result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
563             mConnectionId, -1, mReleasing.mReleasingIds,
564             mReleasing.mReleasedIds);
565     *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
566     return ret;
567 }
568 
trySyncFromRemote()569 void BufferPoolClient::Impl::trySyncFromRemote() {
570     if (mRemoteSyncLock.try_lock()) {
571         bool needsSync = false;
572         {
573             std::lock_guard<std::mutex> lock(mReleasing.mLock);
574             needsSync = mReleasing.mStatusChannel->needsSync();
575         }
576         if (needsSync) {
577             if (!mRemoteConnection->sync().isOk()) {
578                 ALOGD("sync from client %lld failed: bufferpool process died.",
579                       (long long)mConnectionId);
580             }
581         }
582         mRemoteSyncLock.unlock();
583     }
584 }
585 
586 // should have mCache.mLock
syncReleased(uint32_t messageId)587 bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) {
588     bool cleared = false;
589     {
590         std::lock_guard<std::mutex> lock(mReleasing.mLock);
591         if (mReleasing.mReleasingIds.size() > 0) {
592             mReleasing.mStatusChannel->postBufferRelease(
593                     mConnectionId, mReleasing.mReleasingIds,
594                     mReleasing.mReleasedIds);
595         }
596         if (mReleasing.mReleasedIds.size() > 0) {
597             for (BufferId& id: mReleasing.mReleasedIds) {
598                 ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
599                 auto found = mCache.mBuffers.find(id);
600                 if (found != mCache.mBuffers.end()) {
601                     if (found->second->onCacheRelease()) {
602                         mCache.decActive_l();
603                     } else {
604                         // should not happen!
605                         ALOGW("client %lld cache release status inconsistent!",
606                             (long long)mConnectionId);
607                     }
608                 } else {
609                     // should not happen!
610                     ALOGW("client %lld cache status inconsistent!", (long long)mConnectionId);
611                 }
612             }
613             mReleasing.mReleasedIds.clear();
614             cleared = true;
615         }
616     }
617     std::vector<BufferInvalidationMessage> invalidations;
618     mInvalidationListener->getInvalidations(invalidations);
619     uint32_t lastMsgId = 0;
620     if (invalidations.size() > 0) {
621         for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
622             if (it->messageId != 0) {
623                 lastMsgId = it->messageId;
624             }
625             if (it->fromBufferId == it->toBufferId) {
626                 // TODO: handle fromBufferId = UINT32_MAX
627                 invalidateBuffer(it->fromBufferId);
628             } else {
629                 invalidateRange(it->fromBufferId, it->toBufferId);
630             }
631         }
632     }
633     {
634         std::lock_guard<std::mutex> lock(mReleasing.mLock);
635         if (lastMsgId != 0) {
636             if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
637                 mReleasing.mInvalidateId = lastMsgId;
638                 mReleasing.mInvalidateAck = false;
639             }
640         } else if (messageId != 0) {
641             // messages are drained.
642             if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
643                 mReleasing.mInvalidateId = messageId;
644                 mReleasing.mInvalidateAck = true;
645             }
646         }
647         if (!mReleasing.mInvalidateAck) {
648             // post ACK
649             mReleasing.mStatusChannel->postBufferInvalidateAck(
650                     mConnectionId,
651                     mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
652             ALOGV("client %lld invalidateion ack (%d) %u",
653                 (long long)mConnectionId,
654                 mReleasing.mInvalidateAck, mReleasing.mInvalidateId);
655         }
656     }
657     return cleared;
658 }
659 
660 // should have mCache.mLock
evictCaches(bool clearCache)661 void BufferPoolClient::Impl::evictCaches(bool clearCache) {
662     int64_t now = ::android::elapsedRealtime();
663     if (now >= mLastEvictCacheMs + kCacheTtlMs ||
664             clearCache || mCache.cachedBufferCount() > kMaxCachedBufferCount) {
665         size_t evicted = 0;
666         for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
667             if (!it->second->hasCache() && (it->second->expire() ||
668                         clearCache || mCache.cachedBufferCount() > kCachedBufferCountTarget)) {
669                 it = mCache.mBuffers.erase(it);
670                 ++evicted;
671             } else {
672                 ++it;
673             }
674         }
675         ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
676               (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
677         mLastEvictCacheMs = now;
678     }
679 }
680 
681 // should have mCache.mLock
invalidateBuffer(BufferId id)682 void BufferPoolClient::Impl::invalidateBuffer(BufferId id) {
683     for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) {
684         if (id == it->second->id()) {
685             if (!it->second->hasCache()) {
686                 mCache.mBuffers.erase(it);
687                 ALOGV("cache invalidated %lld : buffer %u",
688                       (long long)mConnectionId, id);
689             } else {
690                 ALOGW("Inconsistent invalidation %lld : activer buffer!! %u",
691                       (long long)mConnectionId, (unsigned int)id);
692             }
693             break;
694         }
695     }
696 }
697 
698 // should have mCache.mLock
invalidateRange(BufferId from,BufferId to)699 void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) {
700     size_t invalidated = 0;
701     for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
702         if (!it->second->hasCache()) {
703             BufferId bid = it->second->id();
704             if (from < to) {
705                 if (from <= bid && bid < to) {
706                     ++invalidated;
707                     it = mCache.mBuffers.erase(it);
708                     continue;
709                 }
710             } else {
711                 if (from <= bid || bid < to) {
712                     ++invalidated;
713                     it = mCache.mBuffers.erase(it);
714                     continue;
715                 }
716             }
717         }
718         ++it;
719     }
720     ALOGV("cache invalidated %lld : # of invalidated %zu",
721           (long long)mConnectionId, invalidated);
722 }
723 
allocateBufferHandle(const std::vector<uint8_t> & params,BufferId * bufferId,native_handle_t ** handle)724 BufferPoolStatus BufferPoolClient::Impl::allocateBufferHandle(
725         const std::vector<uint8_t>& params, BufferId *bufferId,
726         native_handle_t** handle) {
727     if (mLocalConnection) {
728         const native_handle_t* allocHandle = nullptr;
729         BufferPoolStatus status = mLocalConnection->allocate(
730                 params, bufferId, &allocHandle);
731         if (status == ResultStatus::OK) {
732             *handle = native_handle_clone(allocHandle);
733         }
734         ALOGV("client allocate result %lld %d : %u clone %p",
735               (long long)mConnectionId, status == ResultStatus::OK,
736               *handle ? *bufferId : 0 , *handle);
737         return status;
738     }
739     return ResultStatus::CRITICAL_ERROR;
740 }
741 
fetchBufferHandle(TransactionId transactionId,BufferId bufferId,native_handle_t ** handle)742 BufferPoolStatus BufferPoolClient::Impl::fetchBufferHandle(
743         TransactionId transactionId, BufferId bufferId,
744         native_handle_t **handle) {
745     std::shared_ptr<IConnection> connection;
746     if (mLocal) {
747         connection = mLocalConnection;
748     } else {
749         connection = mRemoteConnection;
750     }
751     if (!connection) {
752         ALOGE("connection null: fetchBufferHandle()");
753         return ResultStatus::CRITICAL_ERROR;
754     }
755     std::vector<FetchInfo> infos;
756     std::vector<FetchResult> results;
757     infos.emplace_back(FetchInfo{ToAidl(transactionId), ToAidl(bufferId)});
758     ndk::ScopedAStatus status = connection->fetch(infos, &results);
759     if (!status.isOk()) {
760         BufferPoolStatus svcSpecific = status.getServiceSpecificError();
761         return svcSpecific ? svcSpecific : ResultStatus::CRITICAL_ERROR;
762     }
763     if (results[0].getTag() == FetchResult::buffer) {
764         if (results[0].get<FetchResult::buffer>().buffer.has_value()) {
765             *handle = ::android::dupFromAidl(results[0].get<FetchResult::buffer>().buffer.value());
766         } else {
767             // TODO: Support HardwareBuffer
768             ALOGW("handle nullptr");
769             *handle = nullptr;
770         }
771         return ResultStatus::OK;
772     }
773     return results[0].get<FetchResult::failure>();
774 }
775 
776 
BufferPoolClient(const std::shared_ptr<Accessor> & accessor,const std::shared_ptr<IObserver> & observer)777 BufferPoolClient::BufferPoolClient(const std::shared_ptr<Accessor> &accessor,
778                                    const std::shared_ptr<IObserver> &observer) {
779     mImpl = std::make_shared<Impl>(accessor, observer);
780 }
781 
BufferPoolClient(const std::shared_ptr<IAccessor> & accessor,const std::shared_ptr<IObserver> & observer)782 BufferPoolClient::BufferPoolClient(const std::shared_ptr<IAccessor> &accessor,
783                                    const std::shared_ptr<IObserver> &observer) {
784     mImpl = std::make_shared<Impl>(accessor, observer);
785 }
786 
~BufferPoolClient()787 BufferPoolClient::~BufferPoolClient() {
788     // TODO: how to handle orphaned buffers?
789 }
790 
isValid()791 bool BufferPoolClient::isValid() {
792     return mImpl && mImpl->isValid();
793 }
794 
isLocal()795 bool BufferPoolClient::isLocal() {
796     return mImpl && mImpl->isLocal();
797 }
798 
isActive(int64_t * lastTransactionMs,bool clearCache)799 bool BufferPoolClient::isActive(int64_t *lastTransactionMs, bool clearCache) {
800     if (!isValid()) {
801         *lastTransactionMs = 0;
802         return false;
803     }
804     return mImpl->isActive(lastTransactionMs, clearCache);
805 }
806 
getConnectionId()807 ConnectionId BufferPoolClient::getConnectionId() {
808     if (isValid()) {
809         return mImpl->getConnectionId();
810     }
811     return -1;
812 }
813 
getAccessor(std::shared_ptr<IAccessor> * accessor)814 BufferPoolStatus BufferPoolClient::getAccessor(std::shared_ptr<IAccessor> *accessor) {
815     if (isValid()) {
816         *accessor = mImpl->getAccessor();
817         return ResultStatus::OK;
818     }
819     return ResultStatus::CRITICAL_ERROR;
820 }
821 
receiveInvalidation(uint32_t msgId)822 void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
823     ALOGV("bufferpool2 client recv inv %u", msgId);
824     if (isValid()) {
825         mImpl->receiveInvalidation(msgId);
826     }
827 }
828 
flush()829 BufferPoolStatus BufferPoolClient::flush() {
830     if (isValid()) {
831         return mImpl->flush();
832     }
833     return ResultStatus::CRITICAL_ERROR;
834 }
835 
allocate(const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)836 BufferPoolStatus BufferPoolClient::allocate(
837         const std::vector<uint8_t> &params,
838         native_handle_t **handle,
839         std::shared_ptr<BufferPoolData> *buffer) {
840     if (isValid()) {
841         return mImpl->allocate(params, handle, buffer);
842     }
843     return ResultStatus::CRITICAL_ERROR;
844 }
845 
receive(TransactionId transactionId,BufferId bufferId,int64_t timestampMs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)846 BufferPoolStatus BufferPoolClient::receive(
847         TransactionId transactionId, BufferId bufferId, int64_t timestampMs,
848         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
849     if (isValid()) {
850         return mImpl->receive(transactionId, bufferId, timestampMs, handle, buffer);
851     }
852     return ResultStatus::CRITICAL_ERROR;
853 }
854 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampMs)855 BufferPoolStatus BufferPoolClient::postSend(
856         ConnectionId receiverId,
857         const std::shared_ptr<BufferPoolData> &buffer,
858         TransactionId *transactionId,
859         int64_t *timestampMs) {
860     if (isValid()) {
861         bool result = mImpl->postSend(
862                 buffer->mId, receiverId, transactionId, timestampMs);
863         return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
864     }
865     return ResultStatus::CRITICAL_ERROR;
866 }
867 
868 }  // namespace aidl::android::hardware::media::bufferpool2::implementation
869