Spark分区写入MongoDb实现


场景

数据量两千万左右,数据列不固定,需要每天更新一次数据,使用MongoDB存储(其他存储可能更佳,此处不考虑)。数据使用方式:

  • 通过_id检索
  • 通过任意列(一列或多列)进行count查询

实现1:单表全量覆盖写入

spark任务每天全量写入MongoDB,并创建索引,数据写入耗时19分钟左右,构建稀疏索引耗时11分钟,如下:

labelData.selectExpr("user_id", "labels")
      .repartition(mongoParallelism)
      .foreachPartition(rows => {
        MongodbUtils.replaceBatch(mongodbUri, mongodbDb, labelTable, rows)
      })
...

val indexList = new util.ArrayList[IndexModel]()
LabelsConstants.labelFieldNames.foreach(fieldName => {
	indexList.add(new IndexModel(
		new BasicDBObject().append(fieldName, 1),
		new IndexOptions().sparse(true))
	)
})
collection.createIndexes(indexList)

存在问题如下:

  • 全列索引创建比较耗时(每列单独创建索引或者yo数组上创建通用索引)
  • count查询性能差(索引命中数据达百万,耗时1分以上)

方案2:增量分表更新

数据虽然有两千万,但实际每天变化数据100万左右,增量更新效果更好。并通过分表解决大数据量count性能查问题(线程池并发查询)

labelData.selectExpr("user_id", "labels")
      .rdd
      .map(row => (row.getAs[String]("user_id").last, row))
      .partitionBy(new Partitioner {
        override def numPartitions: Int = 16

        override def getPartition(key: Any): Int = {
          Integer.parseInt(key.toString, 16)
        }
      })
      .foreachPartition(rows => {
        MongodbUtils.replaceBatch(mongodbUri, mongodbDb, labelTablePrefix, rows)
      })

...

val row = next._2
val id = row.getAs[String]("user_id")
val labels = row.getAs[String]("labels")
val replaceDocument = new Document()
replaceDocument.append("_id", id)
labels.split(",").foreach(label => {
	val kv = label.split("\\.")
	replaceDocument.append(kv(0), kv(1).transfer)
})
val filterDocument = new Document()
filterDocument.append("_id", id)
documents.add(new ReplaceOneModel[Document](
	filterDocument,
	replaceDocument,
	replaceOptions))
if (documents.size() % maxBatchSize == 0) {
	collection.bulkWrite(documents, bulkWriteOptions)
	documents.clear()
}

user id最后一位对应16进制的0~F,且数据比较均衡,单表更新10万以内,1分钟内更新完毕。count并发查询汇总耗时5秒左右(可优化)

注意事项:

  • repartition:hash分区,可指定hash列,hash后和分区数求余
  • repartitionByRange:范围分区,通过指定列的值采样结果分区
  • partitionBy:自定义分区实现,PairRDDFunctions的方法,只适用于【k,v】类型的rdd

参考:

  • Spark中的分区方法详解
  • 无法使用partitionBy()