[关闭]
@JunQiu 2018-09-18T10:15:37.000000Z 字数 3756 阅读 1430

airflow(desc、use)

summary_2018/09 tools


1、日常

1.1、airflow的简介和使用


2、技术

2.1、airflow的简介和使用

2.1.1、airflow的简介
2.1.2、服务构成和工作流程
  1. // WebServer
  2. Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对一些变量进行配置。
  3. // Worker
  4. 一般来说我们用 Celery Worker 来执行具体的作业。Worker 可以部署在多台机器上,并可以分别设置接收的队列。当接收的队列中有作业任务时,Worker 就会接收这个作业任务,并开始执行。Airflow 会自动在每个部署 Worker 的机器上同时部署一个 Serve Logs 服务,这样我们就可以在 Web 界面上方便的浏览分散在不同机器上的作业日志了。
  5. // Scheduler
  6. 整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收。
  7. // Flower
  8. Flower 提供了一个可视化界面以监控所有 Celery Worker 的运行状况。这个服务并不是必要的。
  1. 1airflow启动时,会将DAG中的相关信息写入数据库。
  2. 2scheduler会按照指定频次查询数据库,检测是否有需要触发的任务。
  3. 3、当scheduler检测到需要触发的任务时,会向消息队列发送一条Message
  4. 4Celery会定时查询消息队列中是否有Message。当检测到Message时,会将Message中包含的任务信息下发给Worker,由Worker执行具体任务
  5. // Celery:分布式消息队列
2.1.3、安装和实现DAG任务
  1. # Importing Modules
  2. from airflow import DAG
  3. from airflow.operators.bash_operator import BashOperator
  4. from datetime import datetime, timedelta
  5. # some arguments for task
  6. # start_date DAG == schedule_interval is best way
  7. default_args = {
  8. 'owner': 'airflow',
  9. 'depends_on_past': False,
  10. 'start_date': datetime.now(),
  11. 'email': ['airflow@example.com'],
  12. 'email_on_failure': False,
  13. 'email_on_retry': False,
  14. 'retries': 1,
  15. 'retry_delay': timedelta(minutes=5),
  16. # 'queue': 'bash_queue',
  17. # 'pool': 'backfill',
  18. # 'priority_weight': 10,
  19. 'schedule_interval': timedelta(days=1),
  20. 'end_date': datetime(2018, 9, 4),
  21. }
  22. dag = DAG('qj_airflow_test', default_args=default_args)
  23. # t1, t2 and t3 are examples of tasks created by instantiating operators
  24. t1 = BashOperator(
  25. task_id='print_date',
  26. bash_command='date',
  27. dag=dag)
  28. t2 = BashOperator(
  29. task_id='sleep',
  30. bash_command='sleep 5',
  31. retries=3,
  32. dag=dag)
  33. templated_command = """
  34. {% for i in range(5) %}
  35. echo "{{ ds }}"
  36. echo "{{ macros.ds_add(ds, 7)}}"
  37. echo "{{ params.my_param }}"
  38. {% endfor %}
  39. """
  40. t3 = BashOperator(
  41. task_id='templated',
  42. bash_command=templated_command,
  43. params={'my_param': 'Parameter I passed in'},
  44. dag=dag)
  45. t4 = BashOperator(
  46. task_id='print_date1',
  47. bash_command='date',
  48. dag=dag)
  49. # Setting up Dependencies
  50. t2.set_upstream(t1)
  51. t3.set_upstream(t1)
  52. t4.set_upstream([t2, t3])
  53. // 一些字段介绍
  54. ## DAG
  55. 要配置一个 DAG 自然需要一个 DAG 实例。在同一个 DAG 下的所有作业,都需要将它的 dag 属性设置为这个 DAG 实例。在实例化 DAG 时,通过传参数可以给这个 DAG 实例做一些必要的配置。
  56. # dag_id
  57. DAG 取一个名字,方便日后维护。
  58. # default_args
  59. 默认参数,当属于这个DAG实例的作业没有配置相应参数时,将使用 DAG 实例的 default_args 中的相应参数。
  60. # schedule_interval
  61. 配置 DAG 的执行周期,语法和 crontab 的一致。
  62. ## Task
  63. Airflow 提供了很多 Operator BashOperator 则会执行 bash_command 参数所指定的 bash 指令,PythonOperator执行 Python callables,或者GoogleCloudStorageToBigQueryOperatorkubernetes_pod_operator(可以使用docker imageTipsstartup_timeout_seconds (int) timeout in seconds to startup the pod(默认120))。
  64. Tips:如果在 DAG 中有设置 default_args 而在 Operator 中没有覆盖相应配置,则会使用 default_args 中的配置。
  65. # dag
  66. 传递一个 DAG 实例,以使当前作业属于相应 DAG
  67. # task_id
  68. 给作业去一个名字,方便日后维护。
  69. # owner
  70. 作业的拥有者,方便作业维护。另外有些 Operator 会根据该参数实现相应的权限控制。
  71. # start_date
  72. 作业的开始时间,即作业将在这个时间点以后开始调度。
  73. ## 依赖的配置
  74. 除了可以使用作业实例的 set_upstream set_downstream 方法外,还可以使用类似:
  75. task1 << task2 << task3
  76. task3 >> task4
  77. ## 区分几个时间
  78. start date: 在配置中,它是作业开始调度时间。而在谈论执行状况时,它是调度开始时间。
  79. schedule interval: 调度执行周期。
  80. execution date: 执行时间,在 Airflow 中称之为执行时间,但其实它并不是真实的执行时间。
  81. # 举个例子:
  82. 假设我们配置了一个作业的 start date 2017101日,配置的 schedule interval **00 12 * * *** 那么第一次执行的时间将是 2017102 12 而此时记录的 execution date 2017101 12点。因此 execution date 并不是如其字面说的表示执行时间,真正的执行时间是 execution date 所显示的时间的下一个满足 schedule interval 的时间点。
2.1.4、总结
2.1.5、参考文档
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注