Spark分区写入MongoDb实现
场景
数据量两千万左右,数据列不固定,需要每天更新一次数据,使用MongoDB存储(其他存储可能更佳,此处不考虑)。数据使用方式:
- 通过_id检索
- 通过任意列(一列或多列)进行count查询
实现1:单表全量覆盖写入
spark任务每天全量写入MongoDB,并创建索引,数据写入耗时19分钟左右,构建稀疏索引耗时11分钟,如下:
labelData.selectExpr("user_id", "labels")
.repartition(mongoParallelism)
.foreachPartition(rows => {
MongodbUtils.replaceBatch(mongodbUri, mongodbDb, labelTable, rows)
})
...
val indexList = new util.ArrayList[IndexModel]()
LabelsConstants.labelFieldNames.foreach(fieldName => {
indexList.add(new IndexModel(
new BasicDBObject().append(fieldName, 1),
new IndexOptions().sparse(true))
)
})
collection.createIndexes(indexList)
存在问题如下:
- 全列索引创建比较耗时(每列单独创建索引或者yo数组上创建通用索引)
- count查询性能差(索引命中数据达百万,耗时1分以上)
方案2:增量分表更新
数据虽然有两千万,但实际每天变化数据100万左右,增量更新效果更好。并通过分表解决大数据量count性能查问题(线程池并发查询)
labelData.selectExpr("user_id", "labels")
.rdd
.map(row => (row.getAs[String]("user_id").last, row))
.partitionBy(new Partitioner {
override def numPartitions: Int = 16
override def getPartition(key: Any): Int = {
Integer.parseInt(key.toString, 16)
}
})
.foreachPartition(rows => {
MongodbUtils.replaceBatch(mongodbUri, mongodbDb, labelTablePrefix, rows)
})
...
val row = next._2
val id = row.getAs[String]("user_id")
val labels = row.getAs[String]("labels")
val replaceDocument = new Document()
replaceDocument.append("_id", id)
labels.split(",").foreach(label => {
val kv = label.split("\\.")
replaceDocument.append(kv(0), kv(1).transfer)
})
val filterDocument = new Document()
filterDocument.append("_id", id)
documents.add(new ReplaceOneModel[Document](
filterDocument,
replaceDocument,
replaceOptions))
if (documents.size() % maxBatchSize == 0) {
collection.bulkWrite(documents, bulkWriteOptions)
documents.clear()
}
user id最后一位对应16进制的0~F,且数据比较均衡,单表更新10万以内,1分钟内更新完毕。count并发查询汇总耗时5秒左右(可优化)
注意事项:
- repartition:hash分区,可指定hash列,hash后和分区数求余
- repartitionByRange:范围分区,通过指定列的值采样结果分区
- partitionBy:自定义分区实现,PairRDDFunctions的方法,只适用于【k,v】类型的rdd
参考:
- Spark中的分区方法详解
- 无法使用partitionBy()