Airflowで後続のOperatorに配列を渡す
Apache AirflowにおいてOperator間で値を渡すにはXCOMを使用しますが、 Airflow macroで文字列として取得する PythonOperatorでtask_instanceから取得する の2通りの方法があります。 しかし、例えば GoogleCloudStorageListOperatorでファイルのリストを取得 » GoogleCloudStorageToBigQueryOperator でリストされたファイルをBigQueryにロードする といったことをやりたい場合、XCOMからファイルのリストを配列として取得しコンストラクタに渡さなければならないためすこし工夫が必要になります。 本稿ではその実装について記載します。 NG 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ... list_files = GoogleCloudStorageListOperator( task_id='list_files', bucket='my_bucket', prefix='path/to/file/', xcom_push=True, dag=dag ) gcs_to_bigquery = GoogleCloudStorageToBigQueryOperator( task_id='gcs_to_bigquery', bucket='my_bucket', source_objects="{{ ti.xcom_pull(task_ids='list_files') }}", destination_project_dataset_table='project:dataset.table', autodetect=True, dag=dag ) list_files >> gcs_to_bigquery ... ファイル名の配列がデシリアライズされた状態で source_objects に渡されてしまうため動作しません。 OK 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 ....