1 //
2 // Copyright © 2019 Arm Ltd. All rights reserved.
3 // SPDX-License-Identifier: MIT
4 //
5
6 #include "BufferManager.hpp"
7 #include "PacketBuffer.hpp"
8
9 namespace arm
10 {
11
12 namespace pipe
13 {
14
BufferManager(unsigned int numberOfBuffers,unsigned int maxPacketSize)15 BufferManager::BufferManager(unsigned int numberOfBuffers, unsigned int maxPacketSize)
16 : m_MaxBufferSize(maxPacketSize),
17 m_NumberOfBuffers(numberOfBuffers),
18 m_MaxNumberOfBuffers(numberOfBuffers * 3),
19 m_CurrentNumberOfBuffers(numberOfBuffers)
20 {
21 Initialize();
22 }
23
Reserve(unsigned int requestedSize,unsigned int & reservedSize)24 IPacketBufferPtr BufferManager::Reserve(unsigned int requestedSize, unsigned int& reservedSize)
25 {
26 reservedSize = 0;
27 #if !defined(ARMNN_DISABLE_THREADS)
28 std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock);
29 #endif
30 if (requestedSize > m_MaxBufferSize)
31 {
32 return nullptr;
33 }
34 #if !defined(ARMNN_DISABLE_THREADS)
35 availableListLock.lock();
36 #endif
37 if (m_AvailableList.empty())
38 {
39 if (m_CurrentNumberOfBuffers < m_MaxNumberOfBuffers)
40 {
41 // create a temporary overflow/surge buffer and hand it back
42 m_CurrentNumberOfBuffers++;
43 #if !defined(ARMNN_DISABLE_THREADS)
44 availableListLock.unlock();
45 #endif
46 IPacketBufferPtr buffer = std::make_unique<PacketBuffer>(m_MaxBufferSize);
47 reservedSize = requestedSize;
48 return buffer;
49 }
50 else
51 {
52 // we have totally busted the limit. call a halt to new memory allocations.
53 #if !defined(ARMNN_DISABLE_THREADS)
54 availableListLock.unlock();
55 #endif
56 return nullptr;
57 }
58 }
59 IPacketBufferPtr buffer = std::move(m_AvailableList.back());
60 m_AvailableList.pop_back();
61 #if !defined(ARMNN_DISABLE_THREADS)
62 availableListLock.unlock();
63 #endif
64 reservedSize = requestedSize;
65 return buffer;
66 }
67
Commit(IPacketBufferPtr & packetBuffer,unsigned int size,bool notifyConsumer)68 void BufferManager::Commit(IPacketBufferPtr& packetBuffer, unsigned int size, bool notifyConsumer)
69 {
70 #if !defined(ARMNN_DISABLE_THREADS)
71 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex, std::defer_lock);
72 #endif
73 packetBuffer->Commit(size);
74 #if !defined(ARMNN_DISABLE_THREADS)
75 readableListLock.lock();
76 #endif
77 m_ReadableList.push(std::move(packetBuffer));
78 #if !defined(ARMNN_DISABLE_THREADS)
79 readableListLock.unlock();
80 #endif
81 if (notifyConsumer)
82 {
83 FlushReadList();
84 }
85 }
86
Initialize()87 void BufferManager::Initialize()
88 {
89 m_AvailableList.reserve(m_NumberOfBuffers);
90 m_CurrentNumberOfBuffers = m_NumberOfBuffers;
91 for (unsigned int i = 0; i < m_NumberOfBuffers; ++i)
92 {
93 IPacketBufferPtr buffer = std::make_unique<PacketBuffer>(m_MaxBufferSize);
94 m_AvailableList.emplace_back(std::move(buffer));
95 }
96 }
97
Release(IPacketBufferPtr & packetBuffer)98 void BufferManager::Release(IPacketBufferPtr& packetBuffer)
99 {
100 #if !defined(ARMNN_DISABLE_THREADS)
101 std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock);
102 #endif
103 packetBuffer->Release();
104 #if !defined(ARMNN_DISABLE_THREADS)
105 availableListLock.lock();
106 #endif
107 if (m_AvailableList.size() <= m_NumberOfBuffers)
108 {
109 m_AvailableList.push_back(std::move(packetBuffer));
110 }
111 else
112 {
113 // we have been handed a temporary overflow/surge buffer get rid of it
114 packetBuffer->Destroy();
115 if (m_CurrentNumberOfBuffers > m_NumberOfBuffers)
116 {
117 --m_CurrentNumberOfBuffers;
118 }
119 }
120 #if !defined(ARMNN_DISABLE_THREADS)
121 availableListLock.unlock();
122 #endif
123 }
124
Reset()125 void BufferManager::Reset()
126 {
127 //This method should only be called once all threads have been joined
128 #if !defined(ARMNN_DISABLE_THREADS)
129 std::lock_guard<std::mutex> readableListLock(m_ReadableMutex);
130 std::lock_guard<std::mutex> availableListLock(m_AvailableMutex);
131 #endif
132
133 m_AvailableList.clear();
134 std::queue<IPacketBufferPtr>().swap(m_ReadableList);
135
136 Initialize();
137 }
138
GetReadableBuffer()139 IPacketBufferPtr BufferManager::GetReadableBuffer()
140 {
141 #if !defined(ARMNN_DISABLE_THREADS)
142 std::unique_lock<std::mutex> readableListLock(m_ReadableMutex);
143 #endif
144 if (!m_ReadableList.empty())
145 {
146 IPacketBufferPtr buffer = std::move(m_ReadableList.front());
147 m_ReadableList.pop();
148 #if !defined(ARMNN_DISABLE_THREADS)
149 readableListLock.unlock();
150 #endif
151 return buffer;
152 }
153 return nullptr;
154 }
155
MarkRead(IPacketBufferPtr & packetBuffer)156 void BufferManager::MarkRead(IPacketBufferPtr& packetBuffer)
157 {
158 #if !defined(ARMNN_DISABLE_THREADS)
159 std::unique_lock<std::mutex> availableListLock(m_AvailableMutex, std::defer_lock);
160 #endif
161 packetBuffer->MarkRead();
162 #if !defined(ARMNN_DISABLE_THREADS)
163 availableListLock.lock();
164 #endif
165 if (m_AvailableList.size() <= m_NumberOfBuffers)
166 {
167 m_AvailableList.push_back(std::move(packetBuffer));
168 }
169 else
170 {
171 // we have been handed a temporary overflow/surge buffer get rid of it
172 packetBuffer->Destroy();
173 if (m_CurrentNumberOfBuffers > m_NumberOfBuffers)
174 {
175 --m_CurrentNumberOfBuffers;
176 }
177 }
178 #if !defined(ARMNN_DISABLE_THREADS)
179 availableListLock.unlock();
180 #endif
181 }
182
SetConsumer(IConsumer * consumer)183 void BufferManager::SetConsumer(IConsumer* consumer)
184 {
185 m_Consumer = consumer;
186 }
187
FlushReadList()188 void BufferManager::FlushReadList()
189 {
190 // notify consumer that packet is ready to read
191 if (m_Consumer != nullptr)
192 {
193 m_Consumer->SetReadyToRead();
194 }
195 }
196
197 } // namespace pipe
198
199 } // namespace arm
200