Spark大数据量写入Mysql效率问题


背景

数据列不固定,每次全量覆盖数据到Mysql,涉及到数据表结构的变更,需要调整自动创建数据表结构

方案1:DataFrameWriter.jdbc

使用spark原生提供的DataFrameWriter.jdbc,参考代码如下:

/**
   * 数据覆盖写入指定mysql表
   * 批量读写参数设置参考:https://bbs.huaweicloud.com/blogs/210624
   *
   * @param dataFrame   数据集
   * @param tableName   mysql数据表名
   * @param parallelism 插入并行度
   */
  def override2Mysql(dataFrame: DataFrame, tableName: String, parallelism: Int): Unit = {
    val property = new Properties()
    property.put("user", jdbcUsername)
    property.put("password", jdbcPassword)
    property.put("driver", mysqlJdbcDriver)
    dataFrame.repartition(parallelism).write.option("truncate", value = false).mode(SaveMode.Overwrite).jdbc(jdbcUrl, tableName, property)
  }

实验如下,100万数据,并行度设置为10,插入需要26分钟。
疑问?为什么这么慢?难道不是批量插入吗?
查看DataFrameWriter.jdbc,在connectionProperties中可以设置batchsize,默认已经是1000,实验调整batchsize,没什么变化,没搞明白why?

同时,发现DataFrameWriter.jdbc自动删除并创建表存在数据类型映射的问题:spark中数据类型分类没有那么细,String类型映射到Mysql中统一转化为text类型。而text类型创建索引,必须设置前缀前缀长度,不利于索引创建。可通过在option中设置truncate=true解决,手动创建数据吧,每次写入时截断数据表,然后再插入,但不能解决数据插入效率问题。

方案2:自定义批量插入

数据插入前对数据进行分区,每个分区自定义一个数据库连接,批量插入数据,参考代码如下:

private def saveXXX(xxxData: DataFrame, nacosConfig: NacosConfig): Unit = {
      val mysqlUrl = nacosConfig.getValue("user_label_mysql_url")
      val mysqlUser = nacosConfig.getValue("user_label_mysql_username")
      val mysqlPsw = nacosConfig.getValue("user_label_mysql_psw")
      val mysqlParallelism = nacosConfig.getValue("mysql_parallelism").toInt
      val jdbcUtils = JdbcUtils.build(mysqlUrl, mysqlUser, mysqlPsw)
      jdbcUtils.runSql(s"truncate table $labelTable", null)
      val sql=
        s"""
          |insert into xxx () values()
          |""".stripMargin
      xxxData.repartition(mysqlParallelism).foreachPartition(rows => {
        jdbcUtils.insert(sql, rows)
      })
  }
  
  def insert(sql: String, rows: Iterator[Row]): Unit = {
    val conn = getMysqlConn(jdbcUrl, jdbcUsername, jdbcPassword)
    val pstat: PreparedStatement = conn.prepareStatement(sql)
    try {
      var size = 0
      while (rows.hasNext) {
        val row: Row = rows.next()
        val len = row.length
        for (i <- 0 until len) {
          pstat.setObject(i + 1, row.get(i))
        }
        pstat.addBatch()
        size += 1
        if (size % maxBatchSize == 0) {
          pstat.executeBatch()
          lgr.info("=======批量插入数据成功,数量是[{}]=======", size)
          size = 0
        }
      }
      if (size > 0) {
        pstat.executeBatch()
        lgr.info("=======批量插入数据成功,数量是[{}]=======", size)
      }
    } finally {
      pstat.close()
      conn.close()
    }
  }

同样100万数据,划分为10个分区并行插入,每次批量10000条数据,插入耗时2分钟。需要注意的并行度和batchSize不能过大,避免影响数据库正常使用。

但是,上述方案未解决数据表结构变化的问题。可根据dataFrame所属的schema.fields自动生成创建表语句,先执行drop table xxx if exists,然后替换上述语句中truncate未自动生成的创建表语句即可。