Azkaban入门


  • GitHub:https://github.com/azkaban/azkaban
  • 官方文档:https://azkaban.readthedocs.io/en/latest/
  • 教程:https://www.bilibili.com/video/BV1y54y18713

1 Azkaban概论

Azkaban 是由 Linkedin 开源的一个批量工作流任务调度器。用于在一个工作流内以一个特定的顺序运行一组工作和流程。Azkaban 定义了一种 KV 文件格式来建立任务之间的依赖关系,并提供一个易于使用的 web 用户界面维护和跟踪你的工作流。
它有如下功能特点:
??1)Web 用户界面
??2)方便上传工作流
??3)方便设置任务之间的关系
??4)调度工作流
??5)认证/授权(权限的工作)
??6)能够杀死并重新启动工作流
??7)模块化和可插拔的插件机制
??8)项目工作区
??9)工作流和任务的日志记录和审计

2 Azkaban入门

2.0 编译源文件

tar -zxvf azkaban-4.0.0.tar.gz
cd azkaban-4.0.0
./gradlew build -x test

编译完成后,在azkaban-db -web-server -exec-server -solo-server 内的 /build/distributions 有编译后的压缩文件。

2.1 集群安装模式

# 解压
tar -zxvf azkaban-web-server -C /opt/module/azkabn
tar -zxvf azkaban-exec-server -C /opt/module/azkabn
tar -zxvf azkaban-db -C /opt/module/azkabn
# 重命名
mv azkaban-web/ azkaban-web 
mv azkaban-exec/ azkaban-exec

2.1.1 配置MySQL

# 创建用户
create datebase azkaban;
# 创建azkaban用户
set global validate_password_length=4;
set global validate_password_policy=0;
create user 'azkaban'@'%' identified by 'azkaban';
grant select,insert,update,delete on azkaban.* to 'azkaban'@'%' with grant option;
flush privileges;
# 导入数据
use azkaban;
source /opt/module/azkaban/azkaban-db-0.1.0-SNAPSHOT/create-all-sql-0.1.0-SNAPSHOT.sql;
show tables;
# 配置my.cnf,防止mysql阻塞
vim /etc/my.cnf
[mysqld]
max_allowed_packet=1024M
sudo systemctl restart mysqld

2.1.2 配置Executor Server

cd /opt/module/azkaban/azkaban-exec/
cd conf/
vim azkaban.properties
# azkaban.properties

default.timezone.id=Asia/Shanghai
azkaban.webserver.url=http://hadoop101:8081

database.type=mysql
mysql.port=3306
mysql.host=hadoop101
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100

# 添加executor.port
executor.port=12321
# 同步azkaban-exec到所有节点
xsync /opt/module/azkaban/azkaban-exec
# 启动
cd /opt/module/azkaban/azkaban-exec/
[hadoop101] bin/start-exec.sh
[hadoop102] bin/start-exec.sh
[hadoop103] bin/start-exec.sh
# 如果生成executor.port说明生成成功

# 激活 curl -G "localhost:$(<./executor.port)/executor?action=activate" && echo
[hadoop101]  curl -G "hadoop101:12321/executor?action=activate" && echo
[hadoop102]  curl -G "hadoop102:12321/executor?action=activate" && echo
[hadoop103]  curl -G "hadoop103:12321/executor?action=activate" && echo
# 提示{"status":"success"},表示激活成功

2.1.3 配置Web Server

cd /opt/module/azkaban/azkaban-web/
cd conf/
vim azkaban.properties
# azkaban.properties

default.timezone.id=Asia/Shanghai

database.type=mysql
mysql.port=3306
mysql.host=hadoop101
mysql.database=azkaban
mysql.user=azkaban
mysql.password=azkaban
mysql.numconnections=100

# 虚拟环境删除 MinimumFreeMemory,否则内存不足,正式环境保留
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
# 用户配置 azkaban-users.xml

  
  
  

  
  

# 启动
cd /opt/module/azkaban/azkaban-exec/
bin/start-web.sh

访问 hadoop101:8081

2.2 Workflow 案例实操

2.2.1 HelloWorld案例

创建两个文件,

# first.project

azkaban-flow-version: 2.0
# first.flow

nodes:
  - name: JobA
    type: command
    config:
      command: echo "Hello world"

压缩为.zip。

创建project,上传zip文件。

执行。

YAML语法

  • 大小写敏感
  • 使用缩进表示层级关系
  • 缩进时不允许使用Tab键,只允许使用空格
  • 缩进的空格数不重要,只要相同层级的元素左对齐即可
  • '#'表示注释

2.2.2 作业依赖案例

# demo3.flow

nodes:
  - name: jobA
    type: command
    config:
      command: echo "I`m jobA"
      
  - name: jobB
    type: command
    config:
      command: echo "I`m jobB"
      
  - name: jobC
    type: command
    dependsOn: 
      - jobA
      - jobB
    config:
      command: echo "I`m jobC"

2.2.3 自动失败重试案例

# demo3.flow

nodes:
  - name: jobA
    type: command
    config:
      command: sh /not_exists.sh
      retries: 3
      retry.backoff: 10000

2.2.4 手动失败重试案例

# demo4.flow

nodes:
  - name: jobA
    type: command
    config:
      command: echo "I`m jobA"
      
  - name: jobB
    type: command
    dependsOn: 
      - jobA
    config:
      command: echo "I`m jobB"
      
  - name: jobC
    type: command
    dependsOn: 
      - jobB
    config:
      command: echo "I`m jobC"      

  - name: jobD
    type: command
    dependsOn: 
      - jobC
    config:
      command: echo "I`m jobD"

方式一:

  1. History
  2. 进入flow
  3. 点击 Prepare Execution

方式二:

  1. 重新执行,将成功的任务 disable

3 Azkaban进阶

3.1 JavaProcess作业类型案例

内置任务类型:

  1. command
  2. javaprocess
    • Xms(最小堆)
    • Xmx(最大堆)
    • classpath(类路径)
    • java.class(要运行的Java对象,其中包含Main方法)
    • main.agrs(main方法的参数)
# demoJavaProcess.flow

nodes:
  - name: demoJavaProcess
    type: javaprocess
    config:
      Xms: 96M
      Xmx: 200M
      java.class: com.tinydata.TestJavaProcess

将打包jar文件和flow文件一同打包。

3.2 条件工作流案例

3.2.1 运行时参数案例

  • 基本原理
    • 父job将参数写入JOB_OUTPUT_PROP_FILE环境变量所指向的文件
    • 子job使用${jobName:param}来获取父job输出的参数并定义执行条件
  • 支持的条件运算符
    • ==
    • !=
    • >
    • >=
    • <
    • <=
    • &&
    • ||
    • !
# jobA.sh

#!/bin/bash
echo "do jobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE


# jobB.sh

#!/bin/bash
echo "do jobB"
# condition.flow

nodes:
  - name: JobA
    type: command
    config:
      command: sh JobA.sh
  
  - name: JobB
    type: command
    dependsOn:
      - JobA
    config:
      command: sh JobB.sh
    condition: ${jobA:wk} == 1

3.2.2 预定义宏案例

  • 预定义宏:
    • all_success(默认)
    • all_done
    • all_failed
    • one_success
    • one_failed
# JobA.sh
#!/bin/bash
echo "do JobA"


# JobC.sh
#!/bin/bash
echo "do JobC"
# macro.flow

nodes:
  - name: JobA
    type: command
    config:
      command: sh JobA.sh
      
  - name: JobB
    type: command
    config:
      command: sh JobB.sh
      
  - name: JobC
    type: command
    dependsOn:
      - JobA
      - JobB
    config:
      command: sh JobC.sh
    condition: one_success

3.3 定时执行案例

Schedule

3.4 邮件报警案例

3.4.1 注册邮箱

  1. 申请126、qq等免费邮箱
  2. 账号管理,开启SMTP服务

3.4.2 默认邮件报警案例

cd /opt/module/azkaban/azkaban-web/conf
vim azkaban.properties
# mail settings
mail.sender=tinydata@qq.com
mail.host=smtp.qq.com
mail.user=tinydata@qq.com
mail.password= # 第三方授权码

重启 web 服务。

在 Notification 配置成功或失败后发送的邮箱。

3.5 电话报警案例

3.5.1 第三方警告平台集成

睿象云:www.onealert.com

  • 集成:CA平台,集成监控工具,Email通用集成方式(126邮箱可以,qq邮箱会退信)
  • 通知:配置,通知策略

3.5.2 测试

3.6 Azkaban多Executor模式注意事项

[hadoop101]cd /home/tinydata

# test.sh
#!/bin/bash
echo hello

chmod +x test.sh
# test.flow
nodes:
  - name: JobA
    type: command
    config:
      command: sh /home/tinydata/test.sh
  • 方案一:指定特定的Executor(hadop101)去执行任务。
    • 在mysql中azkaban数据库executors表中,查询hadoop101上的Executor的id
    • 在执行工作流时Flow Parameters加入useExecutor属性
  • 方案二:在Executor所在所有的节点部署任务所需脚本和应用。

实际生产环境中,推荐方案二

4 参考资料

4.1 Azkaban完整配置

4.2 YAML语法

https://www.runoob.com/w3cnote/yaml-intro.html