这里有新鲜出炉的 MongoDB 教程,程序狗速度看过来!
MongoDB 是一个基于分布式文件存储的数据库。由 C++ 语言编写。旨在为 web 应用提供可扩展的高性能数据存储解决方案。
MongoDB 的 Replication 是通过一个日志来存储写操作的,这个日志就叫做 oplog,而下面这篇文章主要给大家介绍了利用 MongoDB 中 oplog 机制实现准实时数据的操作监控的相关资料,需要的朋友可以参考借鉴,下面来一起看看吧。
前言
最近有一个需求是要实时获取到新插入到 MongoDB 的数据,而插入程序本身已经有一套处理逻辑,所以不方便直接在插入程序里写相关程序,传统的数据库大多自带这种触发器机制,但是 Mongo 没有相关的函数可以用(也可能我了解的太少了,求纠正),当然还有一点是需要 python 实现,于是收集整理了一个相应的实现方法。
一、引子
首先可以想到,这种需求其实很像数据库的主从备份机制,从数据库之所以能够同步主库是因为存在某些指标来做控制,我们知道 MongoDB 虽然没有现成触发器,但是它能够实现主从备份,所以我们就从它的主从备份机制入手。
二、OPLOG
首先,需要以 master 模式来打开 mongod 守护,命令行使用–master, 或者配置文件增加 master 键为 true。
此时,我们可以在 Mongo 的系统库 local 里见到新增的 collection——oplog,此时
里就会存储进 oplog 信息,如果此时还有充当从数据库的 Mongo 存在,就会还有一些 slaves 的信息,由于我们这里并不是主从同步,所以不存在这些集合。
- oplog.$main
再来看看 oplog 结构:
- "ts": Timestamp(6417682881216249, 1),
- 时间戳"h": NumberLong(0),
- 长度"v": 2,
- "op": "n",
- 操作类型"ns": "",
- 操作的库和集合"o2": "_id"update条件"o": {}操作值,即document
这里需要知道 op 的几种属性:
- insert,'i'update,
- 'u'remove(delete),
- 'd'cmd,
- 'c'noop,
- 'n'空操作
从上面的信息可以看出,我们只要不断读取到 ts 来做对比,然后根据 op 即可判断当前出现的是什么操作,相当于使用程序实现了一个从数据库的接收端。
三、CODE
在 Github 上找到了别人的实现方式,不过它的函数库太老旧,所以在他的基础上进行修改。
Github 地址:https://github.com/RedBeard0531/mongo-oplog-watcher
mongo_oplog_watcher.py 如下:
- # ! /usr/bin / python import pymongo import re import time from pprint import pprint#pretty printer from pymongo.errors import AutoReconnect
- class OplogWatcher(object) : def __init__(self, db = None, collection = None, poll_time = 1.0, connection = None, start_now = True) : if collection is not None: if db is None: raise ValueError('must specify db if you specify a collection') self._ns_filter = db + '.' + collection elif db is not None: self._ns_filter = re.compile(r '^%s\.' % db)
- else: self._ns_filter = None
- self.poll_time = poll_time self.connection = connection or pymongo.Connection()
- if start_now: self.start()
- @staticmethod def __get_id(op) : id = None o2 = op.get('o2') if o2 is not None: id = o2.get('_id')
- if id is None: id = op['o'].get('_id')
- return id
- def start(self) : oplog = self.connection.local['oplog.$main'] ts = oplog.find().sort('$natural', -1)[0]['ts']
- while True: if self._ns_filter is None: filter = {} else: filter = {
- 'ns': self._ns_filter
- }
- filter['ts'] = {
- '$gt': ts
- }
- try: cursor = oplog.find(filter, tailable = True) while True: for op in cursor: ts = op['ts'] id = self.__get_id(op) self.all_with_noop(ns = op['ns'], ts = ts, op = op['op'], id = id, raw = op) time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time)
- def all_with_noop(self, ns, ts, op, id, raw) : if op == 'n': self.noop(ts = ts)
- else: self.all(ns = ns, ts = ts, op = op, id = id, raw = raw)
- def all(self, ns, ts, op, id, raw) : if op == 'i': self.insert(ns = ns, ts = ts, id = id, obj = raw['o'], raw = raw) elif op == 'u': self.update(ns = ns, ts = ts, id = id, mod = raw['o'], raw = raw) elif op == 'd': self.delete(ns = ns, ts = ts, id = id, raw = raw) elif op == 'c': self.command(ns = ns, ts = ts, cmd = raw['o'], raw = raw) elif op == 'db': self.db_declare(ns = ns, ts = ts, raw = raw)
- def noop(self, ts) : pass
- def insert(self, ns, ts, id, obj, raw, **kw) : pass
- def update(self, ns, ts, id, mod, raw, **kw) : pass
- def delete(self, ns, ts, id, raw, **kw) : pass
- def command(self, ns, ts, cmd, raw, **kw) : pass
- def db_declare(self, ns, ts, **kw) : pass
- class OplogPrinter(OplogWatcher) : def all(self, **kw) : pprint(kw) print#newline
- if __name__ == '__main__': OplogPrinter()
首先是实现一个数据库的初始化,设定一个延迟时间(准实时):
- self.poll_time = poll_time self.connection = connection or pymongo.MongoClient()
主要的函数是
, 实现一个时间的比对并进行相应字段的处理:
- start()
- def start(self) : oplog = self.connection.local['oplog.$main']#读取之前提到的库ts = oplog.find().sort('$natural', -1)[0]['ts']#获取一个时间边际
- while True: if self._ns_filter is None: filter = {} else: filter = {
- 'ns': self._ns_filter
- }
- filter['ts'] = {
- '$gt': ts
- }
- try: cursor = oplog.find(filter)#对此时间之后的进行处理
- while True: for op in cursor: ts = op['ts'] id = self.__get_id(op) self.all_with_noop(ns = op['ns'], ts = ts, op = op['op'], id = id, raw = op)#可以指定处理插入监控,更新监控或者删除监控等time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time)
循环这个 start 函数,在 all_with_noop 这里就可以编写相应的监控处理逻辑。
这样就可以实现一个简易的准实时 Mongo 数据库操作监控器,下一步就可以配合其他操作来对新入库的程序进行相应处理。
总结
来源: http://www.phperz.com/article/17/0717/335292.html