场景: 需要将从 ODPS 数仓中计算得到的大额可疑交易信息导入到业务系统的 mysql 中供业务系统审核. 最简单的方式是用阿里云的组件自动进行数据同步了. 但是本系统是开放是为了产品化, 要保证不同环境的可移植性, 同时同步的表也就 6 个表, 那么就利用现有的基于 jdbc 的规则引擎工程来自己实现数据的同步.
完整的工程代码可以参考我的 github https://github.com/intsmaze/SqlAdapter
JDBC 手动将一个库的数据导入到另一个数据库中, 如何避免人工映射操作, 提高开发效率
讨厌的方式
查询数据, 将结果映射到 javabean 对象中
- ps = conn.prepareStatement(select id,name,age from intsmaze);
- rs = ps.executeQuery();
- Intsmaze intsmaze = null;
- while (rs.next()) {
- intsmaze = new Intsmaze();
- intsmaze.setId(rs.getInt(1));
- intsmaze.setName(rs.getString(2));
- intsmaze.setAge(rs.getInt(3));
- }
添加数据, 将 javabean 对象字段映射到对应的表列
- String sql = "insert into intsmaze(id,name,age) values (?,?,?)";
- ps = conn.prepareStatement(sql);
- ps.setInt(1, intsmaze.getId());
- ps.setString(2, intsmaze.getName());
- ps.setInt(3, intsmaze.getAge());
- ps.executeUpdate();
使用 JDBC 大家都会使用上面的方式进行开发, 但是如果我们的表的字段有 50 个, 而且查询不能使用 select * from 必须指定列名呢? 添加数据不能使用 insert into intsmaze values() 必须指定插入的列名呢? 你干脆杀了我吧.
50 个字段你要做 2 次字段列名映射, 稍有不慎就会将字段列名映射到错误的位置, 导致最后数据错误, 最可怕的是, 还要编写 sql 语句, 如果后面有新增或删除列名, 那么你又要去看一眼映射关系, 看看是否影响到. 这就是一种费力却没有技术含量的事情, 而且还很容易出错. 下面就是我们要做的各种映射, 你真的很考验我的眼神.
- result.getObjectByName("FieldName")
- javabean.setFieldValue()
- PreparedStatement.setString(number,FieldVale)
根据 javabean 自动生成 insert,select 语句, 完成字段列名映射
根据 javabean 自动生成 insert,select 语句, 完成字段列名映射
当初开发时, 一看到这么多字段映射我烦躁不安, 然后花了半天用反射把代码重新编写了下, 后面有新的表要进行同步时, 用一个工具类生成 javabean 的 java 文件, 然后直接就在下面模板代码中替换 javabean 类就完成了数据同步, 整个操作 10 分钟搞定, 是不是很爽.
当然你可以引入 orm 框架, 但是除了 hibernate 框架, mybatis 框架虽然免去了 select 和 insert 的映射, 但是还是要编写前缀列名, 而且我就一个小工程, 我再引入 ORM 框架, 麻不麻烦啊, 有这时间还不如自己写一写.
- public class ModelServer extends ApplicationServer {
- private static final Logger logger = LoggerFactory.getLogger(ExportBlockCustomerServer.class);
- private final static String SQL = "get_customer_infor";
- private MysqlService<TestGroup> mysqlService = new MysqlService<TestGroup>();
- @Override
- public String[] getPaths() {
- return new String[] { "com/hand/service/exe/blocktrade/blocktrade.xml" };
- }
- private int date = 1;
- @Override
- public void addOptions(Options options) {
- options.addOption("date", true, "天数");
- }
- @Override
- public void setupOptionValue(CommandLine cmd) {
- date = Integer.parseInt(cmd.getOptionValue("date", "1"));
- logger.debug("date is {}", date);
- }
- public void service() throws Exception {
- mysqlService.setMysqlDao(this.getMysqlDao());
- AmlException exception = null;
- for (int i = 1; i <= date; i++) {
- String exeSql = (String) this.getSqlMap().get(SQL);
- Result result = this.getSqlAdapter().select(this.getDao(),"SELECT * from test_group");// 向 odps 数据仓库查询数据, 并导入到 mysql 中
- String[] names = FilesNameUtils.getFiledName(new TestGroup());// 得到这个 bean 类的所有字段名称, 要保证 bean 类的字段名称和数据库表的列名一致
- String insertSql = SqlUtils.getInsertSql("test_group", names);// 组装成 insert into test_group (id,cny,d,party_id,age) values (?,?,?,?,?) 语句
- List<TestGroup> list = new ArrayList<TestGroup>(100);
- int number = 0;
- while (result.hasNext()) {
- result.next();
- TestGroup br = (TestGroup) tableToBean(result, i,names);// 将 odps 查询的数据反射到 TestGroup 类中, 不用反射见重载函数
- list.add(br);
- number++;
- if (number % 100 == 0) {
- try {
- mysqlService.insert(insertSql, list, names);
- } catch (Exception e) {
- exception = new AmlException(e);
- } finally {
- list.clear();
- }
- }
- }
- try {
- mysqlService.insert(insertSql, list, names);
- logger.info("insert data number is {}", number);
- } catch (Exception e) {
- logger.info("insert data number is {}", number);
- exception = new AmlException(e);
- } finally {
- result.close();
- }
- }
- if (exception != null) {
- throw exception;
- }
- }
- public static void main(String[] args) throws Exception {
- ModelServer applicationServer = new ModelServer();
- applicationServer.run(args);
- logger.info("execute sucess......");
- System.exit(0);
- }
- private Object tableToBean(Result result, int i, String[] names) throws Exception {
- Class clazz = TestGroup.class;
- TestGroup testGroup = (TestGroup) clazz.newInstance();
- for (int j = 0; j <names.length; j++) {
- Object object = result.getObjectByName(names[j]);
- if (object instanceof String) {
- Field f = clazz.getDeclaredField(names[j]);
- f.setAccessible(true);
- f.set(testGroup, object);
- } else if (object instanceof Date) {
- Field f = clazz.getDeclaredField(names[j]);
- f.setAccessible(true);
- f.set(testGroup, new java.sql.Date(((Date)object).getTime()));
- }
- else if (object instanceof Long) {
- Field f = clazz.getDeclaredField(names[j]);
- f.setAccessible(true);
- f.set(testGroup, object);
- }
- }
- return testGroup;
- }
- private Bigamountreport tableToBean(Result result, int i)
- throws SQLException {
- Bigamountreport bigamountreport = new Bigamountreport();
- bigamountreport.setSeqno((String) result.getObjectByName("aml_id"));
- bigamountreport.setCustomerId((String) result
- .getObjectByName("party_id"));
...... 疯狂的 set 操作 return bigamountreport;
}
}
获得传入对象的字段名称的字符串数据, 为了拼接 sql 使用
- public static String[] getFiledName(Object o) {
- Field[] fields = o.getClass().getDeclaredFields();
- String[] fieldNames = new String[fields.length];
- for (int i = 0; i < fields.length; i++) {
- fieldNames[i] = fields[i].getName();
- }
- return fieldNames;
- }
拼接 insert 的 sql 语句
- public static String getInsertSql(String tableName ,String[] names)
- {
- String insertSql = StringUtils.join("insert into",tableName ,"(#{field_name}) values (#{field_value})");
- String fieldName="";
- String fieldValue="";
- for(int j = 0; j < names.length; j++)
- {
- if(j==names.length-1)
- {
- fieldName=StringUtils.join(fieldName,names[j]);
- fieldValue=StringUtils.join(fieldValue,"?");
- }
- else
- {
- fieldName=StringUtils.join(fieldName,names[j],",");
- fieldValue=StringUtils.join(fieldValue,"?",",");
- }
- }
- insertSql=insertSql.replace("#{field_name}", fieldName).replace("#{field_value}", fieldValue);
- logger.debug("the insert sql is :{}",insertSql);
- return insertSql;
- }
最重要的是 assembleBeantoPS 方法, 用于根据映射字段列名
- public class MysqlService<T> {
- private static final Logger logger = LoggerFactory.getLogger(MysqlService.class);
- private MysqlDao mysqlDao;
- public void assembleBeantoPS(PreparedStatement ps, int number,
- String FileName, Object bean) throws Exception {
- Type fileType = FilesNameUtils.getFieldType(FileName, bean);// 根据属性名称返回字段类型
- if (fileType == String.class) {
- ps.setString(number + 1,
- (String) FilesNameUtils.getFieldValueByName(FileName, bean));
- }
- else if ("long".equals(fileType.toString()+"")) {
- ps.setLong(number + 1,(Long) FilesNameUtils.getFieldValueByName(FileName, bean));
- }
- else if (fileType == Long.class) {
- ps.setLong(number + 1,(Long) FilesNameUtils.getFieldValueByName(FileName, bean));
- }
- else if (fileType == Date.class) {
- ps.setDate(number + 1,new java.sql.Date(((Date)FilesNameUtils.getFieldValueByName(FileName, bean)).getTime()));
- }
- }
- /**
- * @author:YangLiu
- * @date:2017 年 12 月 25 日 下午 3:52:20
- * @describe:
- */
- public void insert(String sql, List<T> list,
- String[] names) throws Exception {
- boolean iserror=false;
- PreparedStatement ps = null;
- try {
- ps = mysqlDao.getConnection().prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
- for (int i = 0; i <list.size(); i++) {
- T bigamount = list.get(i);
- try{
- for (int j = 0; j < names.length; j++) {
- assembleBeantoPS(ps, j, names[j], bigamount);
- }
- ps.executeUpdate();
- }catch (Exception e) {
- iserror=true;
- logger.error("插入数据发生错误, occur {}", e);
- logger.error("异常数据 {}", bigamount);
- }
- }
- } catch (Exception e) {
- mysqlDao.getInstance().free(null, ps, mysqlDao.getConnection());
- logger.error("the sql: {} occur {}", sql, e);
- throw new AmlException("mysql 建立连接时发生异常");
- } finally {
- mysqlDao.getInstance().free(null, ps);
- if(iserror)
- {
- throw new AmlException("向 mysql 中导入数据时发生异常");
- }
- }
- }
- /**
- * @deprecated
- * @author:YangLiu
- * @date:2017 年 12 月 25 日 下午 3:52:20
- * @describe:SB 写法
- */
- public boolean insertBatchBigamountrecord(String sql,
- List<Bigamountrecord> list) throws Exception {
- PreparedStatement ps = null;
- try {
- // dseqno,transOrgId,policyNo,antPolicyNo,periodPrem,transactionAmountCny,transactionAmountUsd
- ps = mysqlDao.getConnection().prepareStatement(sql,
- Statement.RETURN_GENERATED_KEYS);
- for (int i = 0; i < list.size(); i++) {
- Bigamountrecord bigamountrecord = list.get(i);
- int j = 1;
- ps.setString(j++, bigamountrecord.getDseqno());
- ps.setString(j++, bigamountrecord.getTransOrgId());
- ps.setString(j++, bigamountrecord.getPolicyNo());
...... 疯狂的 set 操作
- ps.addBatch();
- }
- ps.executeBatch();
- } catch (SQLException e) {
- mysqlDao.getInstance().free(null, ps, mysqlDao.getConnection());
- return false;
- } finally {
- mysqlDao.getInstance().free(null, ps);
- }
- return true;
- }
- }
来源: https://www.cnblogs.com/intsmaze/p/9172854.html