Background
A service we launched recently occasionally fires timeout alerts. Its main logic is nothing more than simple reads and writes against mongodb, and traffic was light in the early days after launch, so in theory every request should return quickly. Log analysis largely confirms this: more than 90% of requests return within 100ms, and most finish within 10ms. Yet just under 1% of requests show latencies above 1s, with extreme cases reaching 2-3s. Having some spare time these past few days, I decided to investigate the cause carefully, and eventually traced it to a pitfall in how the QuerySet mechanism inside mongoengine's model machinery works, which I had not fully understood. This post records what I found.
The Problem Surfaces
mongoengine is very simple to use: just define a subclass of mongoengine.Document, and through that class you can conveniently perform all kinds of operations on the corresponding mongodb collection. Here is an example from the official tutorial.
First, tell the program which mongod instance to connect to, using the connect function below:
from mongoengine import *

connect('tumblelog')
If host and port are not explicitly specified, it connects to port 27017 on localhost by default; to override them, or to pass credentials, supply the corresponding keyword arguments:
connect('project1', username='webapp', password='pwd123')
Define the following User class, and you can then access the user collection in the corresponding mongod database.
from datetime import datetime

class User(Document):
    email = StringField(required=True)
    first_name = StringField(max_length=50)
    last_name = StringField(max_length=50)
    ctime = DateTimeField(default=datetime.utcnow)
To fetch the records in mongodb whose first_name is Jack, the following code does the job:
records = User.objects(first_name='Jack').order_by('ctime')
The records obtained can then be used for all kinds of operations:
if not records:                   # check whether the result set is empty
    ...
if len(records) > 1:              # check whether the number of records is as expected
    ...
pk = records[0].pk                # get the pk of the first record
ctime = records[0]['ctime']       # get the creation time of the first record
There is nothing wrong with this usage as far as business logic goes; it achieves exactly what the author intended. It does, however, hide a costly pitfall: higher latency and far more frequent access to mongodb than necessary.
I had always assumed that with code like the above, the query specified via User.objects (with its filter and sort conditions) is executed at the point where records is defined, and that the result set is assigned to records, so that every later use of records operates on data already held locally. In reality, each of these four operations triggers a separate real-time network round trip to mongodb, four in total, while the line that defines and assigns records does not touch mongodb at all. These far-more-than-expected network accesses not only increase the total time the service spends handling a request and make it more sensitive to network and mongodb jitter, they also multiply the load on mongodb several times over for no good reason.
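One way to see this in practice is to watch the commands the driver actually sends. The sketch below is not part of the service's code; it uses pymongo's command-monitoring API (available in pymongo 3.1+) to print every command issued, so running the four operations above should print roughly one query per operation. Treat it as a verification aid rather than a definitive trace, since the exact commands depend on the mongoengine/pymongo versions in use.

from pymongo import monitoring

class QueryLogger(monitoring.CommandListener):
    """Print every command pymongo sends, so the round trips can be counted."""
    def started(self, event):
        print('command: %s on %s' % (event.command_name, event.database_name))

    def succeeded(self, event):
        pass

    def failed(self, event):
        pass

# Global listeners only apply to clients created after registration,
# so register the logger before calling connect().
monitoring.register(QueryLogger())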
This behaviour of mongoengine genuinely ran counter to my intuition. I searched online for material describing the problem but found nothing suitable, so I dug into the source code.
Source Code Analysis
The first thing to understand is why this line does not immediately trigger a network access and fetch the query result, sorted by ctime, from mongodb:
records = User.objects(first_name='Jack').order_by('ctime')
# actual type of records: <class 'mongoengine.queryset.queryset.QuerySet'>
The name chosen for the assignment target, records, is in fact misleading: it suggests the return value is a list of records already fetched according to the query conditions. What is actually returned is a QuerySet. Creating this QuerySet object requires no remote call to mongodb; it merely records the query conditions (first_name='Jack', the order_by sort criteria, and so on) inside the QuerySet object. Only later, when a concrete record attribute is actually needed, does it query mongodb remotely using those conditions.
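This is the classic lazy-evaluation pattern. The toy class below is not mongoengine code, just a minimal sketch of the shape of it (expensive_database_call is a hypothetical stand-in for the real driver call): construction merely stores the conditions, and the costly fetch is deferred until the data is actually consumed.

class LazyQuery:
    """Toy illustration: store query conditions now, fetch only when needed."""
    def __init__(self, filters, ordering=None):
        self.filters = filters        # recorded, not executed
        self.ordering = ordering
        self._results = None          # nothing fetched yet

    def _fetch(self):
        if self._results is None:
            # Hypothetical stand-in for the real network call; in mongoengine
            # this role is played by BaseQuerySet._cursor, shown further below.
            self._results = expensive_database_call(self.filters, self.ordering)
        return self._results

    def __iter__(self):
        return iter(self._fetch())

    def __len__(self):
        return len(self._fetch())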
The QuerySet class overrides what happens when len() is called on a QuerySet object; the relevant code is as follows:
# source file: mongoengine/queryset/queryset.py
class QuerySet(BaseQuerySet):
    """The default queryset, that builds queries and handles a set of results
    returned from a query.

    Wraps a MongoDB cursor, providing :class:`~mongoengine.Document` objects as
    the results.
    """
    ...

    def __len__(self):
        """Since __len__ is called quite frequently (for example, as part of
        list(qs)), we populate the result cache and cache the length.
        """
        if self._len is not None:
            return self._len

        if self._has_more:
            # populate the cache
            list(self._iter_results())

        self._len = len(self._result_cache)
        return self._len
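One consequence worth spelling out: the first len() call on a queryset iterates the cursor and materializes every matching document into _result_cache just to count it. The hedged sketch below (exact caching behaviour varies across mongoengine versions) shows that the length check from the earlier snippet pays for a full fetch even though only a number is needed; when only the count matters, the queryset's count() method asks mongodb for a server-side count instead of transferring the documents.

qs = User.objects(first_name='Jack').order_by('ctime')

if len(qs) > 1:      # issues a query and pulls all matching documents locally
    ...

if qs.count() > 1:   # issues a server-side count instead of fetching documents
    ...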
QuerySet inherits from BaseQuerySet, which defines __getitem__() (the special method that handles access by index or slice syntax), __nonzero__/__bool__ (the special methods executed when the object is converted to a boolean), and __call__ (which makes the object callable):
# source file: mongoengine/queryset/base.py
class BaseQuerySet(object):
    """A set of results returned from a query. Wraps a MongoDB cursor,
    providing :class:`~mongoengine.Document` objects as the results.
    """
    ...

    def __init__(self, document, collection):
        self._document = document
        self._collection_obj = collection
        self._mongo_query = None
        self._query_obj = Q()
        self._initial_query = {}
        self._where_clause = None
        self._loaded_fields = QueryFieldList()
        self._ordering = None
        self._snapshot = False
        self._timeout = True
        self._class_check = True
        self._slave_okay = False
        self._read_preference = None
        self._iter = False
        self._scalar = []
        self._none = False
        self._as_pymongo = False
        self._as_pymongo_coerce = False
        self._search_text = None

        # If inheritance is allowed, only return instances and instances of
        # subclasses of the class being used
        if document._meta.get('allow_inheritance') is True:
            if len(self._document._subclasses) == 1:
                self._initial_query = {"_cls": self._document._subclasses[0]}
            else:
                self._initial_query = {
                    "_cls": {"$in": self._document._subclasses}}
            self._loaded_fields = QueryFieldList(always_include=['_cls'])
        self._cursor_obj = None
        self._limit = None
        self._skip = None
        self._hint = -1  # Using -1 as None is a valid value for hint
        self.only_fields = []
        self._max_time_ms = None

    def __call__(self, q_obj=None, class_check=True, read_preference=None,
                 **query):
        """Filter the selected documents by calling the
        :class:`~mongoengine.queryset.QuerySet` with a query.

        :param q_obj: a :class:`~mongoengine.queryset.Q` object to be used in
            the query; the :class:`~mongoengine.queryset.QuerySet` is filtered
            multiple times with different :class:`~mongoengine.queryset.Q`
            objects, only the last one will be used
        :param class_check: If set to False bypass class name check when
            querying collection
        :param read_preference: if set, overrides connection-level
            read_preference from `ReplicaSetConnection`.
        :param query: Django-style query keyword arguments
        """
        query = Q(**query)
        if q_obj:
            # make sure proper query object is passed
            if not isinstance(q_obj, QNode):
                msg = ("Not a query object: %s. "
                       "Did you intend to use key=value?" % q_obj)
                raise InvalidQueryError(msg)
            query &= q_obj

        if read_preference is None:
            queryset = self.clone()
        else:
            # Use the clone provided when setting read_preference
            queryset = self.read_preference(read_preference)

        queryset._query_obj &= query
        queryset._mongo_query = None
        queryset._cursor_obj = None
        queryset._class_check = class_check

        return queryset
    ...

    def __getitem__(self, key):
        """Support skip and limit using getitem and slicing syntax.
        """
        queryset = self.clone()

        # Slice provided
        if isinstance(key, slice):
            try:
                queryset._cursor_obj = queryset._cursor[key]
                queryset._skip, queryset._limit = key.start, key.stop
                if key.start and key.stop:
                    queryset._limit = key.stop - key.start
            except IndexError, err:
                # PyMongo raises an error if key.start == key.stop, catch it,
                # bin it, kill it.
                start = key.start or 0
                if start >= 0 and key.stop >= 0 and key.step is None:
                    if start == key.stop:
                        queryset.limit(0)
                        queryset._skip = key.start
                        queryset._limit = key.stop - start
                        return queryset
                raise err
            # Allow further QuerySet modifications to be performed
            return queryset
        # Integer index provided
        elif isinstance(key, int):
            if queryset._scalar:
                return queryset._get_scalar(
                    queryset._document._from_son(queryset._cursor[key],
                                                 _auto_dereference=self._auto_dereference,
                                                 only_fields=self.only_fields))
            if queryset._as_pymongo:
                return queryset._get_as_pymongo(queryset._cursor[key])
            return queryset._document._from_son(queryset._cursor[key],
                                                _auto_dereference=self._auto_dereference,
                                                only_fields=self.only_fields)
        raise AttributeError
    ...

    def _has_data(self):
        """ Retrieves whether cursor has any data. """
        queryset = self.order_by()
        return False if queryset.first() is None else True

    def __nonzero__(self):
        """ Avoid to open all records in an if stmt in Py2. """
        return self._has_data()

    def __bool__(self):
        """ Avoid to open all records in an if stmt in Py3. """
        return self._has_data()

    # Core functions

    def all(self):
        """Returns all documents."""
        return self.__call__()

    def filter(self, *q_objs, **query):
        """An alias of :meth:`~mongoengine.queryset.QuerySet.__call__`
        """
        return self.__call__(*q_objs, **query)
    ...

    @property
    def _cursor(self):
        if self._cursor_obj is None:

            # In PyMongo 3+, we define the read preference on a collection
            # level, not a cursor level. Thus, we need to get a cloned
            # collection object using `with_options` first.
            if IS_PYMONGO_3 and self._read_preference is not None:
                self._cursor_obj = self._collection\
                    .with_options(read_preference=self._read_preference)\
                    .find(self._query, **self._cursor_args)
            else:
                self._cursor_obj = self._collection.find(self._query,
                                                         **self._cursor_args)

            # Apply where clauses to cursor
            if self._where_clause:
                where_clause = self._sub_js_fields(self._where_clause)
                self._cursor_obj.where(where_clause)

            if self._ordering:
                # Apply query ordering
                self._cursor_obj.sort(self._ordering)
            elif self._ordering is None and self._document._meta['ordering']:
                # Otherwise, apply the ordering from the document model, unless
                # it's been explicitly cleared via order_by with no arguments
                order = self._get_order_by(self._document._meta['ordering'])
                self._cursor_obj.sort(order)

            if self._limit is not None:
                self._cursor_obj.limit(self._limit)

            if self._skip is not None:
                self._cursor_obj.skip(self._skip)

            if self._hint != -1:
                self._cursor_obj.hint(self._hint)

        return self._cursor_obj
    ...
As the code shows, a QuerySet object does not query mongodb at creation time; the query is only issued when the object is actually used, through the overloaded behaviours above. The code that actually queries mongodb lives in the _cursor property of the BaseQuerySet base class.
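Another detail that follows from __getitem__: every integer indexing operation goes through the cursor, so records[0].pk and records[0]['ctime'] in the earlier snippet fetch the same document twice. Independently of the list-materialization fix described below, a minimal sketch of a cheap mitigation is to index once and keep the resulting Document object:

first = records[0]        # one query: fetch the first matching document
pk = first.pk             # plain attribute access on the local Document object
ctime = first['ctime']    # no further mongodb access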
So for the value returned by User.objects, a naming style that better reflects what it actually is would be:
query_set = User.objects(first_name='Jack').order_by('ctime')
# or
qs = User.objects(first_name='Jack').order_by('ctime')
To prevent later operations on the queryset from triggering additional, unexpected network requests to mongodb, once the query_set has been obtained you can execute the query once and store all of its results in a separate local list:
records = [rc for rc in qs]
After that, operations on records no longer trigger any network access to mongodb. Compared with the original approach, the only drawback is that if the local processing of records takes a long time, the local data may drift out of sync with what is in mongodb. In the vast majority of scenarios, however, the results are consumed within a very short time after the query, so inconsistency is essentially a non-issue; evaluate it against your own use case.
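Putting it all together, the original four-operation snippet can be rewritten so that mongodb is queried exactly once per request. This is a sketch of the intent rather than a drop-in patch for the service in question:

qs = User.objects(first_name='Jack').order_by('ctime')
records = list(qs)            # single query, equivalent to [rc for rc in qs]

if not records:               # operates on the local list, no network access
    ...
if len(records) > 1:          # plain len() on a Python list
    ...
first = records[0]            # local Document object
pk = first.pk
ctime = first['ctime']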