代码:
- from functools import wraps
- import mysql.connector
- from sshtunnel import SSHTunnelForwarder
def singleton(cls):
    """Class decorator that hands out one shared instance of ``cls``.

    The first call constructs ``cls`` with the arguments supplied; every
    later call returns that same object and its arguments are ignored.
    Note that the decorated name is bound to the returned factory
    function, not to the class itself.
    """
    _created = {}

    @wraps(cls)
    def get_instance(*args, **kw):
        # Fast path: the instance already exists, so hand it back as-is.
        if cls in _created:
            return _created[cls]
        obj = cls(*args, **kw)
        _created[cls] = obj
        return obj

    return get_instance
# Database connection instance (shared through the singleton decorator)
@singleton
class MySQLSingle(object):
    """Holder for a MySQL connection reached through an SSH tunnel.

    ``conn`` stores the most recent MySQL connection and ``server`` the
    most recent SSH tunnel; both default to an empty string until
    ``get_conn`` succeeds.
    """

    def __init__(self, conn='', server=''):
        self.conn = conn
        self.server = server

    def get_conn(self, host_jump, port_jump, ssh_pk_jump, user_name_jump,
                 host_mysql, port_mysql, user_name_mysql, password_mysql,
                 database):
        """Open an SSH tunnel via the jump host and connect to MySQL.

        Args:
            host_jump: Host of the jump (bastion) machine.
            port_jump: SSH port of the jump machine; converted with ``int``.
            ssh_pk_jump: Passed to the tunnel as ``ssh_pkey``.
            user_name_jump: Passed to the tunnel as ``ssh_username``.
            host_mysql: Host of the database server, as seen from the
                jump machine.
            port_mysql: Port of the database server; converted with ``int``.
            user_name_mysql: MySQL user name.
            password_mysql: MySQL password.
            database: Name of the database to select.

        Returns:
            The tuple ``(self.conn, self.server)``. On failure the error
            is printed rather than raised; ``self.conn`` then still holds
            whatever it held before the call (the constructor default
            ``''`` if no connection was ever made), and any tunnel created
            by this call has been stopped.
        """
        tunnel = None
        try:
            tunnel = SSHTunnelForwarder(
                (host_jump, int(port_jump)),  # jump host configuration
                ssh_pkey=ssh_pk_jump,
                ssh_username=user_name_jump,
                # database server configuration
                remote_bind_address=(host_mysql, int(port_mysql)))
            self.server = tunnel
            tunnel.start()
            # The tunnel exposes the remote MySQL port on a local port.
            self.conn = mysql.connector.connect(
                host='127.0.0.1',
                port=tunnel.local_bind_port,
                user=user_name_mysql,
                password=password_mysql,
                database=database)
        except Exception as e:
            print('Failed to connect database: %s' % e)
            # Do not leave a half-open tunnel behind when the tunnel or
            # the database connection could not be established.
            if tunnel is not None:
                try:
                    tunnel.stop()
                except Exception as stop_error:
                    print('Failed to stop SSH tunnel: %s' % stop_error)
        return self.conn, self.server
使用方法如下:
# Example usage. The connection parameters (host_jump, port_jump, ...) are
# expected to be defined by the caller before this point.
mysql_single = MySQLSingle()
conn, server = mysql_single.get_conn(
    host_jump, port_jump, ssh_pk_jump, user_name_jump,
    host_mysql, port_mysql, user_name_mysql, password_mysql, database)
try:
    try:
        pass  # do something...
    finally:
        # Close the database connection even if the work above raised.
        conn.close()
finally:
    # Always shut the tunnel down, even if closing the connection failed.
    server.close()
来源: http://www.bubuko.com/infodetail-2769612.html