- #coding=utf-8
- from _mysql import IntegrityError
- import logging
- import traceback
- import MySQLdb
- import datetime
# ======================== Database settings ========================
# NOTE(review): credentials are hard-coded in the script; consider loading
# them from the environment or a config file.
HOST = "192.168.1.240"
USER = "root"
PWD = "root"
DB = "portal"
# Tables to copy, as (source table, target table) pairs.  All of these tables
# must already exist in the database when the script runs.
SOURCE_TARGET = [
    ("details_cftaccountdetail", "idata_caccountdetail"),
    ("details_cftdealdetail", "idata_cdealdetail"),
    ("details_cftorderdetail", "idata_corderdetail"),
    ("details_cftpositiondetail", "idata_cpositiondetail"),
    ("details_cftpositionstatics", "idata_cpositionstatics"),
]
# Maximum number of rows fetched by a single query.
MAX_LIMIT = 10000
# ======================== Logger settings ========================
# Create the logger used by the whole script.
logger = logging.getLogger('mylogger')
# Log level: logging.DEBUG, logging.INFO or logging.ERROR.
logger.setLevel(logging.INFO)
# 0 keeps the logger enabled.  A true value would silence the logger as a
# whole (log file and console alike), not just the console handler.
logger.disabled = 0
# Handler that writes records to the log file.
fh = logging.FileHandler('copy_data.log')
fh.setLevel(logging.DEBUG)
# Handler that writes records to the console.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# Attach both handlers to the logger.
logger.addHandler(fh)
logger.addHandler(ch)
# Separator placed between the two parts of a generated primary key.
KEY_SPLITTER = "____"
# Template of the query that lists a table's column names; format it with the
# table name.  The schema is taken from DATABASE(), i.e. the database the
# connection has selected, so the column list always describes the same
# tables that the copy queries read and write.
COLUMNS_SQL = ("select COLUMN_NAME from information_schema.columns "
               "where table_name='%s' AND TABLE_SCHEMA=DATABASE() "
               "order by COLUMN_NAME ASC;")
# Template of the insert statement: (table, column list, value list).
INSERT_SQL = "insert into %s (%s) values (%s)"
def get_conn_and_cursor():
    """Open a database connection and return it together with a new cursor.

    The connection is made with the module-level HOST, USER, PWD and DB
    settings and the utf8 character set.

    Returns:
        A ``(connection, cursor)`` tuple.
    """
    connection = MySQLdb.connect(
        host=HOST,
        user=USER,
        passwd=PWD,
        db=DB,
        charset="utf8",
    )
    new_cursor = connection.cursor()
    return connection, new_cursor
def close_conn(cursor, conn):
    """Close the cursor and then the connection, skipping missing handles.

    Each handle is closed on its own, so a connection is still released when
    no cursor was ever created, and a failure while closing the cursor does
    not prevent the connection from being closed.  Errors raised by
    ``close()`` are printed and otherwise ignored (best-effort cleanup).

    Args:
        cursor: the cursor to close, or None.
        conn: the connection to close, or None.
    """
    # Cursor first: it belongs to the connection.
    for handle in (cursor, conn):
        if handle is None:
            continue
        try:
            handle.close()
        except Exception:
            traceback.print_exc()
def get_time():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    now = datetime.datetime.now()
    return "{:%Y-%m-%d %H:%M:%S}".format(now)
class DetailClass(object):
    """Plain record object holding one source row.

    Every keyword argument passed to the constructor becomes an attribute of
    the instance with the same name.
    """

    def __init__(self, **kwargs):
        # items() exists on both Python 2 and Python 3 (iteritems() does not).
        for name, value in kwargs.items():
            setattr(self, name, value)
class IdataClass(object):
    """One row of a target table, built from a source detail object.

    Attributes:
        table: name of the target table.
        columns: list of the target table's column names.
        columns_str: the column names joined with commas.
        detail: the source object the values are read from.

    In addition, one attribute per column name is set on the instance (see
    ``set_attrs``).
    """

    def __init__(self, table, columns, detail):
        self.table = table
        self.columns = columns
        self.columns_str = ",".join(self.columns)
        self.detail = detail
        self.set_attrs()

    def gen_idata_key(self):
        """Build the target primary key from the detail object.

        The key is ``m_strTagKey + KEY_SPLITTER + m_strTradingDay``; a part
        missing from the detail is replaced by an empty string.

        Returns:
            The key string, or None when there is no detail object.
        """
        if not self.detail:
            return None
        # %-formatting instead of str() so unicode parts also work on Python 2.
        return "%s%s%s" % (
            getattr(self.detail, 'm_strTagKey', ''),
            KEY_SPLITTER,
            getattr(self.detail, 'm_strTradingDay', ''),
        )

    def set_attrs(self):
        """Set one attribute per target column from the detail object.

        A column missing from the detail gets None.  Special columns:
        ``m_priKey_tag`` receives the generated key and ``m_strBrokerID``
        is filled from the detail's ``m_nBrokerID``.  Further special cases
        can be added here.

        Raises:
            Exception: when no key can be generated for ``m_priKey_tag``.
        """
        # NOTE(review): values are stored with setattr on this same object,
        # so a column named table/columns/columns_str/detail would overwrite
        # the bookkeeping attribute of that name.
        for col in self.columns:
            if col == "m_priKey_tag":
                value = self.gen_idata_key()
                if not value:
                    raise Exception("idata key gen error!!!!!!")
            elif col == "m_strBrokerID":
                value = getattr(self.detail, "m_nBrokerID", None)
            else:
                value = getattr(self.detail, col, None)
            setattr(self, col, value)

    @staticmethod
    def _sql_literal(val):
        """Render one value as a SQL literal: ``null`` or a quoted string."""
        if val is None:
            return "null"
        text = "%s" % (val,)
        # Escape the characters that would terminate or alter the quoted
        # literal.  Backslashes are doubled first so the quote escaping is
        # not itself re-escaped.
        # NOTE(review): assumes the server treats backslash as an escape
        # character (MySQL default, i.e. NO_BACKSLASH_ESCAPES is off).
        text = text.replace("\\", "\\\\").replace("'", "''")
        return "'%s'" % text

    def insert_sql(self):
        """Return the complete insert statement for this row.

        None values become ``null``; every other value is rendered as an
        escaped, single-quoted literal.
        """
        values = ",".join(
            self._sql_literal(getattr(self, col, None)) for col in self.columns
        )
        return INSERT_SQL % (self.table, self.columns_str, values)
class Main(object):
    """Copies all rows of one source table into one target table.

    Create an instance and call ``copy_to_idata()`` to run the copy.
    """

    def __init__(self, conn, cursor, source_table, target_table):
        """Store the database handles and load the column lists of both tables.

        Args:
            conn: open database connection (used for commits).
            cursor: cursor of that connection (used for every statement).
            source_table: name of the table rows are read from.
            target_table: name of the table rows are inserted into.
        """
        self.conn, self.cursor = conn, cursor
        self.source_table = source_table
        self.target_table = target_table
        self.source_columns = self.get_columns(self.source_table)
        self.target_columns = self.get_columns(self.target_table)
        self.query_columns = ",".join(self.source_columns)

    def get_columns(self, table_name):
        """Return the list of column names of ``table_name``."""
        self.cursor.execute(COLUMNS_SQL % table_name)
        return [col for col, in self.cursor.fetchall()]

    def query_source(self, start, rows):
        """Fetch one page of the source table, ordered by ``id``.

        Args:
            start: offset of the first row to fetch.
            rows: maximum number of rows to fetch.

        Returns:
            A list of row tuples whose values follow ``source_columns``.
        """
        self.cursor.execute("select %s from %s order by id limit %d, %d" %
                            (self.query_columns, self.source_table, start, rows))
        return list(self.cursor.fetchall())

    def make_detail_instances(self, start, rows):
        """Fetch one page of source rows and wrap each row in a DetailClass."""
        return [
            DetailClass(**dict(zip(self.source_columns, row)))
            for row in self.query_source(start, rows)
            if row
        ]

    def count_total(self):
        """Return the number of rows in the source table."""
        self.cursor.execute("select count(id) from %s" % self.source_table)
        return self.cursor.fetchone()[0]

    def limits(self, max_limit=None):
        """Compute the page offsets needed to read the whole source table.

        Args:
            max_limit: page size; defaults to the module-level MAX_LIMIT.

        Returns:
            ``(total, offsets)`` where ``total`` is the row count and
            ``offsets`` is the list 0, max_limit, 2*max_limit, ... covering
            every row (empty when the table is empty).
        """
        if max_limit is None:
            max_limit = MAX_LIMIT
        total = self.count_total()
        # A stepped range gives the same offsets as integer division would,
        # and works with Python 3 where "/" yields a float.
        return total, list(range(0, total, max_limit))

    def copy_to_idata(self, batch_size=None):
        """Copy every source row into the target table, page by page.

        Rows are inserted with a parameterised statement, so the driver does
        the quoting of the values.  Rows rejected with an integrity error
        (e.g. a duplicate primary key) are logged and skipped.  The
        transaction is committed after each page.

        Args:
            batch_size: rows read per page; defaults to MAX_LIMIT.
        """
        if batch_size is None:
            batch_size = MAX_LIMIT
        logger.info("source_table: %s \ntarget_table: %s\nstart at %s",
                    self.source_table, self.target_table, get_time())
        total, offsets = self.limits(batch_size)
        logger.info("total: %s ", int(total))
        # The statement is identical for every row; only the parameters vary.
        placeholders = ",".join(["%s"] * len(self.target_columns))
        insert_stmt = INSERT_SQL % (
            self.target_table, ",".join(self.target_columns), placeholders)
        for page, start in enumerate(offsets):
            logger.info("%s. processing: %s --- %s, please wait.....",
                        page, start, start + batch_size)
            for i, detail in enumerate(self.make_detail_instances(start, batch_size)):
                # IdataClass applies the per-column special cases
                # (generated key, broker id).
                idata = IdataClass(self.target_table, self.target_columns, detail)
                params = [getattr(idata, col, None) for col in self.target_columns]
                try:
                    self.cursor.execute(insert_stmt, params)
                    logger.debug("Success: %s, sql: %s, params: %s",
                                 i, insert_stmt, params)
                except MySQLdb.IntegrityError as ie:
                    logger.error(" Warning: integrity error (%s), ignore this row : %s ",
                                 ie, params)
            self.conn.commit()
        logger.info("Done.....\nsource_table: %s\ntarget_table: %s\nend at %s",
                    self.source_table, self.target_table, get_time())
# Script entry point: copy every (source, target) pair listed in SOURCE_TARGET
# over a single shared connection.
if __name__ == "__main__":
    conn, cursor, target_table = None, None, None
    try:
        conn, cursor = get_conn_and_cursor()
        for source, target in SOURCE_TARGET:
            # Remember which target is being filled so the error handler
            # knows which table to clear.
            target_table = target
            m = Main(conn, cursor, source, target)
            m.copy_to_idata()
    except Exception:
        traceback.print_exc()
        # On failure the target table being processed is emptied; tables
        # copied earlier in the loop keep their committed rows.
        # NOTE(review): TRUNCATE also removes rows that were in the target
        # before this script ran -- confirm this is intended.
        if target_table and conn and cursor:
            try:
                cursor.execute("truncate table %s" % target_table)
            except Exception:
                # The connection may be the thing that failed; report it and
                # still fall through to the cleanup below.
                traceback.print_exc()
    finally:
        # close_conn expects the cursor first, then the connection.
        close_conn(cursor, conn)
# This snippet comes from http://www.codesnippet.cn/detail/221020136562.html
# Source: http://www.codesnippet.cn/detail/221020136562.html