c++11智能指针的线程安全问题以及实现

智能指针: 究竟是个什么东西?

c++11中提供了智能指针, 它主要就是维持了引用计数, 在引用计数值为0时可以安全析构。shared_ptr是个值语义, 在它析构时, 引用计数会减小, 直到计数减为0时, 才会delete智能指针里面保存的指针本体。

如果在单线程程序中使用智能指针, 它的作用是用来减轻程序员的心智负担。虽然有一种最佳实践叫”谁new的谁delete”, 但是不是每个程序员都足够细心, 粗心的程序员也许可能忘记delete某个指针而造成内存泄漏。

如果在多线程中使用智能指针, 它可以用来管理生命周期模糊不清的对象。比如在陈硕的muduo框架中, tcp连接对象被智能指针管理, 因为用户可能保存这个指针, 所以它的生命周期模糊不清, 此时最好的做法就是用引用计数自动管理。

使用智能指针管理内存, 在c++达到的效果是, 一旦某块内存不再被引用, 就刻不容缓地向操作系统归还这块内存。相比gc语言用完随处乱扔, 然后在某个时间点STW(Stop The World)统一回收大量内存, c++的做法不会造成业务停摆, 效果非常稳定。在即时性很高的应用中, STW是不可被容忍的, c++的优势正在于此。

shared_ptr的线程安全级别

抛砖引玉, 考虑下面的代码的安全性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <memory>
#include <thread>
#include <cstdio>

class Test {
public:
Test() {
}

void DoSomething() {
::printf("do something %d\n", std::this_thread::get_id());
}

~Test() {
::printf("析构\n");
}
};

std::shared_ptr<Test> p(new Test);

int main() {
auto threadFunc = []() {
long i = 100000;
while(1) {
std::shared_ptr<Test> s(p);
s->DoSomething();
if (i-- == 0) {
break;
}
}
};

std::thread t1(threadFunc);
std::thread t2(threadFunc);
t1.join();
t2.join();
::printf("%d\n", int(p.unique()));
}

我们运行代码, 发现最终打印的值为1, 即在很多次操作后, p变成独享的了。那么从实验结果来说, 对于同一个shared_ptr并发读是安全的。那么其它更多情况呢? 是否在任何情况下对同一个shared_ptr的操作都安全? 下面我们借助陈硕的说法来解答问题:

虽然我们借shared_ptr来实现线程安全的对象释放, 但是shared_ptr本身不是100%线程安全的, 它的引用计数本身是安全且无锁的, 但对象的读写不是, 因为shared_ptr有两个数据成员, 读写操作不能原子化。根据”文档”(https://www.boost.org/doc/libs/1_81_0/libs/smart_ptr/doc/html/smart_ptr.html#shared_ptr_thread_safety),shared_ptr的线程安全级别和内建类型,标准库容器,std::string一样, 即: 1.一个shared_ptr对象实体可以被多个线程同时读取。2.两个shared_ptr对象实体可以被两个线程同时写入(析构算写操作)。3.如果要从多个线程读写同一个shared_ptr对象, 那么需要加锁。

下面是”文档”讨论的很多种情况, 背景代码如下:

1
shared_ptr<int> p(new int(42));

两个线程同时从一个shared_ptr读取, 安全:

1
2
3
4
5
// thread A
shared_ptr<int> p2(p); // reads p

// thread B
shared_ptr<int> p3(p); // OK, multiple reads are safe

两个线程分别用两个不同的shared_ptr写, 安全:

1
2
3
4
5
// thread A
p.reset(new int(1912)); // writes p

// thread B
p2.reset(); // OK, writes p2

不同线程从同一个shared_ptr读写, 未定义:

1
2
3
4
5
// thread A
p = p3; // reads p3, writes p

// thread B
p3.reset(); // writes p3; undefined, simultaneous read/write

两个线程, 一个读, 一个正在析构同一个shared_ptr, 未定义:

1
2
3
4
5
// thread A
p3 = p2; // reads p2, writes p3

// thread B
// p2 goes out of scope: undefined, the destructor is considered a "write access"

两个线程, 一起写同一个shared_ptr, 未定义:

1
2
3
4
5
// thread A
p3.reset(new int(1));

// thread B
p3.reset(new int(2)); // undefined, multiple writes

简而言之, 对于同一个shared_ptr, 除了并发读, 其它的情况都要加锁。这个和c++的其它容器表现是差不多的。

那么我们如何从一个线程把一个对象共享出去, 给另外一个线程使用呢?

下面给一个代码示例, 展示了一个生产者消费者队列, product的生命周期由shared_ptr管理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <memory>
#include <thread>
#include <cstdio>
#include <list>
#include <vector>
#include <mutex>
#include <condition_variable>


std::list<std::shared_ptr<int>> productQueue;
std::mutex mutex;
std::condition_variable cond;

void consumer() {
while(1) {
std::unique_lock ul(mutex);
if(productQueue.empty()) {
cond.wait(ul);
continue;
} else {
auto sp = productQueue.front();
productQueue.pop_front();
::printf("%d\n", *sp);
}
}
}

int main() {
std::thread t(consumer);
t.detach();

int i = 0;
while(1) {
std::shared_ptr<int> p(new int(i++));
std::this_thread::sleep_for(std::chrono::seconds(1));
mutex.lock();
productQueue.push_back(p);
cond.notify_one();
mutex.unlock();
}
}

避免意外延长生命周期

陈硕在muduo一书中提到了shared_ptr的技术陷阱, 意外延长生命周期可能造成内存泄漏, 在长期运行的系统中, 这很不可取。在容器中永久保存shared_ptr是一种典型的错误。此外还要注意的是用bind绑定参数的情况, 如果参数中有shared_ptr, 那么返回的Functor也相当于保存了shared_ptr, 此时如果我们永久地保存Functor, 对象的生命周期也将意外地被延长。

要解决这个问题, 保存weak_ptr是一种很好的策略, 使用weak_ptr能尝试性的探查对象是否消亡, 结合弱回调技术清理掉weak_ptr, 就能完全解决内存泄漏的问题(这里留下一个小问题, 如果永久保存weak_ptr, 会造成轻微的内存泄漏, 具体原因后面再谈), 借此实现线程安全的observer(详细请参见陈硕的muduo一书)。

shared_ptr的实现

shared_ptr要考虑到下面几个方面:

  1. 支持两个计数: 即为weak_ptr计数和shared_ptr计数
  2. 支持shared_from_this
  3. 符合上面提到的线程安全
  4. 支持自定义删除器

首先我们先不考虑shared_from_this, 仅仅满足1, 3两个方面, 之后再来考虑2(为了简便我们不支持4以及一些无关紧要的接口, 比如swap)。一般的实现中, 用的是原子操作, 为了简便我们使用锁来实现, 要实现这样的一个智能指针, 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
#include <mutex>

/* SharedPtr的状态
状态1: 保存的指针是nullptr, 此时两个refCount以及mutex都是nullptr
状态2: 保存的指针不是nullptr, 此时两个refCount以及mutex都不是nullptr
*/

template <typename T>
class WeakPtr;

template<typename T>
class SharedPtr {
public:
SharedPtr(T *ptr = nullptr): _ptr(ptr) {
if (ptr != nullptr) {
// 创建控制块
_sharedRefCount = new int(1);
_weakRefCount = new int(0);
_mutex = new std::mutex;
}
// 如果ptr为nullptr, 不需要控制块
// else {}
}

// 此处保证要求传参一个SharedPtr, 因此可以保证安全复制
SharedPtr(const SharedPtr &anotherPtr):
_ptr(anotherPtr._ptr), _sharedRefCount(anotherPtr._sharedRefCount),
_weakRefCount(anotherPtr._weakRefCount), _mutex(anotherPtr._mutex)
{
if(anotherPtr._ptr != nullptr) {
std::lock_guard<std::mutex> guard(*_mutex);
(*_sharedRefCount) ++;
}
// 当另外一个sharedPtr为nullptr时什么也不做
// else {}
}

SharedPtr(SharedPtr &&sptr) {
_ptr = sptr._ptr;
_sharedRefCount = sptr._sharedRefCount;
_weakRefCount = sptr._weakRefCount;
_mutex = sptr._mutex;
}

~SharedPtr() {
if(_ptr != nullptr) {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*_sharedRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
// 两个计数都为0才清空控制快
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _sharedRefCount;
delete _weakRefCount;
delete _mutex;
}
}
// 如果自身保存的_ptr为nullptr则什么也不用做
// else {}
}

// 重置SharedPtr的状态, 此时需要递减Shared计数器
void reset() {
// 当保存的_ptr为nullptr的时候, 什么也不用管
if(_ptr != nullptr) {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*_sharedRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
// 引用计数都为0, 清空控制块并delete保存的指针
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _sharedRefCount;
delete _weakRefCount;
delete _mutex;
_ptr = _sharedRefCount = _weakRefCount = _mutex = nullptr;
}
}
// 当自身保存的_ptr为nullptr时什么都不用做
// else {}
}

T *get() const {
return _ptr;
}

int useCount() const {
int ret;
if (_ptr == nullptr) {
ret = 0;
} else {
std::lock_guard<std::mutex> guard(*_mutex);
ret = *_sharedRefCount;
}
return ret;
}

bool unique() {
int use = useCount();
// useCount为0时, 视为unique
return use == 1 || use == 0;
}

operator bool() const {
return _ptr != nullptr;
}

T &operator*() const {
return *_ptr;
}

T *operator->() const {
return _ptr;
}

SharedPtr& operator=(SharedPtr &targetPtr) {
// 当两个指针都是nullptr时什么也不做
if(targetPtr._ptr == nullptr && _ptr == nullptr) {
// empty
// 当targetPtr是nullptr, 自身不是nullptr时, 要考虑一下释放控制块
} else if (targetPtr._ptr == nullptr && _ptr != nullptr) {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*_sharedRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _sharedRefCount;
delete _weakRefCount;
delete _mutex;
}
_ptr = _sharedRefCount = _weakRefCount = _mutex = nullptr;
} else if (targetPtr._ptr != nullptr && _ptr == nullptr) {
_ptr = targetPtr._ptr;
// 拿anotherPtr的控制块
_sharedRefCount = targetPtr._sharedRefCount;
_weakRefCount = targetPtr._weakRefCount;
_mutex = targetPtr._mutex;
std::lock_guard<std::mutex> guard(*_mutex);
(*_sharedRefCount)++;
} else {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*this->_sharedRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete this->_sharedRefCount;
delete this->_weakRefCount;
delete this->_mutex;
}
_ptr = targetPtr._ptr;
_sharedRefCount = targetPtr._sharedRefCount;
_weakRefCount = targetPtr._weakRefCount;
_mutex = targetPtr._mutex;
std::lock_guard<std::mutex> guard(*_mutex);
(*_sharedRefCount) ++;
}

return *this;
}

private:
T *_ptr;

// 下面三个变量被称为控制块
int *_sharedRefCount;
int *_weakRefCount;
std::mutex *_mutex;

// 友元
friend WeakPtr<T>;
};

template <typename T>
class WeakPtr {
public:
WeakPtr() {}

WeakPtr(const SharedPtr<T> &sptr) {
_ptr = sptr._ptr;
_mutex = sptr._mutex;
_sharedRefCount = sptr._sharedRefCount;
_weakRefCount = sptr._weakRefCount;
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount)++;
}

WeakPtr(const WeakPtr &wptr) {
_ptr = wptr._ptr;
_mutex = wptr._mutex;
_sharedRefCount = wptr._sharedRefCount;
_weakRefCount = wptr._weakRefCount;
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount)++;
}

WeakPtr(WeakPtr && wptr) {
_ptr = wptr._ptr;
_mutex = wptr._mutex;
_sharedRefCount = wptr._sharedRefCount;
_weakRefCount = wptr._weakRefCount;
}

~WeakPtr() {
if (_ptr != nullptr) {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _mutex;
delete _sharedRefCount;
delete _weakRefCount;
}
}
// 此时什么也不用做
// else {}
}

WeakPtr &operator=(const WeakPtr<T> &wptr) {
if (_ptr == nullptr && wptr._ptr != nullptr) {
_ptr = wptr._ptr;
_mutex = wptr._mutex;
_sharedRefCount = wptr._sharedRefCount;
_weakRefCount = wptr._weakRefCount;
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount) ++;
// 双方均为nullptr, 不用做任何操作
} else if (_ptr == nullptr && wptr._ptr == nullptr) {
// empty
// 自身不是nullptr, 但是对方是nullptr
} else if (_ptr != nullptr && wptr._ptr == nullptr) {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _mutex;
delete _sharedRefCount;
delete _weakRefCount;
}
_ptr = _mutex = _sharedRefCount = _weakRefCount = nullptr;
// 自身和对方都不是nullptr
} else {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
// 双计数为0时清空控制块
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _mutex;
delete _sharedRefCount;
delete _weakRefCount;
}
_ptr = wptr._ptr;
_mutex = wptr._mutex;
_sharedRefCount = wptr._sharedRefCount;
_weakRefCount = wptr._weakRefCount;
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount) ++;
}
}

WeakPtr &operator=(const SharedPtr<T> &sptr) {
if (_ptr == nullptr && sptr._ptr != nullptr) {
_ptr = sptr._ptr;
_mutex = sptr._mutex;
_sharedRefCount = sptr._sharedRefCount;
_weakRefCount = sptr._weakRefCount;
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount)++;
// 都为nullptr什么都不用做
} else if (_ptr == nullptr && sptr._ptr == nullptr) {
// empty
} else if (_ptr != nullptr && sptr._ptr == nullptr) {
int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
// 双计数为0时清空控制块
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _mutex;
delete _sharedRefCount;
delete _weakRefCount;
}
_ptr = _mutex = _sharedRefCount = _weakRefCount = nullptr;

// 均有效的情况
} else {

int sharedCount, weakCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount) --;
sharedCount = *_sharedRefCount;
weakCount = *_weakRefCount;
}
// 双计数为0时清空控制块
if (sharedCount == 0 && weakCount == 0) {
delete _ptr;
delete _mutex;
delete _sharedRefCount;
delete _weakRefCount;
}
_ptr = sptr._ptr;
_mutex = sptr._mutex;
_sharedRefCount = sptr._sharedRefCount;
_weakRefCount = sptr._weakRefCount;
std::lock_guard<std::mutex> guard(*_mutex);
(*_weakRefCount) ++;
}

return *this;
}

bool expired() const {
int use = useCount();
// useCount为0则对象已经被析构
return use == 0;
}

SharedPtr<T> lock() const {
SharedPtr<T> ret;
// 无视_ptr为nullptr的情况
if(_ptr != nullptr) {
int sharedCount;
{
std::lock_guard<std::mutex> guard(*_mutex);
if ((sharedCount = *_sharedRefCount) != 0) {
(*_sharedRefCount) ++;
}
}
if (sharedCount != 0) {
ret._ptr = _ptr;
ret._mutex = _mutex;
ret._sharedRefCount = _sharedRefCount;
ret._weakRefCount = _weakRefCount;
}
}
return ret;
}

int useCount() const {
int ret;
if (_ptr == nullptr) {
ret = 0;
} else {
std::lock_guard<std::mutex> guard(*_mutex);
ret = *_sharedRefCount;
}
return ret;
}

private:
T *_ptr;
// 保存控制块
std::mutex *_mutex;
int *_sharedRefCount;
int *_weakRefCount;
friend SharedPtr<T>;
};

我们的实现相比标准库性能更低, 因为我们用到了互斥锁, 而标准库使用的原子操作更快。

所以为什么保存weak_ptr会造成轻微的内存泄漏呢?

原因很简单, 只要两个引用计数有一个不为0, 那么控制块就不会被清空。控制块的内存不会被清除就会造成轻微的内存泄漏。

enable_shared_from_this的用法

一些情况下, 我们需要在自己的类成员函数中, 拿到自身的shared_ptr。我们知道, 我们的一块内存, 要么自己控制, 要么用智能指针控制, 这是一个选择题。当我们选择使用shared_ptr进行控制时, 在类的代码中要把自身的指针递交给别人(不论是shared还是weak), 就得传智能指针, 现在有下面一种简单的方法, 通过传参做到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MyObserver {
// 省略更多代码...
void observe(shared_ptr<MyObserable> mySharedPtr) {
// ...
}
// 省略更多代码...
};

class MyObserable {
// 省略更多代码...
void register(const MyObservable &mo, shared_ptr<MyObserable> &mySharedPtr) {
mo.observe(mySharedPtr);
}
// 省略更多代码...
};

int main() {
MyObserver mo;
shared_ptr<MyObserable> mc(new MyObserable);
mc.register(mo, mc);
}

传参是可行的, 然而很鸡肋, 这样做接口设计的太难看了。

如果我们的类注明只能用智能指针管理, 那么明显我们可以用一种侵入式的方式拿到自身的智能指针, 那么具体怎么做呢? enable_shared_from_this是解决方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class MyObserver {
// 省略更多代码...
void observe(shared_ptr<MyObserable> mySharedPtr) {
// ...
}
// 省略更多代码...
};

class MyObserable: enable_shared_from_this {
// 省略更多代码...
void register(const MyObservable &mo) {
mo.observe(shared_from_this());
}
// 省略更多代码...
};

int main() {
MyObserver mo;
shared_ptr<MyObserable> mc(new MyObserable);
mc.register(mo);
}

需要知道的是, 继承了enable_shared_from_this可以使用shared_from_this是通过侵入shared_ptr的数据结构实现的。需要这个类本身先用shared_ptr进行构造, 构造后类才能在其它成员函数(非构造函数)中使用shared_from_this

如何实现enable_shared_from_this?

enable_shared_from_this<T>是一个模板类, 它有weak_ptr<T>作为它的成员(我们记作member)。shared_ptr<T>的构造函数会探测T是否继承自enable_shared_from_this

如果T确实继承自enable_shared_from_this<T>, 那么shared_ptr<T>的构造函数就会将*this赋值给我们之前提到的member。下面是伪代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
template<class D>
class enable_shared_from_this {
protected:
constexpr enable_shared_from_this() { }
enable_shared_from_this(enable_shared_from_this const&) { }
enable_shared_from_this& operator=(enable_shared_from_this const&) {
return *this;
}

public:
shared_ptr<T> shared_from_this() { return self_.lock(); }
shared_ptr<T const> shared_from_this() const { return self_.lock(); }

private:
weak_ptr<D> self_;

friend shared_ptr<D>;
};

template<typename T>
shared_ptr<T>::shared_ptr(T* ptr) {
// ...
// Code that creates control block goes here.
// ...

// NOTE: This if check is pseudo-code. Won't compile. There's a few
// issues not being taken in to account that would make this example
// rather noisy.
if (is_base_of<enable_shared_from_this<T>, T>::value) {
enable_shared_from_this<T>& base = *ptr;
base.self_ = *this;
}
}