关于创建mysql连接池的具体思路和实现

思路

首先连接池做的就是管理连接的作用

而用户需要的是连接

我们就是管理连接并且提供连接的过程

所以我们围绕的是 getconnection,如何让用户获得一个连接。

目的

原本的 mysql 的 api 或者封装的 mysql 类,都可以建立连接,为什么要做一个连接池来管理呢?

因为大量的用户连接请求会导致大量的连接创建和销毁,消耗资源,所以就想能不能优化这一部分?

其中大量的连接是重复的,是可以被重新调用访问数据库的。也就是说很多是不需要销毁的,如果恰好够用且不销毁,那么就不会大量创建,所以其中创建和销毁是重复的。

所以整个的数据库连接池就是为了提升效率,减少重复的冗余操作。

整个 mysql 结构图

单例设计思路

单例推荐采用的是 c++11 支持的性质:”在 C++中,函数内部可以声明静态局部变量。这些变量在函数第一次执行到它们的声明时进行初始化,并且在程序的整个运行期间保持它们的值,即使函数已经返回。每次函数被调用时,这些变量都会保持它们上一次调用结束时的状态。”

也就是说,只要被执行一次,那么就会初始化,且只初始化一次。

所以也引出我们单例设计的模式:“magic static“

其他的单例模式不是很推荐,因为或多或少都有其他的问题,像 饿汉模式,如果程序运行过程中可能不会使用到该单例,会导致资源浪费,而 双重检查锁定模式 会因为编译器的优化导致出现线程安全问题,就是你想的和编译器以为的不一样。还有就是 枚举类型 实现起来繁琐且不直观。

所以采用 magic static 的方式,简单且安全

1
2
3
4
5
// 获取单例实例
connection_pool& connection_pool::GetInstance() {
static connection_pool connPool;
return connPool;
}

RAII 设计思路

我们获取当前的连接池的单例后,要考虑的就是如何写我们的构造函数,这个应该参照 RAII 设计

RAII 模式,也就是“资源获取即初始化”,为的是将资源的获取和释放与对象的生命周期绑定在一起。当对象被创建时,它负责获取所需的资源;当对象被销毁时,它负责释放资源。

所以我们的构造函数和析构函数就要围绕构造的时候获取什么?析构的时候顺带销毁什么?

连接池,那就是 sql 连接管理的池子,自然就是构造函数获取初始连接,析构函数析构所有连接

所以设计上应该需要在构造函数和析构函数中设置两个循环,分别管理连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ConnectionPool::~ConnectionPool()

{

// 释放队列中的所有连接

while(!m_connectionQ.empty()){

MySqlConn *conn = m_connectionQ.front();

m_connectionQ.pop();

delete conn;

}



// 释放在外面的连接

for (MySqlConn* conn : m_memoryS) {

delete conn;

}

}

生产者线程,消费者线程

  • 首先生产者线程应该有生产函数
  • 消费者线程有消费函数
  • 两者线程应该分离开来区别于主线程

所以引出三个问题
主线程是什么?
消费者函数怎么写?
生产者函数怎么写?

主线程

母庸质疑就是调用线程池的 main 方法。

那么不可能在调用 main 方法的地方我们分离生产者函数和消费者函数,因为我们是封装好的类,应该只提供让用户 get 的接口,而不是让用户操作,应该让用户将其看成黑盒,而不是做更多的操作。

所以,应该如何分离消费者线程和和生产者线程呢?

首先,从线程安全的角度看,如果生产者,消费者线程,启动得太快,那么就会访问到未初始化的数据,如果启动得太慢,又无法动态调整连接数量,导致响应过慢,影响效率。

再从职责上讲,函数无非就是 getconnectionproduceConnectionrecycleConnection getInstance
什么样的函数就应该只做怎么样的事情,所以选择在构造函数中分离生产函数和消费函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ConnectionPool::ConnectionPool()

{
//加载配置文件

if(!parseJsonFile()){

std::cerr<<"pareseJsonFile is failed"<<std::endl;

return ;
}

m_CntSize=0;

//创建连接个数

for(int i=0;i<m_minSize;i++){
addConnection();
}
//调用两个线程分别生产和消费
//选择在构造函数中分离生产函数和消费函数
std::thread producer(&ConnectionPool::produceConnection,this);
std::thread recycler(&ConnectionPool::recycleConnection,this);
producer.detach();
recycler.detach();
}

生产者函数

生产函数 produceConnection

首先他需要线程分离,然后 不断 地去判断一些条件,符合条件的就生产,因为 不断 所以我们必须用 while 来覆盖住整个函数。

前面提到,如果我们需要按时间回收,用队列是比较好的选择,所以我们存储方式是选择队列,当符合条件就生产,加入队列。

那么这个条件是什么呢?

这个条件有两点

  • 第一如果队列中有可用连接就不生产
  • 第二如果生产连接达到上限就不生产

这样做就可以让连接维持在一个规定的上界,同时让队列始终保持有连接。

随后当我们调用完函数以后就需要通知消费者消费,所以函数最后要加一个信号量通知。

但有一个问题,生产者如果不生产需不需要通知消费者?
事实上是不需要的,等待调用连接结束,可以通过智能指针回收连接到队列当中,同时再唤醒消费者即可。

所以整个的逻辑是
当消费者消费时,生产者就得被唤醒,判断队列是否为空,如果为空,就说明需要生产连接,再判断是否生产在上限以内,如果是就生产,然后唤醒,不是就重新循环等待被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void ConnectionPool::produceConnection()

{
while(true){
//创建自动管理锁的对象
std::unique_lock<std::mutex> locker(m_mutexQ);
while(!m_connectionQ.empty()){
m_cond.wait(locker);
}
if(m_CntSize<m_maxSize){
addConnection();
}
//生产后要去唤醒消费者线程
m_cond.notify_all();
}
}

消费者线程

getconnection

消费者线程就是 getConnection 函数和recycleConnection函数两个函数,我们先说 getConnectiongetConnection 是主线程用户调用的线程,通过调用获得连接。

getConnection无非就是拿的过程,从什么地方拿?从队列中拿,拿取头一个元素,返回的是一个指针类型。

那么拿走的连接如何还回到队列当中呢?时间上肯定是需要单用户结束调用的时候,还回到队列当中,一般的想法是提供一个用户接口,让用户把连接放进去,但是这样太不优雅了,不应该让用户管理释放。

那么回到刚刚说的,返回的是一个指针类型,是不是就可以考虑智能指针,来管理连接。通过lambda函数,把释放的链接放入队列。另外的关于智能指针的选择,推荐使用独占智能制造,因为共享智能指针可能会被用户错误的使用导致循环引用等问题,同时出现bug也不好寻找析构的代码行。所以优先考虑独占智能指针。

那么回到代码上来,首先getConnection函数是通过用户主线程调用的,所以是不需要向分离线程一样,使用一个while来维持线程运作,这是区别于生产者线程和回收者线程的。

首先通过锁实现线程同步,再去访问当前的队列有没有连接,有连接就拿。没有连接就阻塞一段时间,再访问,如果没有得到打印错误信息,直接返回空。得到的连接用智能指针管理,释放智能指针的时候,同时把连接放回队列当中。最后通知生产者判断有没有必要生产,返回一个智能指针类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
shared_ptr<MySqlConn>ConnectionPool::getConnection()

{


unique_lock<mutex> locker(m_mutexQ);

//先判断是否有连接可用

//如果是空的,那么就保证阻塞

while (m_connectionQ.empty()) {


if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout))) {

//如果阻塞后仍然为空那么就记入次数,同时再次进入阻塞

if (m_connectionQ.empty()) {

throw std::runtime_error("Unable to acquire connection from pool");

return nullptr;

}

}

}

//记忆当前的连接

m_memoryS.insert(m_connectionQ.front());

//然后直接获取

shared_ptr<MySqlConn>connptr(m_connectionQ.front(),[this](MySqlConn*conn){

//由于资源是共享的,所以要加锁

lock_guard<mutex> locker(m_mutexQ);



//如果找到就删除

if (m_memoryS.find(conn) != m_memoryS.end()) {

m_memoryS.erase(conn);

}

conn->refreshAliveTime();

m_connectionQ.push(conn);

});

m_connectionQ.pop();

m_cond.notify_all();

return connptr;

}

回收线程

回收线程的设计是基于如果多余的连接超时就释放连接,以保证空闲时没有大量的连接占用。

因为需要再后台去轮询所有的连接是否超时,所以需要用到线程分离,那么就需要一个while循环这个函数代码。
但是也不能一直轮询,会浪费cpu调度,那么就使用一个sleep隔一段时间回收一次。
回收的时候需要对队列资源进行访问,所以要用到锁,上一个锁然后开始访问资源,访问哪一个资源呢?如果回收连接的话,回收那一刻一定是最新的,所以在队头的一定是最旧的,所以从队头到队尾轮询时间。
如果此时连接数量大于初始设置数量,且超时,那么就回收连接,否则就直接返回,因为如果当前没有超市,后面更新的也不会超时了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
  void ConnectionPool::recycleConnection()

{

while(1){

// std::cerr<<"recycleConnection"<<std::endl;

std::this_thread::sleep_for(std::chrono::seconds(1));


lock_guard<mutex> locker(m_mutexQ);

while(m_CntSize>m_minSize){

MySqlConn* conn=m_connectionQ.front();

//为什么队头的存活时间是最长的?

if(conn->getAliveTime()>=m_maxIdleTime){

m_connectionQ.pop();

m_CntSize--;

delete conn;

}

else{

break;

}

}

}

}

析构函数

那么因为我们使用的是RAII原则,所以创建的要再析构中释放掉,那么我们释放的是什么,是连接对吧,所以我们把队列中的全部弹出来释放掉就可以了。

但是有没有发现一个问题,如果说用户提前调用了析构函数,那么是不是队列没了,队列里面的连接释放了,但是分出去的连接还会回来访问队列,出现未定义的行为?

那么如果我们把析构函数设置成私有的似乎是可以的,不让用户提前删除,但是如果用户就想关闭数据库怎么办?
而且因为析构函数是私有的,那么栈上就无法释放这个实例,因为当作用域结束,编译器是会自动调用析构函数的,如果私有将无法调用,从而出现内存泄漏。

一些额外的问题

如果我们考虑使用一个close函数通过while配合信号量,当返回的时候通知close判断是否符合析构条件,如果析构,听起来好像还行,但是问题在于,我们的析构函数是不应该被成员函数调用的,如果成员函数使用了成员变量,那么这个成员变量被析构了又在成员函数作用域,这样容易引起错误。

所以我对此的想法就是做再做一个集合,存放分出去的连接,当析构函数被调用的时候,两个集合都便利一遍,释放就可以了。

所以在getConnection函数中我们把连接添加到分出去集合
lambda中,我们从分发集合中删除该连接。

所以我们通过分发集合和空闲队列一起管理连接的建立。

这样就把在外的和在内的连接都回收了就不会出现连接回收的问题。

源码

.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#include "ConnectionPool.h"

#include <rapidjson/document.h>

#include <rapidjson/istreamwrapper.h>

#include <fstream>

#include <iostream>

#include <thread>

#include <mutex>

#include <chrono>

ConnectionPool::~ConnectionPool()

{

// 释放队列中的所有连接

while(!m_connectionQ.empty()){

MySqlConn *conn = m_connectionQ.front();

m_connectionQ.pop();

delete conn;

}



// 释放在外面的连接

for (MySqlConn* conn : m_memoryS) {

delete conn;

}

}



ConnectionPool *ConnectionPool::getInstance()

{

static ConnectionPool instance; // 局部静态变量,线程安全的初始化

return &instance;

}

//用户获取接口(可以用优先级队列优化)

shared_ptr<MySqlConn>ConnectionPool::getConnection()

{

// std::cerr<<"getConnection()locker(m_mutexQ)"<<std::endl;

unique_lock<mutex> locker(m_mutexQ);

//先判断是否有连接可用

while (m_connectionQ.empty()) {

// std::cerr<<" getConnection()_while (m_connectionQ.empty()"<<std::endl;

if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout))) {

//如果阻塞后仍然为空那么就记入次数,同时再次进入阻塞

if (m_connectionQ.empty()) {

throw std::runtime_error("Unable to acquire connection from pool");

return nullptr;

}

}

}

//记忆当前的连接

m_memoryS.insert(m_connectionQ.front());

//然后直接获取

shared_ptr<MySqlConn>connptr(m_connectionQ.front(),[this](MySqlConn*conn){

//由于资源是共享的,所以要加锁

lock_guard<mutex> locker(m_mutexQ);



//如果找到就删除

if (m_memoryS.find(conn) != m_memoryS.end()) {

m_memoryS.erase(conn);

}

conn->refreshAliveTime();

m_connectionQ.push(conn);

});

m_connectionQ.pop();

m_cond.notify_all();

return connptr;

}

//连接池初始化

ConnectionPool::ConnectionPool()

{

//加载配置文件

if(!parseJsonFile()){

std::cerr<<"pareseJsonFile is failed"<<std::endl;

return ;

}

m_CntSize=0;

//创建连接个数

for(int i=0;i<m_minSize;i++){

addConnection();

}

//调用两个线程分别生产和消费

std::thread producer(&ConnectionPool::produceConnection,this);

std::thread recycler(&ConnectionPool::recycleConnection,this);

producer.detach();

recycler.detach();

}



//初始化数据库数据

bool ConnectionPool::parseJsonFile() {

std::ifstream ifs("dbconf.json");

rapidjson::IStreamWrapper isw(ifs);

rapidjson::Document d;

d.ParseStream(isw);



if (d.IsObject()) {

m_ip = d["ip"].GetString();

m_port = d["port"].GetUint();

m_user = d["userName"].GetString();

m_passwd = d["password"].GetString();

m_dbName = d["dbName"].GetString();

m_minSize = d["minSize"].GetInt();

m_maxSize = d["maxSize"].GetInt();

m_maxIdleTime = d["maxIdleTime"].GetInt();

m_timeout = d["timeout"].GetInt();

return true;

}

return false;

}

//生产数据库连接

void ConnectionPool::produceConnection()

{

while(true){

// std::cerr<<"produceConnection()_while(true)"<<std::endl;

//创建自动管理锁的对象

std::unique_lock<std::mutex> locker(m_mutexQ);

//如果当前还有就阻塞

while(!m_connectionQ.empty()){

// std::cerr<<"produceConnection()_while(!m_connectionQ.empty())"<<std::endl;

m_cond.wait(locker);

}

//如果当前没有就生产

if(m_CntSize<m_maxSize){

// std::cerr<<"produceConnection()_if(m_CntSize<m_maxSize)"<<std::endl;

addConnection();

//生产后要去唤醒消费者线程

m_cond.notify_all();

}

}

}



void ConnectionPool::recycleConnection()

{

while(1){

// std::cerr<<"recycleConnection"<<std::endl;

std::this_thread::sleep_for(std::chrono::seconds(1));

lock_guard<mutex> locker(m_mutexQ);

while(m_CntSize>m_minSize){

// std::cerr<<"recycleConnection_while(m_CntSize>m_minSize)"<<std::endl;

MySqlConn* conn=m_connectionQ.front();

//为什么队头的存活时间是最长的?

if(conn->getAliveTime()>=m_maxIdleTime){

m_connectionQ.pop();

m_CntSize--;

delete conn;

}

else{

break;

}

}

}

}

void ConnectionPool::addConnection()

{



// std::cerr<<"addConnection()"<<std::endl;

MySqlConn *p = new MySqlConn();

p->connect(m_user, m_passwd, m_dbName, m_ip, m_port);

p->refreshAliveTime(); // 刷新一下开始空闲的起始时间

m_connectionQ.push(p);

m_CntSize++;

}

.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <string>

#include <queue>

#include <mutex>

#include <set>

#include <condition_variable>

#include "MySqlConn.h"

using std::string;

using std::queue;

class ConnectionPool {

public:

// 禁用拷贝构造函数和赋值操作符

ConnectionPool(const ConnectionPool& obj) = delete;

ConnectionPool& operator=(const ConnectionPool&) = delete;

~ConnectionPool();

static ConnectionPool* getInstance();

//如何让用户得到链接?(因为要动态管理所以用非静态的接口)

shared_ptr<MySqlConn> getConnection();



private:

ConnectionPool() ;

//json使用文件

bool parseJsonFile();

void produceConnection() ;

void recycleConnection () ;

void addConnection();

// 不需要静态指针成员,因为我们使用局部静态变量

//数据库的ip

string m_ip;

//数据库管理用户名

string m_user;

//数据库的密码

string m_passwd;

//数据库的库名

string m_dbName;

//我的端口

unsigned short m_port;

//规定上下界

int m_CntSize=0;

int m_minSize;

int m_maxSize;

//最大请求连接重试次数

int maxRetryCount;

//超时的等待时长

int m_timeout;

//最长的sql连接空闲时长

int m_maxIdleTime;

//链接队列

std::queue<MySqlConn*> m_connectionQ;

std::set<MySqlConn*> m_memoryS;

//访问队列的互斥锁

std::mutex m_mutexQ ;

//消费者线程和生产者线程的条件变量

std::condition_variable m_cond;

};