[Flink] 从 RabbitMQ 读取并计算后输出到 MySQL
实现的需求是从 RabbitMQ 读取 JSON 格式的消息,处理结果输出到 MySQL。
主要参考了 这篇博客 和 Apache Flink 中文文档 。
编程语言: Scala 2.12.10
构建工具: sbt 1.3.0
IDE:IntelliJ IDEA Community 2019.1
开发环境的搭建可以参考 这篇博客 => 通过 IntelliJ IDEA 打包 Flink Scala 项目 。
这篇博客。
这篇博客 。
package octopus.ba
import java.sql.Connection
import com.alibaba.druid.pool.DruidDataSource
import octopus.ba.config.MySqlConfig
/**
* MySql 连接工厂
*/
object MySqlConnection {
private var druidDataSource = new DruidDataSource
def getConnection() = {
druidDataSource.setDriverClassName("com.mysql.jdbc.Driver")
// 例:"jdbc:mysql://192.168.0.1:3306/mydbname?serverTimezone=UTC"
druidDataSource.setUrl(MySqlConfig.url)
druidDataSource.setUsername(MySqlConfig.username)
druidDataSource.setPassword(MySqlConfig.password)
// 设置连接池的一些参数
// 1.数据库连接池初始化的连接个数
druidDataSource.setInitialSize(50)
// 2.指定最大的连接数,同一时刻可以同时向数据库申请的连接数
druidDataSource.setMaxActive(200)
// 3.指定小连接数:在数据库连接池空闲状态下,连接池中保存的最少的空闲连接数
druidDataSource.setMinIdle(30)
var con:Connection = null
try {
con = druidDataSource.getConnection
System.out.println("创建连接池:" + con)
} catch {
case e: Exception =>
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage)
}
con
}
}
这篇博客
这篇博客
CC 4.0 BY-NC-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://www.liujiajia.me/2019/9/18/flink-input-from-rabbitmq-and-output-to-mysql
package octopus.ba
import java.sql.Connection
import com.alibaba.druid.pool.DruidDataSource
import octopus.ba.config.MySqlConfig
/**
* MySql 连接工厂
*/
object MySqlConnection {
private var druidDataSource = new DruidDataSource
def getConnection() = {
druidDataSource.setDriverClassName("com.mysql.jdbc.Driver")
// 例:"jdbc:mysql://192.168.0.1:3306/mydbname?serverTimezone=UTC"
druidDataSource.setUrl(MySqlConfig.url)
druidDataSource.setUsername(MySqlConfig.username)
druidDataSource.setPassword(MySqlConfig.password)
// 设置连接池的一些参数
// 1.数据库连接池初始化的连接个数
druidDataSource.setInitialSize(50)
// 2.指定最大的连接数,同一时刻可以同时向数据库申请的连接数
druidDataSource.setMaxActive(200)
// 3.指定小连接数:在数据库连接池空闲状态下,连接池中保存的最少的空闲连接数
druidDataSource.setMinIdle(30)
var con:Connection = null
try {
con = druidDataSource.getConnection
System.out.println("创建连接池:" + con)
} catch {
case e: Exception =>
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage)
}
con
}
}