Spark大数据量写入Mysql效率问题
背景
数据列不固定,每次全量覆盖数据到Mysql,涉及到数据表结构的变更,需要调整自动创建数据表结构
方案1:DataFrameWriter.jdbc
使用spark原生提供的DataFrameWriter.jdbc,参考代码如下:
/**
* 数据覆盖写入指定mysql表
* 批量读写参数设置参考:https://bbs.huaweicloud.com/blogs/210624
*
* @param dataFrame 数据集
* @param tableName mysql数据表名
* @param parallelism 插入并行度
*/
def override2Mysql(dataFrame: DataFrame, tableName: String, parallelism: Int): Unit = {
val property = new Properties()
property.put("user", jdbcUsername)
property.put("password", jdbcPassword)
property.put("driver", mysqlJdbcDriver)
dataFrame.repartition(parallelism).write.option("truncate", value = false).mode(SaveMode.Overwrite).jdbc(jdbcUrl, tableName, property)
}
实验如下,100万数据,并行度设置为10,插入需要26分钟。
疑问?为什么这么慢?难道不是批量插入吗?
查看DataFrameWriter.jdbc,在connectionProperties中可以设置batchsize,默认已经是1000,实验调整batchsize,没什么变化,没搞明白why?
同时,发现DataFrameWriter.jdbc自动删除并创建表存在数据类型映射的问题:spark中数据类型分类没有那么细,String类型映射到Mysql中统一转化为text类型。而text类型创建索引,必须设置前缀前缀长度,不利于索引创建。可通过在option中设置truncate=true解决,手动创建数据吧,每次写入时截断数据表,然后再插入,但不能解决数据插入效率问题。
方案2:自定义批量插入
数据插入前对数据进行分区,每个分区自定义一个数据库连接,批量插入数据,参考代码如下:
private def saveXXX(xxxData: DataFrame, nacosConfig: NacosConfig): Unit = {
val mysqlUrl = nacosConfig.getValue("user_label_mysql_url")
val mysqlUser = nacosConfig.getValue("user_label_mysql_username")
val mysqlPsw = nacosConfig.getValue("user_label_mysql_psw")
val mysqlParallelism = nacosConfig.getValue("mysql_parallelism").toInt
val jdbcUtils = JdbcUtils.build(mysqlUrl, mysqlUser, mysqlPsw)
jdbcUtils.runSql(s"truncate table $labelTable", null)
val sql=
s"""
|insert into xxx () values()
|""".stripMargin
xxxData.repartition(mysqlParallelism).foreachPartition(rows => {
jdbcUtils.insert(sql, rows)
})
}
def insert(sql: String, rows: Iterator[Row]): Unit = {
val conn = getMysqlConn(jdbcUrl, jdbcUsername, jdbcPassword)
val pstat: PreparedStatement = conn.prepareStatement(sql)
try {
var size = 0
while (rows.hasNext) {
val row: Row = rows.next()
val len = row.length
for (i <- 0 until len) {
pstat.setObject(i + 1, row.get(i))
}
pstat.addBatch()
size += 1
if (size % maxBatchSize == 0) {
pstat.executeBatch()
lgr.info("=======批量插入数据成功,数量是[{}]=======", size)
size = 0
}
}
if (size > 0) {
pstat.executeBatch()
lgr.info("=======批量插入数据成功,数量是[{}]=======", size)
}
} finally {
pstat.close()
conn.close()
}
}
同样100万数据,划分为10个分区并行插入,每次批量10000条数据,插入耗时2分钟。需要注意的并行度和batchSize不能过大,避免影响数据库正常使用。
但是,上述方案未解决数据表结构变化的问题。可根据dataFrame所属的schema.fields自动生成创建表语句,先执行drop table xxx if exists
,然后替换上述语句中truncate未自动生成的创建表语句即可。