// Writes every document of each RDD partition to two MongoDB collections:
//   1. replace-or-insert (upsert keyed on "_id") into "wenshu.testbackup" on the server at `uri`;
//   2. plain insert into "wenshu.<backName>" on the server at `uri2`.
// One pair of clients is opened per non-empty partition and is always closed afterwards.
// `rdd`, `uri`, `uri2`, `backName` and `eqq` come from the enclosing scope.
// NOTE(review): `eqq` is presumably an alias for com.mongodb.client.model.Filters.eq — confirm.
rdd.foreachPartition { partition =>
  import scala.util.control.NonFatal

  // Skip empty partitions so no connections are opened for nothing.
  if (partition.hasNext) {
    val primaryClient = new MongoClient(new MongoClientURI(uri))
    try {
      val backupClient = new MongoClient(new MongoClientURI(uri2))
      try {
        val primaryColl = primaryClient.getDatabase("wenshu").getCollection("testbackup")
        val backupColl = backupClient.getDatabase("wenshu").getCollection(backName)

        partition.foreach { doc =>
          // Best effort per document: a failure is printed and the loop moves on.
          // Both writes share one try, so a failed upsert also skips the backup insert.
          try {
            primaryColl.replaceOne(eqq("_id", doc.get("_id")), doc, new UpdateOptions().upsert(true))
            backupColl.insertOne(doc)
          } catch {
            // NonFatal lets OutOfMemoryError, InterruptedException, etc. propagate.
            case NonFatal(e) => e.printStackTrace()
          }
        }

        // Disabled alternative for the backup collection: buffer documents and flush them in
        // batches of 10000 with an unordered insertMany. Inserting this way left the number of
        // inserted documents out of step with the real data, so it stays commented out until
        // the cause is found.
        //
        //   var count = 0
        //   val resList = new ArrayList[Document]
        //   partition.foreach { doc =>
        //     count = count + 1
        //     resList add doc
        //     if (count == 10000) {
        //       try {
        //         backupColl.insertMany(resList, new InsertManyOptions().ordered(false))
        //       } catch {
        //         case NonFatal(e) => e.printStackTrace()
        //       }
        //       resList.clear
        //       count = 0
        //     }
        //   }
        //   if (count > 0)
        //     try {
        //       backupColl.insertMany(resList, new InsertManyOptions().ordered(false))
        //     } catch {
        //       case NonFatal(e) => e.printStackTrace()
        //     }
      } finally {
        backupClient.close()
      }
    } finally {
      primaryClient.close()
    }
  }
}
// Source: http://www.linuxidc.com/Linux/2017-12/149535.htm