Azkaban入门
- GitHub:https://github.com/azkaban/azkaban
- 官方文档:https://azkaban.readthedocs.io/en/latest/
- 教程:https://www.bilibili.com/video/BV1y54y18713
1 Azkaban概论
Azkaban 是由 Linkedin 开源的一个批量工作流任务调度器。用于在一个工作流内以一个特定的顺序运行一组工作和流程。Azkaban 定义了一种 KV 文件格式来建立任务之间的依赖关系,并提供一个易于使用的 web 用户界面维护和跟踪你的工作流。
它有如下功能特点:
??1)Web 用户界面
??2)方便上传工作流
??3)方便设置任务之间的关系
??4)调度工作流
??5)认证/授权(权限的工作)
??6)能够杀死并重新启动工作流
??7)模块化和可插拔的插件机制
??8)项目工作区
??9)工作流和任务的日志记录和审计
2 Azkaban入门
2.0 编译源文件
tar -zxvf azkaban-4.0.0.tar.gz
cd azkaban-4.0.0
./gradlew build -x test
编译完成后,在azkaban-db -web-server -exec-server -solo-server 内的 /build/distributions 有编译后的压缩文件。
2.1 集群安装模式
# 解压
tar -zxvf azkaban-web-server -C /opt/module/azkabn
tar -zxvf azkaban-exec-server -C /opt/module/azkabn
tar -zxvf azkaban-db -C /opt/module/azkabn
# 重命名
mv azkaban-web/ azkaban-web
mv azkaban-exec/ azkaban-exec
2.1.1 配置MySQL
# 创建用户
create datebase azkaban;
# 创建azkaban用户
set global validate_password_length=4;
set global validate_password_policy=0;
create user 'azkaban'@'%' identified by 'azkaban';
grant select,insert,update,delete on azkaban.* to 'azkaban'@'%' with grant option;
flush privileges;
# 导入数据
use azkaban;
source /opt/module/azkaban/azkaban-db-0.1.0-SNAPSHOT/create-all-sql-0.1.0-SNAPSHOT.sql;
show tables;
# 配置my.cnf,防止mysql阻塞
vim /etc/my.cnf
[mysqld]
max_allowed_packet=1024M
sudo systemctl restart mysqld
2.1.2 配置Executor Server
cd /opt/module/azkaban/azkaban-exec/
cd conf/
vim azkaban.properties
# azkaban.properties
default.timezone.id=Asia/Shanghai
azkaban.webserver.url=http://hadoop101:8081
database.type=mysql
mysql.port=3306
mysql.host=hadoop101
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
# 添加executor.port
executor.port=12321
# 同步azkaban-exec到所有节点
xsync /opt/module/azkaban/azkaban-exec
# 启动
cd /opt/module/azkaban/azkaban-exec/
[hadoop101] bin/start-exec.sh
[hadoop102] bin/start-exec.sh
[hadoop103] bin/start-exec.sh
# 如果生成executor.port说明生成成功
# 激活 curl -G "localhost:$(<./executor.port)/executor?action=activate" && echo
[hadoop101] curl -G "hadoop101:12321/executor?action=activate" && echo
[hadoop102] curl -G "hadoop102:12321/executor?action=activate" && echo
[hadoop103] curl -G "hadoop103:12321/executor?action=activate" && echo
# 提示{"status":"success"},表示激活成功
2.1.3 配置Web Server
cd /opt/module/azkaban/azkaban-web/
cd conf/
vim azkaban.properties
# azkaban.properties
default.timezone.id=Asia/Shanghai
database.type=mysql
mysql.port=3306
mysql.host=hadoop101
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100
# 虚拟环境删除 MinimumFreeMemory,否则内存不足,正式环境保留
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
# 用户配置 azkaban-users.xml
# 启动
cd /opt/module/azkaban/azkaban-exec/
bin/start-web.sh
访问 hadoop101:8081
2.2 Workflow 案例实操
2.2.1 HelloWorld案例
创建两个文件,
# first.project
azkaban-flow-version: 2.0
# first.flow
nodes:
- name: JobA
type: command
config:
command: echo "Hello world"
压缩为.zip。
创建project,上传zip文件。
执行。
YAML语法
- 大小写敏感
- 使用缩进表示层级关系
- 缩进时不允许使用Tab键,只允许使用空格
- 缩进的空格数不重要,只要相同层级的元素左对齐即可
- '#'表示注释
2.2.2 作业依赖案例
# demo3.flow
nodes:
- name: jobA
type: command
config:
command: echo "I`m jobA"
- name: jobB
type: command
config:
command: echo "I`m jobB"
- name: jobC
type: command
dependsOn:
- jobA
- jobB
config:
command: echo "I`m jobC"
2.2.3 自动失败重试案例
# demo3.flow
nodes:
- name: jobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 10000
2.2.4 手动失败重试案例
# demo4.flow
nodes:
- name: jobA
type: command
config:
command: echo "I`m jobA"
- name: jobB
type: command
dependsOn:
- jobA
config:
command: echo "I`m jobB"
- name: jobC
type: command
dependsOn:
- jobB
config:
command: echo "I`m jobC"
- name: jobD
type: command
dependsOn:
- jobC
config:
command: echo "I`m jobD"
方式一:
- History
- 进入flow
- 点击 Prepare Execution
方式二:
- 重新执行,将成功的任务 disable
3 Azkaban进阶
3.1 JavaProcess作业类型案例
内置任务类型:
- command
- javaprocess
- Xms(最小堆)
- Xmx(最大堆)
- classpath(类路径)
- java.class(要运行的Java对象,其中包含Main方法)
- main.agrs(main方法的参数)
# demoJavaProcess.flow
nodes:
- name: demoJavaProcess
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.tinydata.TestJavaProcess
将打包jar文件和flow文件一同打包。
3.2 条件工作流案例
3.2.1 运行时参数案例
- 基本原理
- 父job将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件
- 子job使用
${jobName:param}
来获取父job输出的参数并定义执行条件
- 支持的条件运算符
- ==
- !=
- >
- >=
- <
- <=
- &&
- ||
- !
# jobA.sh
#!/bin/bash
echo "do jobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
# jobB.sh
#!/bin/bash
echo "do jobB"
# condition.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${jobA:wk} == 1
3.2.2 预定义宏案例
- 预定义宏:
- all_success(默认)
- all_done
- all_failed
- one_success
- one_failed
# JobA.sh
#!/bin/bash
echo "do JobA"
# JobC.sh
#!/bin/bash
echo "do JobC"
# macro.flow
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success
3.3 定时执行案例
Schedule
3.4 邮件报警案例
3.4.1 注册邮箱
- 申请126、qq等免费邮箱
- 账号管理,开启SMTP服务
3.4.2 默认邮件报警案例
cd /opt/module/azkaban/azkaban-web/conf
vim azkaban.properties
# mail settings
mail.sender=tinydata@qq.com
mail.host=smtp.qq.com
mail.user=tinydata@qq.com
mail.password= # 第三方授权码
重启 web 服务。
在 Notification 配置成功或失败后发送的邮箱。
3.5 电话报警案例
3.5.1 第三方警告平台集成
睿象云:www.onealert.com
- 集成:CA平台,集成监控工具,Email通用集成方式(126邮箱可以,qq邮箱会退信)
- 通知:配置,通知策略
3.5.2 测试
3.6 Azkaban多Executor模式注意事项
[hadoop101]cd /home/tinydata
# test.sh
#!/bin/bash
echo hello
chmod +x test.sh
# test.flow
nodes:
- name: JobA
type: command
config:
command: sh /home/tinydata/test.sh
- 方案一:指定特定的Executor(hadop101)去执行任务。
- 在mysql中azkaban数据库executors表中,查询hadoop101上的Executor的id
- 在执行工作流时Flow Parameters加入useExecutor属性
- 方案二:在Executor所在所有的节点部署任务所需脚本和应用。
实际生产环境中,推荐方案二
4 参考资料
4.1 Azkaban完整配置
4.2 YAML语法
https://www.runoob.com/w3cnote/yaml-intro.html