1 // Copyright 2012 The Chromium Authors 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ 7 8 #include <unordered_map> 9 #include <utility> 10 11 #include "base/auto_reset.h" 12 #include "base/base_export.h" 13 #include "base/check.h" 14 #include "base/check_op.h" 15 #include "base/containers/contains.h" 16 #include "base/dcheck_is_on.h" 17 #include "base/debug/stack_trace.h" 18 #include "base/functional/bind.h" 19 #include "base/location.h" 20 #include "base/memory/raw_ptr.h" 21 #include "base/memory/ref_counted.h" 22 #include "base/observer_list.h" 23 #include "base/strings/strcat.h" 24 #include "base/synchronization/lock.h" 25 #include "base/task/sequenced_task_runner.h" 26 #include "base/task/single_thread_task_runner.h" 27 #include "build/build_config.h" 28 #include "third_party/abseil-cpp/absl/base/attributes.h" 29 30 /////////////////////////////////////////////////////////////////////////////// 31 // 32 // OVERVIEW: 33 // 34 // A thread-safe container for a list of observers. This is similar to the 35 // observer_list (see observer_list.h), but it is more robust for multi- 36 // threaded situations. 37 // 38 // The following use cases are supported: 39 // * Observers can register for notifications from any sequence. They are 40 // always notified on the sequence from which they were registered. 41 // * Any sequence may trigger a notification via Notify(). 42 // * Observers can remove themselves from the observer list inside of a 43 // callback. 44 // * If one sequence is notifying observers concurrently with an observer 45 // removing itself from the observer list, the notifications will be 46 // silently dropped. However if the observer is currently inside a 47 // notification callback, the callback will finish running. 48 // 49 // By default, observers can be removed from any sequence. However this can be 50 // error-prone since an observer may be running a callback when it's removed, 51 // in which case it isn't safe to delete until the callback is finished. 52 // Consider using the RemoveObserverPolicy::kAddingSequenceOnly template 53 // parameter, which will CHECK that observers are only removed from the 54 // sequence where they were added (which is also the sequence that runs 55 // callbacks). 56 // 57 // The drawback of the threadsafe observer list is that notifications are not 58 // as real-time as the non-threadsafe version of this class. Notifications 59 // will always be done via PostTask() to another sequence, whereas with the 60 // non-thread-safe ObserverList, notifications happen synchronously. 61 // 62 // Note: this class previously supported synchronous notifications for 63 // same-sequence observers, but it was error-prone and removed in 64 // crbug.com/1193750, think twice before re-considering this paradigm. 65 // 66 /////////////////////////////////////////////////////////////////////////////// 67 68 namespace base { 69 namespace internal { 70 71 class BASE_EXPORT ObserverListThreadSafeBase 72 : public RefCountedThreadSafe<ObserverListThreadSafeBase> { 73 public: 74 struct NotificationDataBase { NotificationDataBaseNotificationDataBase75 NotificationDataBase(void* observer_list_in, const Location& from_here_in) 76 : observer_list(observer_list_in), from_here(from_here_in) {} 77 78 raw_ptr<void> observer_list; 79 Location from_here; 80 }; 81 82 ObserverListThreadSafeBase() = default; 83 ObserverListThreadSafeBase(const ObserverListThreadSafeBase&) = delete; 84 ObserverListThreadSafeBase& operator=(const ObserverListThreadSafeBase&) = 85 delete; 86 87 protected: 88 template <typename ObserverType, typename Method> 89 struct Dispatcher; 90 91 template <typename ObserverType, typename ReceiverType, typename... Params> 92 struct Dispatcher<ObserverType, void (ReceiverType::*)(Params...)> { 93 static void Run(void (ReceiverType::*m)(Params...), 94 Params... params, 95 ObserverType* obj) { 96 (obj->*m)(std::forward<Params>(params)...); 97 } 98 }; 99 100 static const NotificationDataBase*& GetCurrentNotification(); 101 102 virtual ~ObserverListThreadSafeBase() = default; 103 104 private: 105 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; 106 }; 107 108 } // namespace internal 109 110 enum class RemoveObserverPolicy { 111 // Observers can be removed from any sequence. 112 kAnySequence, 113 // Observers can only be removed from the sequence that added them. 114 kAddingSequenceOnly, 115 }; 116 117 template <class ObserverType, 118 RemoveObserverPolicy RemovePolicy = 119 RemoveObserverPolicy::kAnySequence> 120 class ObserverListThreadSafe : public internal::ObserverListThreadSafeBase { 121 using Self = ObserverListThreadSafe<ObserverType, RemovePolicy>; 122 123 public: 124 enum class AddObserverResult { 125 kBecameNonEmpty, 126 kWasAlreadyNonEmpty, 127 }; 128 enum class RemoveObserverResult { 129 kWasOrBecameEmpty, 130 kRemainsNonEmpty, 131 }; 132 133 ObserverListThreadSafe() = default; 134 explicit ObserverListThreadSafe(ObserverListPolicy policy) 135 : policy_(policy) {} 136 ObserverListThreadSafe(const ObserverListThreadSafe&) = delete; 137 ObserverListThreadSafe& operator=(const ObserverListThreadSafe&) = delete; 138 139 // Adds |observer| to the list. |observer| must not already be in the list. 140 AddObserverResult AddObserver(ObserverType* observer) { 141 DCHECK(SequencedTaskRunner::HasCurrentDefault()) 142 << "An observer can only be registered when " 143 "SequencedTaskRunner::HasCurrentDefault. If this is in a unit test, " 144 "you're likely merely missing a " 145 "base::test::(SingleThread)TaskEnvironment in your fixture. " 146 "Otherwise, try running this code on a named thread (main/UI/IO) or " 147 "from a task posted to a base::SequencedTaskRunner or " 148 "base::SingleThreadTaskRunner."; 149 150 AutoLock auto_lock(lock_); 151 152 bool was_empty = observers_.empty(); 153 154 // Add |observer| to the list of observers. 155 DCHECK(!Contains(observers_, observer)); 156 const scoped_refptr<SequencedTaskRunner> task_runner = 157 SequencedTaskRunner::GetCurrentDefault(); 158 // Each observer gets a unique identifier. These unique identifiers are used 159 // to avoid execution of pending posted-tasks over removed or released 160 // observers. 161 const size_t observer_id = ++observer_id_counter_; 162 #if DCHECK_IS_ON() 163 ObserverTaskRunnerInfo task_info = {task_runner, base::debug::StackTrace(), 164 observer_id}; 165 #else 166 ObserverTaskRunnerInfo task_info = {task_runner, observer_id}; 167 #endif 168 observers_[observer] = std::move(task_info); 169 170 // If this is called while a notification is being dispatched on this thread 171 // and |policy_| is ALL, |observer| must be notified (if a notification is 172 // being dispatched on another thread in parallel, the notification may or 173 // may not make it to |observer| depending on the outcome of the race to 174 // |lock_|). 175 if (policy_ == ObserverListPolicy::ALL) { 176 if (const NotificationDataBase* const current_notification = 177 GetCurrentNotification(); 178 current_notification && current_notification->observer_list == this) { 179 const NotificationData* notification_data = 180 static_cast<const NotificationData*>(current_notification); 181 task_runner->PostTask( 182 current_notification->from_here, 183 BindOnce(&Self::NotifyWrapper, this, 184 // While `observer` may be dangling, we pass it and 185 // check it wasn't deallocated in NotifyWrapper() which can 186 // check `observers_` to verify presence (the owner of the 187 // observer is responsible for removing it from that list 188 // before deallocation). 189 UnsafeDangling(observer), 190 NotificationData(this, observer_id, 191 current_notification->from_here, 192 notification_data->method))); 193 } 194 } 195 196 return was_empty ? AddObserverResult::kBecameNonEmpty 197 : AddObserverResult::kWasAlreadyNonEmpty; 198 } 199 200 // Remove an observer from the list if it is in the list. 201 // 202 // If a notification was sent to the observer but hasn't started to run yet, 203 // it will be aborted. If a notification has started to run, removing the 204 // observer won't stop it. 205 RemoveObserverResult RemoveObserver(ObserverType* observer) { 206 AutoLock auto_lock(lock_); 207 if constexpr (RemovePolicy == RemoveObserverPolicy::kAddingSequenceOnly) { 208 const auto it = observers_.find(observer); 209 CHECK(it == observers_.end() || 210 it->second.task_runner->RunsTasksInCurrentSequence()); 211 } 212 observers_.erase(observer); 213 return observers_.empty() ? RemoveObserverResult::kWasOrBecameEmpty 214 : RemoveObserverResult::kRemainsNonEmpty; 215 } 216 217 // Verifies that the list is currently empty (i.e. there are no observers). 218 void AssertEmpty() const { 219 #if DCHECK_IS_ON() 220 AutoLock auto_lock(lock_); 221 bool observers_is_empty = observers_.empty(); 222 DUMP_WILL_BE_CHECK(observers_is_empty) 223 << "\n" 224 << GetObserversCreationStackStringLocked(); 225 #endif 226 } 227 228 // Asynchronously invokes a callback on all observers, on their registration 229 // sequence. You cannot assume that at the completion of the Notify call that 230 // all Observers have been Notified. The notification may still be pending 231 // delivery. 232 template <typename Method, typename... Params> 233 void Notify(const Location& from_here, Method m, Params&&... params) { 234 RepeatingCallback<void(ObserverType*)> method = 235 BindRepeating(&Dispatcher<ObserverType, Method>::Run, m, 236 std::forward<Params>(params)...); 237 238 AutoLock lock(lock_); 239 for (const auto& observer : observers_) { 240 observer.second.task_runner->PostTask( 241 from_here, 242 BindOnce(&Self::NotifyWrapper, this, 243 // While `observer.first` may be dangling, we pass it and 244 // check it wasn't deallocated in NotifyWrapper() which can 245 // check `observers_` to verify presence (the owner of the 246 // observer is responsible for removing it from that list 247 // before deallocation). 248 UnsafeDangling(observer.first), 249 NotificationData(this, observer.second.observer_id, 250 from_here, method))); 251 } 252 } 253 254 private: 255 friend class RefCountedThreadSafe<ObserverListThreadSafeBase>; 256 257 struct NotificationData : public NotificationDataBase { 258 NotificationData(ObserverListThreadSafe* observer_list_in, 259 size_t observer_id_in, 260 const Location& from_here_in, 261 const RepeatingCallback<void(ObserverType*)>& method_in) 262 : NotificationDataBase(observer_list_in, from_here_in), 263 method(method_in), 264 observer_id(observer_id_in) {} 265 266 RepeatingCallback<void(ObserverType*)> method; 267 size_t observer_id; 268 }; 269 270 ~ObserverListThreadSafe() override = default; 271 272 void NotifyWrapper(MayBeDangling<ObserverType> observer, 273 const NotificationData& notification) { 274 { 275 AutoLock auto_lock(lock_); 276 277 // Check whether the observer still needs a notification. 278 DCHECK_EQ(notification.observer_list, this); 279 auto it = observers_.find(observer); 280 if (it == observers_.end() || 281 it->second.observer_id != notification.observer_id) { 282 return; 283 } 284 DCHECK(it->second.task_runner->RunsTasksInCurrentSequence()); 285 } 286 287 // Keep track of the notification being dispatched on the current thread. 288 // This will be used if the callback below calls AddObserver(). 289 // 290 // Note: GetCurrentNotification() may not return null if this runs in a 291 // nested loop started by a notification callback. In that case, it is 292 // important to save the previous value to restore it later. 293 const AutoReset<const NotificationDataBase*> resetter_( 294 &GetCurrentNotification(), ¬ification); 295 296 // Invoke the callback. 297 notification.method.Run(observer); 298 } 299 300 std::string GetObserversCreationStackStringLocked() const 301 EXCLUSIVE_LOCKS_REQUIRED(lock_) { 302 std::string result; 303 #if DCHECK_IS_ON() 304 for (const auto& observer : observers_) { 305 StrAppend(&result, 306 {observer.second.add_observer_stack_.ToString(), "\n"}); 307 } 308 #endif 309 return result; 310 } 311 312 const ObserverListPolicy policy_ = ObserverListPolicy::ALL; 313 314 mutable Lock lock_; 315 316 size_t observer_id_counter_ GUARDED_BY(lock_) = 0; 317 318 struct ObserverTaskRunnerInfo { 319 scoped_refptr<SequencedTaskRunner> task_runner; 320 #if DCHECK_IS_ON() 321 base::debug::StackTrace add_observer_stack_; 322 #endif 323 size_t observer_id = 0; 324 }; 325 326 // Keys are observers. Values are the SequencedTaskRunners on which they must 327 // be notified. 328 std::unordered_map<ObserverType*, ObserverTaskRunnerInfo> observers_ 329 GUARDED_BY(lock_); 330 }; 331 332 } // namespace base 333 334 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ 335