深入研究sqlalchemy连接池-程序员宅基地

技术标签: python  运维  数据库  

简介:

相对于最新的MySQL5.6,MariaDB在性能、功能、管理、NoSQL扩展方面包含了更丰富的特性。比如微秒的支持、线程池、子查询优化、组提交、进度报告等。

本文就主要探索MariaDB当中连接池的一些特性,配置。来配合我们的sqlalchemy。

一:起因

本来是不会写这个东西的,但是,写好了python--flask程序,使用sqlalchemy+mariadb,部署以后总是出问题,500错误之类的。

使用默认连接参数

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)

错误提示是:

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)

http://sqlalche.me/e/e3q8:

OperationalError:

Exception raised for errors that are related to the database’s operation andnot necessarily under the control of the programmer, e.g. an unexpecteddisconnect occurs, the data source name is not found, a transaction could notbe processed, a memory allocation error occurred during processing, etc.

This error is aDBAPI Errorand originates fromthe database driver (DBAPI), not SQLAlchemy itself.

TheOperationalErroris the most common (but not the only) error class usedby drivers in the context of the database connection being dropped, or notbeing able to connect to the database. For tips on how to deal with this, seethe sectionDealing with Disconnects.

意思是没有正确断开和数据库的连接。

二:处理断开

http://docs.sqlalchemy.org/en/latest/core/pooling.html#pool-disconnects

官方给了三种方案来解决这个问题:

1.悲观处理

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

pool_pre_ping=True

表示每次连接从池中检查,如果有错误,监测为断开的状态,连接将被立即回收。

2.自定义悲观的ping

from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)

@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select([1]))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select([1]))
        else:
            raise
    finally:
        # restore "close with result"
        connection.should_close_with_result = save_should_close_with_result

说实话,没怎么看明白。

像是try一个select 语句,如果没问题就关闭。

 

3.乐观处理

from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute("SELECT * FROM table")
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")

这个看懂了,try一个select语句,如果无效,就返回Connection was invalidated!,然后开一个新的连接,再去执行select。这个应该写个装饰器,放在每个查询前面。

4.使用连接池回收

from sqlalchemy import create_engine
e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)

这种方式就比较简单了,在连接参数中写上连接超时时间即可。

5.这是自己看文档找到的方法

from sqlalchemy.pool import QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool

在sqlalchemy.pool下有已经配置好的连接池,直接使用这些连接池也应该可以。

三:测试

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

为避免因数据库交叉连接,首先开启5个MARIADB

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_recycle=3600)

用这5种连接参数进行连接测试。

如果你愿意,也可以继续开,QueuePool,NullPool,AssertionPool,StaticPool,SingletonThreadPool,Pool,把这几种都测试一下。

 

8801 8805 均会不同程度的出现500错误,8801频率还高点。

sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)
sqlalchemy.exc.OperationalError: (mysql.connector.errors.OperationalError) MySQL Connection not available. [SQL: 'SELECT public.id AS public_id, public.public_name AS public_public_name, public.public_email AS public_public_email \nFROM public \nWHERE public.public_name = %(public_name_1)s \n LIMIT %(param_1)s'] [parameters: [{}]] (Background on this error at: http://sqlalche.me/e/e3q8)


 

Internal Server Error

The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.

等会儿看看8802  8803 8804如何。

四:深入研究sqlalchemy源码

VENV\Flask_Base\Lib\site-packages\sqlalchemy\engine\__init__.py

看起来,没有默认值。所以engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',)报错频率比较高。

五:研究pool源码

VENV\Flask_Base\Lib\site-packages\sqlalchemy\pool.py

看来poolclass的类型都定义在这里了。

1.SingletonThreadPool

A Pool that maintains one connection per thread

每个线程维护一个连接的池。

2.QueuePool

A :class:`.Pool` that imposes a limit on the number of open connections.

这种方式限制了连接数量,QueuePool是默认的连接池方式,除非使用了方言,也就是第三方链接库。

难怪我使用MySQL-connector-python时老出错呢,没打开连接池啊。

3.NullPool

A Pool which does not pool connections...

不使用连接池

4.StaticPool

A Pool of exactly one connection, used for all requests.

一个完整的连接池,用于所有的连接。

5.AssertionPool

A :class:`.Pool` that allows at most one checked out connection at any given time.

任何时间只给一个签出连接?为了debug模式?不懂了。

看的官方说明也没这么详细。

这么看来,如果我使用默认链接库,可以不加参数试试。

mysql-python是sqlalchemy默认的mysql链接库,我在windows下装不上。放弃测试默认链接库,手动指定连接池为QueuePool。

或者指定连接池类型为:QueuePool   StaticPool   SingletonThreadPool(多线程的时候)

六:连接池类型测试

修改测试docker

docker run  --restart=always --privileged --name My_mariadb_01 -p 3301:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_02 -p 3302:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_03 -p 3303:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_04 -p 3304:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_05 -p 3305:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13
docker run  --restart=always --privileged --name My_mariadb_06 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d  mariadb:10.2.13

Flask_Plan_01   8801       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', pool_pre_ping=True))
Flask_Plan_02   8802       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=SingletonThreadPool)
Flask_Plan_03   8803       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=QueuePool)
Flask_Plan_04   8804       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=NullPool)
Flask_Plan_05   8805       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=StaticPool)
Flask_Plan_06   8806       engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan', poolclass=AssertionPool)

七:编写测试脚本

 

import requests
import time
i = 1
while True:
    try:
        r=requests.get('http://192.168.0.104:8801',timeout=5)
        if  r.status_code==200:
            print(time.strftime('%Y-%m-%d %H:%M:%S')+'---'+str(i)+'---'+str(r.status_code)+'---ok')
        else:
            print(time.strftime('%Y-%m-%d %H:%M:%S') + '---' + str(i) + '---' + str(r.status_code) + '-----------badr')
            break
        time.sleep(1)
        i+=1
    except:
        print('except')
        print(time.strftime('%Y-%m-%d %H:%M:%S') +'---'+str(i)+'-----------bad')
        break

修改地址,把几个测试服务都开始跑。

出错就会停了。

代码很烂,凑活测试而已。

从晚上22:30睡觉到早上6:10起床,pool_pre_ping=True,SingletonThreadPool,QueuePool,NullPool,StaticPool,AssertionPool,都很稳定,访问代码都是200

八:继续研究相关代码

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#using-connection-pools-with-multiprocessing

使用连接池进行多重处理

http://docs.sqlalchemy.org/en/latest/core/pooling.html?highlight=use_threadlocal#api-documentation-available-pool-implementations

api文档--连接池的实现

classsqlalchemy.pool.Pool(creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)

 

Parameters:    
creator–可调用的函数返回对象。
recycle– 超时回收时间。如果连接超过这个时间,连接就被关闭,换一个新的连接
logging_name - 日志标识名称
echo– 是否打印sql语句
use_threadlocal–是否使用线程,在同一应用程序的线程使用相同的连接对象
reset_on_return–在返回前的操作
    rollback,大概是自动回滚
    True 同为回滚
    commit 大概是自动提交的意思
    None 无操作
    none 无操作
    False 无操作
events– 列表元组,每个表单会传递给listen………………没搞懂
listeners - 弃用,被listen取代
dialect–链接库,使用create_engine时不使用,由引擎创建时处理
pre_ping–是否测试连接

基本上这些参数都在engine-creation-api中

http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html#engine-creation-api

Pool                  (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
StaticPool         (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
NullPool            (creator,recycle=-1,echo=None,use_threadlocal=False,logging_name=None,reset_on_return=True,listeners=None,events=None,dialect=None,pre_ping=False,_dispatch=None)
QueuePool          (creator,pool_size=5,max_overflow=10,timeout=30,**kw)
SingletonThreadPool(creator,pool_size=5,**kw)
AssertionPool      (*args,**kw)

这下清楚了,Pool,StaicPool,NullPool,都一样,直接回收,效率一定低了。

我们就指定默认的QueuePool好了。以后观察着服务器的负载,负载大了以后,调整就好了。

自定义方法如下:

engine = create_engine('mysql+mysqlconnector://plan:plan@mysql/plan',
                       pool_size=5,
                       max_overflow=10,
                       pool_timeout=30,
                       pool_pre_ping=True)

九:总结

曲折的道路,终于找到了解决方案。

sqlalchemy的教程当中,很少有讲如何部署的。很多又是linux开发。可能在linux下很容易装默认链接库,部署的时候就自动使用了QueuePool连接池。所以这种问题很少出现。

我在windows下开发,部署在linux,开发和部署都使用了非默认链接库,导致没有使用默认连接池。

那么随着深入研究,找到了连接池的配置,并掌握这一知识,为以后的开发部署工作,扫除了障碍。

虽然源码里面还有很多看不懂,但是读书百遍其义自见,还是要多读(我是懒蛋,遇到问题,再去解决,下一个问题是什么呢?)。

 

转载于:https://www.cnblogs.com/jackadam/p/8727409.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_30772105/article/details/98882352

智能推荐

class和struct的区别-程序员宅基地

文章浏览阅读101次。4.class可以有⽆参的构造函数,struct不可以,必须是有参的构造函数,⽽且在有参的构造函数必须初始。2.Struct适⽤于作为经常使⽤的⼀些数据组合成的新类型,表示诸如点、矩形等主要⽤来存储数据的轻量。1.Class⽐较适合⼤的和复杂的数据,表现抽象和多级别的对象层次时。2.class允许继承、被继承,struct不允许,只能继承接⼝。3.Struct有性能优势,Class有⾯向对象的扩展优势。3.class可以初始化变量,struct不可以。1.class是引⽤类型,struct是值类型。

android使用json后闪退,应用闪退问题:从json信息的解析开始就会闪退-程序员宅基地

文章浏览阅读586次。想实现的功能是点击顶部按钮之后按关键字进行搜索,已经可以从服务器收到反馈的json信息,但从json信息的解析开始就会闪退,加载listview也不知道行不行public abstract class loadlistview{public ListView plv;public String js;public int listlength;public int listvisit;public..._rton转json为什么会闪退

如何使用wordnet词典,得到英文句子的同义句_get_synonyms wordnet-程序员宅基地

文章浏览阅读219次。如何使用wordnet词典,得到英文句子的同义句_get_synonyms wordnet

系统项目报表导出功能开发_积木报表 多线程-程序员宅基地

文章浏览阅读521次。系统项目报表导出 导出任务队列表 + 定时扫描 + 多线程_积木报表 多线程

ajax 如何从服务器上获取数据?_ajax 获取http数据-程序员宅基地

文章浏览阅读1.1k次,点赞9次,收藏9次。使用AJAX技术的好处之一是它能够提供更好的用户体验,因为它允许在不重新加载整个页面的情况下更新网页的某一部分。另外,AJAX还使得开发人员能够创建更复杂、更动态的Web应用程序,因为它们可以在后台与服务器进行通信,而不需要打断用户的浏览体验。在Web开发中,AJAX(Asynchronous JavaScript and XML)是一种常用的技术,用于在不重新加载整个页面的情况下,从服务器获取数据并更新网页的某一部分。使用AJAX,你可以创建异步请求,从而提供更快的响应和更好的用户体验。_ajax 获取http数据

Linux图形终端与字符终端-程序员宅基地

文章浏览阅读2.8k次。登录退出、修改密码、关机重启_字符终端

随便推点

Python与Arduino绘制超声波雷达扫描_超声波扫描建模 python库-程序员宅基地

文章浏览阅读3.8k次,点赞3次,收藏51次。前段时间看到一位发烧友制作的超声波雷达扫描神器,用到了Arduino和Processing,可惜啊,我不会Processing更看不懂人家的程序,咋办呢?嘿嘿,所以我就换了个思路解决,因为我会一点Python啊,那就动手吧!在做这个案例之前先要搞明白一个问题:怎么将Arduino通过超声波检测到的距离反馈到Python端?这个嘛,我首先想到了串行通信接口。没错!就是串口。只要Arduino将数据发送给COM口,然后Python能从COM口读取到这个数据就可以啦!我先写了一个测试程序试了一下,OK!搞定_超声波扫描建模 python库

凯撒加密方法介绍及实例说明-程序员宅基地

文章浏览阅读4.2k次。端—端加密指信息由发送端自动加密,并且由TCP/IP进行数据包封装,然后作为不可阅读和不可识别的数据穿过互联网,当这些信息到达目的地,将被自动重组、解密,而成为可读的数据。不可逆加密算法的特征是加密过程中不需要使用密钥,输入明文后由系统直接经过加密算法处理成密文,这种加密后的数据是无法被解密的,只有重新输入明文,并再次经过同样不可逆的加密算法处理,得到相同的加密密文并被系统重新识别后,才能真正解密。2.使用时,加密者查找明文字母表中需要加密的消息中的每一个字母所在位置,并且写下密文字母表中对应的字母。_凯撒加密

工控协议--cip--协议解析基本记录_cip协议embedded_service_error-程序员宅基地

文章浏览阅读5.7k次。CIP报文解析常用到的几个字段:普通类型服务类型:[0x00], CIP对象:[0x02 Message Router], ioi segments:[XX]PCCC(带cmd和func)服务类型:[0x00], CIP对象:[0x02 Message Router], cmd:[0x101], fnc:[0x101]..._cip协议embedded_service_error

如何在vs2019及以后版本(如vs2022)上添加 添加ActiveX控件中的MFC类_vs添加mfc库-程序员宅基地

文章浏览阅读2.4k次,点赞9次,收藏13次。有时候我们在MFC项目开发过程中,需要用到一些微软已经提供的功能,如VC++使用EXCEL功能,这时候我们就能直接通过VS2019到如EXCEL.EXE方式,生成对应的OLE头文件,然后直接使用功能,那么,我们上篇文章中介绍了vs2017及以前的版本如何来添加。但由于微软某些方面考虑,这种方式已被放弃。从上图中可以看出,这一功能,在从vs2017版本15.9开始,后续版本已经删除了此功能。那么我们如果仍需要此功能,我们如何在新版本中添加呢。_vs添加mfc库

frame_size (1536) was not respected for a non-last frame_frame_size (1024) was not respected for a non-last-程序员宅基地

文章浏览阅读785次。用ac3编码,执行编码函数时报错入如下:[ac3 @ 0x7fed7800f200] frame_size (1536) was not respected for anon-last frame (avcodec_encode_audio2)用ac3编码时每次送入编码器的音频采样数应该是1536个采样,不然就会报上述错误。这个数字并非刻意固定,而是跟ac3内部的编码算法原理相关。全网找不到,国内音视频之路还有很长的路,音视频人一起加油吧~......_frame_size (1024) was not respected for a non-last frame

Android移动应用开发入门_在安卓移动应用开发中要在活动类文件中声迷你一个复选框变量-程序员宅基地

文章浏览阅读230次,点赞2次,收藏2次。创建Android应用程序一个项目里面可以有很多模块,而每一个模块就对应了一个应用程序。项目结构介绍_在安卓移动应用开发中要在活动类文件中声迷你一个复选框变量

推荐文章

热门文章

相关标签