1. 背景说明 笔者在某个项目中中采用了 event-driven 的架构,并使用 Celery 来作为架构模式中的 worker 角色。Celery 在收到 master 发出的任务执行信号后,即会从数据库中读取任务信息并开始执行,于此同时,在执行过程中还会将必要的信息持久化到数据库中,而在这整个流程中,所有的数据库操作都通过 Django orm 框架来完成。
mac OS 10.13.4
Python 2.7.9
Django 1.8.11
Celery 3.1.18
Django-celery 3.1.16
Mysql 5.7.22
2. 出现问题 Celery worker 在执行的过程中,会偶发性的出现一些由 PyMySQL 抛出的异常,如(出现过的错误包括但不仅限于以下错误):
error: Error -3 while decompressing data: incorrect header check
OperationalError: (2006, "MySQL server has gone away (error(32, 'Broken pipe'))")
error: Command Out Of Sync
通过多次测试和排查,发现问题是由于多个 Celery worker 使用了同一个数据库连接而导致的,由于多个 worker 使用了同一条连接,所以在进行数据库读写的时候,就很有可能会因为读到了由另一个 Worker 写入的脏数据或者是因为事务隔离而导致的数据不一致而引发异常,部分不可恢复的异常抛出后,由于连接被断开,就会导致其他 worker 在使用同一条链接时引发 MySQL server has gone away
的错误,而且由于并行程序存调度的不确定性,导致很难精确的复现某个错误,使得问题的根源具有一定的隐蔽性。那么,究竟是什么原因导致多个 worker 使用了同一条数据库连接呢?
3. 追根溯源 3.1 Celery 多进程(prefork)模式
By default multiprocessing is used to perform concurrent execution of tasks , but you can also use Eventlet.
Celery 的官方文档中有说明,Celery 默认使用 multiprocessing 库来实现任务的并行,而 multiprocessing 是通过 fork 来实现进程的创建操作的,那么 Celery 的 worker 进程会不会都是由一个父进程通过 fork 创建出来的呢?
在 celery worker 启动后,通过 ps
1 2 3 4 5 6 $ ps -ef | grep celery 501 46229 54985 0 4:50下午 ttys004 0:01.41 python manage.py celery worker -c 4 501 46251 46229 0 4:50下午 ttys004 0:00.02 python manage.py celery worker -c 4 501 46252 46229 0 4:50下午 ttys004 0:00.02 python manage.py celery worker -c 4 501 46253 46229 0 4:50下午 ttys004 0:00.02 python manage.py celery worker -c 4 501 46254 46229 0 4:50下午 ttys004 0:00.02 python manage.py celery worker -c 4
通过进程状态查看工具,可以发现进程 46251
都是由进程 46299
fork 出来的,而在 fork 操作时子进程会复制父进程内存空间中的所有数据,所以在这个时候,所有的子进程都拿到了父进程中的数据库连接,当多个子进程操作该连接时,就会引发上述问题。
至于为什么子进程会拿到和父进程一样的连接,还得从 Django ORM 连接管理部分的实现说起。
3.2 Django ORM 连接管理 既然问题是由于共享数据库连接导致的,Django ORM 中数据库连接管理的实现就是问题的根源所在。
首先,Django ORM 中所有的数据库操作都会通过 django.db
模块下的 connections
1 2 3 4 5 6 7 8 9 from django.db import connectionsdef get_compiler (self, using=None, connection=None) : if using is None and connection is None : raise ValueError("Need either using or connection" ) if using: connection = connections[using] return connection.ops.compiler(self.compiler)(self, connection, using)
1 2 3 4 5 6 7 from django.db.utils import ConnectionHandlerconnections = ConnectionHandler()
原来 connections
这个全局变量是一个 ConnectionHandler
类,那么这个 ConnectionHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from threading import localclass ConnectionHandler (object) : def __init__ (self, databases=None) : self._databases = databases self._connections = local() def __getitem__ (self, alias) : if hasattr(self._connections, alias): return getattr(self._connections, alias) self.ensure_defaults(alias) self.prepare_test_settings(alias) db = self.databases[alias] backend = load_backend(db['ENGINE' ]) conn = backend.DatabaseWrapper(db, alias) setattr(self._connections, alias, conn) return conn def __setitem__ (self, key, value) : setattr(self._connections, key, value) def __delitem__ (self, key) : delattr(self._connections, key)
当我们通过 connections[usings]
获取数据库连接时,就会调用 ConnectionHandler
的 __getitem__()
方法,此时 ConnectionHandler
会先通过我们在 settings.py
中的配置的数据库信息加载相应的数据库后端,同时初始化该后端实现的 DatabaseWrapper
是 Django 用于表示数据库连接的一个包装类,由数据库后端自行实现,并且该必须继承自 BaseDatabaseWrapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class BaseDatabaseWrapper (object) : """ Represents a database connection. """ def __init__ (self, settings_dict, alias=DEFAULT_DB_ALIAS, allow_thread_sharing=False) : self.connection = None self.close_at = None self.allow_thread_sharing = allow_thread_sharing def get_connection_params (self) : raise NotImplementedError('subclasses of BaseDatabaseWrapper may require a get_connection_params() method' ) def get_new_connection (self, conn_params) : raise NotImplementedError('subclasses of BaseDatabaseWrapper may require a get_new_connection() method' ) def ensure_connection (self) : if self.connection is None : with self.wrap_database_errors: self.connect() def connect (self) : conn_params = self.get_connection_params() self.connection = self.get_new_connection(conn_params)
绕来绕去,笔者终于找到了真正的数据库连接的藏身之处,Django 通过 BaseDatabaseWrapper
1 2 3 4 5 6 7 8 9 10 11 DATABASES = { 'default' : { 'ENGINE' : 'django.db.backends.mysql' , 'NAME' : '' , 'USER' : '' , 'PASSWORD' : '' , 'HOST' : 'localhost' , 'PORT' : '3306' , 'CONN_MAX_AGE' : 3600 , }, }
顺着声明,找到相应后端的 DatabaseWrapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 try : import MySQLdb as Database except ImportError as e: from django.core.exceptions import ImproperlyConfigured raise ImproperlyConfigured("Error loading MySQLdb module: %s" % e) class DatabaseWrapper (BaseDatabaseWrapper) : def get_new_connection (self, conn_params) : conn = Database.connect(**conn_params) conn.encoders[SafeText] = conn.encoders[six.text_type] conn.encoders[SafeBytes] = conn.encoders[bytes] return conn
至此,Django ORM 中对数据库连接的管理已经非常清晰了:Django 通过 ConnectionHandler
来管理整个 Django APP 中的所有数据库连接,并且使用 ThreadLocal
来隔离不同线程所能获取到的数据库连接,同时,通过 DatabaseWrapper
3.3 验证猜想 在了解了 Django ORM 对数据库连接的管理方式之后,我们就能够来探讨导致多个 worker 共享同一个数据库连接的真正原因了。为了验证不同的子进程确实使用了同一个数据库连接,笔者监听了 Celery worker 进程初始化的信号,并在 worker 进程初始化完成后查看当前进程的相关信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import logging import threading import os from celery import signalsfrom django.db import connections@signals.worker_process_init.connect def worker_init (**kwargs) : import threading import os for conn in connections.all(): logger.error( '\n <pid>: %s\n ' '---------------------------------- \n ' '<connection>: %s\n ' '<wrapper>: %s\n ' '---------------------------------- \n ' '<thread>: %s\n ' '<local>: %s\n ' '<handler>: %s\n ' '<parent pid>: %s \n ' '<allow_thread_sharing>: %s\n' % ( os.getpid(), conn.connection, conn, threading.currentThread(), threading.local(), connections, os.getppid(), conn.allow_thread_sharing))
的 all()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 DEFAULT_DB_ALIAS = 'default' class ConnectionHandler (object) : @cached_property def databases (self) : if self._databases is None : self._databases = settings.DATABASES if self._databases == {}: self._databases = { DEFAULT_DB_ALIAS: { 'ENGINE' : 'django.db.backends.dummy' , }, } if DEFAULT_DB_ALIAS not in self._databases: raise ImproperlyConfigured("You must define a '%s' database" % DEFAULT_DB_ALIAS) return self._databases def __iter__ (self) : return iter(self.databases) def all (self) : return [self[alias] for alias in self]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 [2018-07-20 09:50:05,601: ERROR/Worker-3] <pid>: 41293 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x107b6e350> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10739d250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x107df54d0> <handler>: <django.db.utils.ConnectionHandler object at 0x1061edd50> <parent pid>: 41281 <allow_thread_sharing>: False [2018-07-20 09:50:05,601: ERROR/Worker-2] <pid>: 41292 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x107b6e350> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10739d250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x107df44d0> <handler>: <django.db.utils.ConnectionHandler object at 0x1061edd50> <parent pid>: 41281 <allow_thread_sharing>: False [2018-07-20 09:50:05,604: ERROR/Worker-1] <pid>: 41291 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x107b6e350> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10739d250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x107df44d0> <handler>: <django.db.utils.ConnectionHandler object at 0x1061edd50> <parent pid>: 41281 <allow_thread_sharing>: False [2018-07-20 09:50:05,604: ERROR/Worker-4] <pid>: 41294 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x107b6e350> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10739d250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x107df54d0> <handler>: <django.db.utils.ConnectionHandler object at 0x1061edd50> <parent pid>: 41281 <allow_thread_sharing>: False
可以看到,4个 worker 进程 41291
, 41292
, 41293
, 41294
被启动了,随后通过 lsof
查看这些进程打开的 mysql 连接:
1 2 3 4 5 $ lsof -p 41291,41292,41293,41294 | grep :mysql python2.7 41291 lianghonghao 10u IPv6 0xb37e546feaec2979 0t0 TCP localhost:62502->localhost:mysql (ESTABLISHED) python2.7 41292 lianghonghao 10u IPv6 0xb37e546feaec2979 0t0 TCP localhost:62502->localhost:mysql (ESTABLISHED) python2.7 41293 lianghonghao 10u IPv6 0xb37e546feaec2979 0t0 TCP localhost:62502->localhost:mysql (ESTABLISHED) python2.7 41294 lianghonghao 10u IPv6 0xb37e546feaec2979 0t0 TCP localhost:62502->localhost:mysql (ESTABLISHED)
果然,四个进程同时使用了 localhost:62502->localhost:mysql
这条 TCP 连接。
如果我们在 Worker 进程初始化之后手动关闭 DatabseWrapper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @signals.worker_process_init.connect def worker_init (**kwargs) : import threading import os for conn in connections.all(): conn.close() conn.connect() logger.error( '\n <pid>: %s\n ' '---------------------------------- \n ' '<connection>: %s\n ' '<wrapper>: %s\n ' '---------------------------------- \n ' '<thread>: %s\n ' '<local>: %s\n ' '<handler>: %s\n ' '<parent pid>: %s \n ' '<allow_thread_sharing>: %s\n' % ( os.getpid(), conn.connection, conn, threading.currentThread(), threading.local(), connections, os.getppid(), conn.allow_thread_sharing))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 [2018-07-20 09:55:49,137: ERROR/Worker-2] <pid>: 42715 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x10fb87b50> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10f33a250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x10fd904d0> <handler>: <django.db.utils.ConnectionHandler object at 0x10e18ad50> <parent pid>: 42705 <allow_thread_sharing>: False [2018-07-20 09:55:49,138: ERROR/Worker-3] <pid>: 42716 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x10fb87b50> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10f33a250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x10fd914d0> <handler>: <django.db.utils.ConnectionHandler object at 0x10e18ad50> <parent pid>: 42705 <allow_thread_sharing>: False [2018-07-20 09:55:49,140: ERROR/Worker-1] <pid>: 42714 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x10fb87b50> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10f33a250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x10fd904d0> <handler>: <django.db.utils.ConnectionHandler object at 0x10e18ad50> <parent pid>: 42705 <allow_thread_sharing>: False [2018-07-20 09:55:49,141: ERROR/Worker-4] <pid>: 42717 ---------------------------------- <connection>: <pymysql.connections.Connection object at 0x10fb87b50> <wrapper>: <django.db.backends.mysql.base.DatabaseWrapper object at 0x10f33a250> ---------------------------------- <thread>: <_MainThread(MainThread, started 140735510033280)> <local>: <thread._local object at 0x10fd914d0> <handler>: <django.db.utils.ConnectionHandler object at 0x10e18ad50> <parent pid>: 42705 <allow_thread_sharing>: False
查看 worker 进程打开的连接:
1 2 3 4 5 $ lsof -p 42714,42715,42716,42717 | grep :mysql python2.7 42714 lianghonghao 3u IPv6 0xb37e546fd9bde279 0t0 TCP localhost:62813->localhost:mysql (ESTABLISHED) python2.7 42715 lianghonghao 3u IPv6 0xb37e546feaec34f9 0t0 TCP localhost:62811->localhost:mysql (ESTABLISHED) python2.7 42716 lianghonghao 3u IPv6 0xb37e546feaec4079 0t0 TCP localhost:62812->localhost:mysql (ESTABLISHED) python2.7 42717 lianghonghao 3u IPv6 0xb37e546fd9bdff39 0t0 TCP localhost:62814->localhost:mysql (ESTABLISHED)
可以看到,在子进程初始化后断开并重新建立之后,就不会出现多个 worker 同时占用一个连接的情况了。
3.4 致命组合 首先,普及一个知识,Django 中是没有数据库连接池的,这就意味着每次请求都需要经历一次连接建立和连接断开的过程。
在 Django 1.6 版本之后,加入了一个新的功能:persistent connections ,开发者能够通过在数据库配置中设置 CONN_MAX_AGE
变量的值来指定一个数据库从建立到销毁前能够被保留的时间。至于 Django 为什么不采用数据库连接池而是使用连接持久化的方案,可以参考一下这个帖子中的讨论:Database pooling vs. persistent connections 。
Django 建立每个连接时都会将该连接到期的时间设置到 DatabaseWrapper
的 close_at
1 2 3 4 5 6 7 8 9 10 11 class BaseDatabaseWrapper (object) : def connect (self) : max_age = self.settings_dict['CONN_MAX_AGE' ] self.close_at = None if max_age is None else time.time() + max_age
Persistent connections avoid the overhead of re-establishing a connection to the database in each request. They’re controlled by the CONN_MAX_AGE
parameter which defines the maximum lifetime of a connection. It can be set independently for each database.
1 2 3 4 5 6 7 8 9 10 from django.core import signalsdef close_old_connections (**kwargs) : for conn in connections.all(): conn.close_if_unusable_or_obsolete() signals.request_started.connect(close_old_connections) signals.request_finished.connect(close_old_connections)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 class BaseDatabaseWrapper (object) : def close_if_unusable_or_obsolete (self) : if self.connection is not None : if self.get_autocommit() != self.settings_dict['AUTOCOMMIT' ]: self.close() return if self.errors_occurred: if self.is_usable(): self.errors_occurred = False else : self.close() return if self.close_at is not None and time.time() >= self.close_at: self.close() return def close (self) : self.validate_thread_sharing() if self.closed_in_transaction or self.connection is None : return try : self._close() finally : if self.in_atomic_block: self.closed_in_transaction = True self.needs_rollback = True else : self.connection = None def _close (self) : if self.connection is not None : with self.wrap_database_errors: return self.connection.close()
其实这个功能的出发点是好的,但是和 multiprocessing 结合使用,就有可能导致致命的错误,通过查阅资料和阅读 Celery 和 djcelery 的源码后,笔者发现两者都在某些时刻尝试去清理子进程从父进程处获得的数据库连接以保证正常运行,如果没有使用 persistent connections 功能,就能够在子进程初始化完成后 和任务开始执行前 成功关闭从父进程处继承来的连接,当子进程需要进行数据库操作时,就会重新建立新的连接,也就不会出现多个进程同时使用一条连接的情况。以下为 celery
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class DjangoWorkerFixup (object) : def __init__ (self, app) : try : self._close_old_connections = symbol_by_name( 'django.db:close_old_connections' , ) except (ImportError, AttributeError): self._close_old_connections = None def install (self) : signals.task_prerun.connect(self.on_task_prerun) signals.task_postrun.connect(self.on_task_postrun) self.close_database() self.close_cache() return self def on_task_prerun (self, sender, **kwargs) : """Called before every task.""" if not getattr(sender.request, 'is_eager' , False ): self.close_database() def close_database (self, **kwargs) : if self._close_old_connections: return self._close_old_connections()
以下为 djcelery
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class DjangoLoader (BaseLoader) : def on_process_cleanup (self) : """Does everything necessary for Django to work in a long-living, multiprocessing environment. """ self.close_database() self.close_cache() def on_task_init (self, task_id, task) : """Called before every task.""" try : is_eager = task.request.is_eager except AttributeError: is_eager = False if not is_eager: self.close_database() def _close_database (self) : try : funs = [conn.close for conn in db.connections] except AttributeError: if hasattr(db, 'close_old_connections' ): funs = [db.close_old_connections] else : funs = [db.close_connection] for close in funs: try : close() except DATABASE_ERRORS as exc: str_exc = str(exc) if 'closed' not in str_exc and 'not connected' not in str_exc: raise def close_database (self, **kwargs) : db_reuse_max = self.conf.get('CELERY_DB_REUSE_MAX' , None ) if not db_reuse_max: return self._close_database() if self._db_reuse >= db_reuse_max * 2 : self._db_reuse = 0 self._close_database() self._db_reuse += 1
而在笔者的数据库配置中,已经设置了 CONN_MAX_AGE
为 3600(一个连接从建立成功后有 3600s 的有效期),而 djcelery
和 celery
清理都是通过调用 django.db.close_old_connections()
来清理数据库连接的,正是因为该方法在检查没有出现异常的连接时,若该连接没有过期则不会进行关闭,才导致了 djcelery
和 celery
1 2 3 4 5 if self.close_at is not None and time.time() >= self.close_at: self.close() return
所以一个完美的 BUG 就这么出现了:
父进程在进行子进程初始化工作之前调用了 Django ORM 进行数据库操作,导致父进程中的 DatabaseWrapper
子进程被 fork 出来后,由于完全复制了父进程的内存数据,导致所有 worker 共享了同一个 MySQL 连接(同一个 socket file)。
djcelery 和 Celery 虽然在子进程初始化和任务开始时对子进程中的连接进行了清理,但是由于 persistent connections 功能的存在,导致数据库连接没有被关闭。
3.5 结论及解决方案 所以,Django 的 persistent connections 在与 multiprocessing
3.5.1 关闭 persistent connections 功能 关闭 persistent connections 功能,这样在子进程初始化完成后和任务开始前会把从父进程中继承过来的连接关闭。
3.5.2 手动关闭 Django ORM 中维护的连接 不关闭 persistent connections 功能,监听子进程初始化完成和任务开始的信号,并在收到信号时手动强制关闭当前进程中 Django ORM 的连接,实现如下:
1 2 3 4 5 6 7 8 9 10 11 from django.db import connections@signals.task_prerun.connect def task_prerun (**kwargs) : for conn in connections.all(): conn.close() @signals.worker_process_init.connect def worker_init (**kwargs) : for conn in connections.all(): conn.close()
3.5.4 使用其他的并发实现方式 目前,只有在多进程模式下使用 Django ORM 会出现上述问题,Django 通过 ThreadLocal 来管理不同线程所对应的数据库连接,如果不使用多进程模式就不会出现内存数据相同的问题,也就不会出现共享文件描述符的问题,而 Celery 提供了若干种并发的模式:eventlet, gevent, solo, threads,在能够满足功能性需求和非功能性需求的前提下,不妨考虑其他的并发模式。
4. 后记 这个 BUG 是两种框架的特性冲突而导致,但是 Django 官方文档在介绍 persistent connections 功能时并没有提醒开发者注意该事项,而出现错误后又很难定位问题,感觉在一定程度上对开发者有些不友好。