@JunQiu 2018-09-18T10:13:20.000000Z Word count 1431 Views 4278

Airflow (XCom)

tools summary_2018/09


1. Daily work

1.1. Investigated ways to pass data between Airflow task instances


2. Technical notes

2.1. XCom: the approach recommended by the official docs

    Looking into the official XCom approach:
    // An operator can put a value into XCom through the xcom_push() method, a function's return value, or the Operator's execute() method
    # Tasks can push XComs at any time by calling the xcom_push() method.
    # In addition, if a task returns a value (either from its Operator’s execute() method,
    # or from a PythonOperator’s python_callable function), then an XCom containing that value is automatically pushed.
    // A value can be fetched back from XCom by key, source task_ids, and source dag_id.
    # Tasks call xcom_pull() to retrieve XComs, optionally applying filters based
    # on criteria like key, source task_ids, and source dag_id.
    # By default, xcom_pull() filters for the keys that are automatically given to XComs
    # when they are pushed by being returned from execute functions (as opposed to XComs that are pushed manually).
    Within a single DAG: XComs are key/value pairs, the value can be a Python data type, and it is presumably stored in the database:

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.python_operator import PythonOperator

    # Named `dag` (not `DAG`) so the imported DAG class is not shadowed.
    # Note: Airflow's docs recommend a fixed start_date over datetime.now().
    dag = DAG(
        dag_id='xcom_test',
        start_date=datetime.now(),
        schedule_interval='@once',
    )

    ls = ['a', 'b', 'c']


    def push_function(**kwargs):
        # The returned value is pushed to XCom automatically.
        return ls


    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
        dag=dag)


    def pull_function(**kwargs):
        # Using XCom for a plain return value feels possibly redundant;
        # the original note adds that the ['any'] key must be used.
        # No key is passed here, so xcom_pull falls back to its default key
        # for returned values.
        value = kwargs['ti'].xcom_pull(task_ids='push_task')
        return value


    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function,
        provide_context=True,  # makes `ti` available in kwargs
        dag=dag)

    # Run push_task first, then pull_task.
    push_task >> pull_task
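
The DAG above relies on the return value being pushed automatically. As a complement, here is a minimal sketch of the manual route that the quoted docs mention: calling xcom_push() with an explicit key and reading it back with xcom_pull(). The key name 'letters' and the FakeTaskInstance class are hypothetical and are not part of Airflow or of the original notes. The stub only exists so the two callables can be tried outside a running scheduler. Inside a real DAG, kwargs['ti'] would be the actual task instance.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (local testing only).

    It mimics just the two calls used below and keeps values in a plain
    dict instead of Airflow's metadata database.
    """

    def __init__(self, task_id, store):
        self.task_id = task_id
        self._store = store  # shared dict: (task_id, key) -> value

    def xcom_push(self, key, value):
        # Record the value under the pushing task's id and the given key.
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids=None, key='return_value'):
        # Look up by source task id and key; None when nothing matches.
        return self._store.get((task_ids, key))


def push_function(**kwargs):
    # Push manually under an explicit key (the key name is an assumption).
    kwargs['ti'].xcom_push(key='letters', value=['a', 'b', 'c'])


def pull_function(**kwargs):
    # Pull by source task id, using the same key that was pushed.
    return kwargs['ti'].xcom_pull(task_ids='push_task', key='letters')


# Exercise the callables with the stub, in the same order as the DAG.
store = {}
push_function(ti=FakeTaskInstance('push_task', store))
pulled = pull_function(ti=FakeTaskInstance('pull_task', store))
print(pulled)
```

Because the value was pushed manually under 'letters', a pull without a key (which looks for the default return-value key) finds nothing in this sketch. That matches the docs' remark that xcom_pull() by default filters for automatically pushed return values.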