airflow
https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html
--------------------------------------------------------------------------------------------------------
install:
export AIRFLOW_HOME=~/airflow
# install from pypi using pip
pip install apache-airflow
# initialize the database
airflow initdb   (this step tends to fail; install-time errors are almost always version mismatches, wrong permissions, wrong file locations, bad configuration, or bad dependencies, i.e. compatibility issues that Docker sidesteps entirely)
A fix is discussed in this GitHub issue: https://github.com/apache/airflow/issues/11965
Cause:
this was caused by a Python dependency, specifically cattrs==1.1.0, released on 2020-10-29. Manually downgrading cattrs to 1.0.0
does fix the issue and allows the Airflow database to be initialised:
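To apply that workaround, pinning the dependency and re-running the init is enough (a sketch assuming a plain pip-managed environment):
pip install cattrs==1.0.0
airflow initdb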
# start the web server, default port is 8080
airflow webserver -p 8080
# start the scheduler
airflow scheduler
# visit localhost:8080 in the browser and enable the example dag in the home page
The Airflow web UI is now reachable at localhost:8080.
Next, write a test DAG, kk.py:
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    retries=3,
    dag=dag,
)

dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

"""

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]
Then place it in the DAG folder, which defaults to the airflow/dags directory (under AIRFLOW_HOME).
Then run python3 kk.py; if it finishes with no exception, the file is fine. After that, wait a while before the DAG shows up in the web UI, since the scheduler needs time to pick it up.
Triggering the schedule:
Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument,
and preferably receives a cron expression as a str,
or a datetime.timedelta object. Alternatively, you can also use one of the cron "presets":
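For illustration, the same kind of DAG could be declared with any of the three forms (a sketch; the dag_ids are placeholders and default_args is the dict defined in kk.py above):
from datetime import timedelta
from airflow import DAG

# cron expression string: run every day at 06:00
DAG('cron_example', default_args=default_args, schedule_interval='0 6 * * *')
# cron preset string
DAG('preset_example', default_args=default_args, schedule_interval='@daily')
# datetime.timedelta, as used in kk.py
DAG('delta_example', default_args=default_args, schedule_interval=timedelta(days=1))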
--------------------------------------------------------------
operator -> task -> dag
DAG: The work (tasks), and the order in which work should take place (dependencies), written in Python.
DAG Run: An instance of a DAG for a particular logical date and time.
Operator: A class that acts as a template for carrying out some work.
Task: Defines work by implementing an operator, written in Python.
Task Instance: An instance of a task - that has been assigned to a DAG and has a state associated with a specific DAG run (i.e. for a specific execution_date).
execution_date: The logical date and time for a DAG Run and its Task Instances.
DAG Runs
A DAG run is a physical instance of a DAG, containing task instances that run for a specific execution_date.
tasks:
Each task is an implementation of an Operator, for example a PythonOperator to execute some Python code,
or a BashOperator to run a Bash command.
task_1 >> task_2 # Define dependencies
We can say that task_1 is upstream of task_2, and conversely task_2 is downstream of task_1.
When a DAG Run is created, task_1 will start running and task_2 waits for task_1 to complete successfully before it may start.
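For reference, the bitshift form is equivalent to the explicit helper methods (a sketch; task_1, task_2, task_3 stand for any operator instances):
task_1 >> task_2               # bitshift form: task_2 runs after task_1
task_1.set_downstream(task_2)  # equivalent method form
task_2.set_upstream(task_1)    # same edge, declared from the downstream side
task_1 >> [task_2, task_3]     # fan-out: both task_2 and task_3 depend on task_1, as in kk.py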
task state
Task instances also have an indicative state, which could be "running", "success", "failed", "skipped", "up for retry", etc.
Airflow does have a feature for operator cross-communication called XCom, described in the XComs section.
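A minimal XCom sketch using PythonOperator (Airflow 1.x style, hence provide_context=True; the task_ids, the 'greeting' key, and attaching to the kk.py dag object are all just for illustration):
from airflow.operators.python_operator import PythonOperator

def push_value(**context):
    # store a small value for downstream tasks
    context['ti'].xcom_push(key='greeting', value='hello')

def pull_value(**context):
    # read the value pushed by the 'push_task' task
    print(context['ti'].xcom_pull(task_ids='push_task', key='greeting'))

push_task = PythonOperator(task_id='push_task', python_callable=push_value,
                           provide_context=True, dag=dag)
pull_task = PythonOperator(task_id='pull_task', python_callable=pull_value,
                           provide_context=True, dag=dag)
push_task >> pull_task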
Airflow provides operators for many common tasks, including:
BashOperator - executes a bash command
PythonOperator - calls an arbitrary Python function
EmailOperator - sends an email
SimpleHttpOperator - sends an HTTP request
MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc. (see the sketch after this list)
In addition to these basic building blocks, there are many more specific operators: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator... you get the idea!
Operators are only loaded by Airflow if they are assigned to a DAG.
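Custom sensors are written by subclassing BaseSensorOperator and implementing poke(); a minimal sketch (the import path is the Airflow 1.x one, and the class name and /tmp/ready.flag path are made up for the example):
import os
from airflow.sensors.base_sensor_operator import BaseSensorOperator

class FlagFileSensor(BaseSensorOperator):
    """Waits until a marker file appears on disk."""
    def poke(self, context):
        # called repeatedly (every poke_interval seconds) until it returns True
        return os.path.exists('/tmp/ready.flag')

wait_for_flag = FlagFileSensor(task_id='wait_for_flag', poke_interval=30,
                               timeout=600, dag=dag)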
-----------------------------------------------------------------------------------------
action:
Inspect DAGs and task dependencies:
# print the list of active DAGs
airflow list_dags
# prints the list of tasks in the "tutorial" dag_id
airflow list_tasks tutorial
# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree
Testing:
# the trailing date argument is the execution_date
# command layout: command subcommand dag_id task_id date
airflow test tutorial print_date 2015-06-01
note: airflow test simply allows testing a single task instance.
depends_on_past = true
This controls whether a task's dependencies are considered satisfied: the task instances directly upstream of the task must be in the success state. In addition, with depends_on_past=True, besides the upstream tasks succeeding,
the task instance from the previous schedule interval must also have succeeded (unless this is the task's first run).
You may also want to consider wait_for_downstream=True when using depends_on_past=True. While depends_on_past=True causes a task instance to depend on
the success of its previous task_instance, wait_for_downstream=True will cause a task instance to also wait for all task instances immediately downstream
of the previous task instance to succeed.
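For example (a sketch; the task_id and bash_command are placeholders, attached to the kk.py dag):
strict_task = BashOperator(
    task_id='strict_daily_load',  # placeholder name
    bash_command='echo "load data"',  # placeholder command
    depends_on_past=True,      # this task's run in the previous schedule interval must have succeeded
    wait_for_downstream=True,  # and so must that run's immediate downstream tasks
    dag=dag,
)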
Run:
backfill:
The operation of running a DAG for a specified date in the past is called "backfilling".
airflow backfill -s 2019-01-01 -e 2019-01-04 test_dag
In this case the test_dag DAG runs with the execution date set to 2019-01-01, 2019-01-02, and 2019-01-03.
Submitting a DAG just means executing the script that creates it:
python ~/airflow/dags/somedagcode.py
------------------------------------------------------------------------------------------
airflow docker
https://github.com/puckel/docker-airflow
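A minimal way to try that image (a sketch based on the puckel/docker-airflow README; verify the exact command and tags against the repo, since the image is community-maintained):
docker pull puckel/docker-airflow
docker run -d -p 8080:8080 puckel/docker-airflow webserver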