[Flink] 从 RabbitMQ 读取并计算后输出到 MySQL


实现的需求是从 RabbitMQ 读取 JSON 格式的消息,处理结果输出到 MySQL
主要参考了 这篇博客 和 Apache Flink 中文文档 。

编程语言: Scala 2.12.10
构建工具: sbt 1.3.0
IDE:IntelliJ IDEA Community 2019.1

开发环境的搭建可以参考 这篇博客 => 通过 IntelliJ IDEA 打包 Flink Scala 项目 。

这篇博客。

这篇博客 。

package octopus.ba

import java.sql.Connection

import com.alibaba.druid.pool.DruidDataSource
import octopus.ba.config.MySqlConfig

/**
  * MySql 连接工厂
  */
object MySqlConnection {
  private var druidDataSource = new DruidDataSource

  def getConnection() = {
    druidDataSource.setDriverClassName("com.mysql.jdbc.Driver")
    // 例:"jdbc:mysql://192.168.0.1:3306/mydbname?serverTimezone=UTC"
    druidDataSource.setUrl(MySqlConfig.url)
    druidDataSource.setUsername(MySqlConfig.username)
    druidDataSource.setPassword(MySqlConfig.password)
    // 设置连接池的一些参数
    // 1.数据库连接池初始化的连接个数
    druidDataSource.setInitialSize(50)
    // 2.指定最大的连接数,同一时刻可以同时向数据库申请的连接数
    druidDataSource.setMaxActive(200)
    // 3.指定小连接数:在数据库连接池空闲状态下,连接池中保存的最少的空闲连接数
    druidDataSource.setMinIdle(30)
    var con:Connection = null
    try {
      con = druidDataSource.getConnection
      System.out.println("创建连接池:" + con)
    } catch {
      case e: Exception =>
        System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage)
    }
    con
  }
}

这篇博客

这篇博客