Apache AirflowにおいてOperator間で値を渡すにはXCOMを使用しますが、
- Airflow macroで文字列として取得する
- PythonOperatorでtask_instanceから取得する
の2通りの方法があります。
しかし、例えば
GoogleCloudStorageListOperator
でファイルのリストを取得 » GoogleCloudStorageToBigQueryOperator
でリストされたファイルをBigQueryにロードする
といったことをやりたい場合、XCOMからファイルのリストを配列として取得しコンストラクタに渡さなければならないためすこし工夫が必要になります。
本稿ではその実装について記載します。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| ...
list_files = GoogleCloudStorageListOperator(
task_id='list_files',
bucket='my_bucket',
prefix='path/to/file/',
xcom_push=True,
dag=dag
)
gcs_to_bigquery = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bigquery',
bucket='my_bucket',
source_objects="{{ ti.xcom_pull(task_ids='list_files') }}",
destination_project_dataset_table='project:dataset.table',
autodetect=True,
dag=dag
)
list_files >> gcs_to_bigquery
...
|
ファイル名の配列がデシリアライズされた状態で source_objects
に渡されてしまうため動作しません。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| ...
class CustomGcsToBigQueryOperator(GoogleCloudStorageToBigQueryOperator):
def __init__(self, *args, **kwargs):
super().__init__(
source_objects=[],
*args,
**kwargs
)
self.source_objects_task_id = kwargs['source_objects_task_id']
def execute(self, context):
self.source_objects=context['ti'].xcom_pull(task_ids=self.source_objects_task_id)
super().execute(context)
list_files = GoogleCloudStorageListOperator(
task_id='list_files',
bucket='my_bucket',
prefix='path/to/file/',
xcom_push=True,
dag=dag
)
gcs_to_bigquery = CustomGcsToBigQueryOperator(
task_id='gcs_to_bigquery',
bucket='my_bucket',
source_objects_task_id='list_files',
destination_project_dataset_table='project:dataset.table',
autodetect=True,
dag=dag
)
list_files >> gcs_to_bigquery
...
|
GoogleCloudStorageToBigQueryOperator
を継承したカスタムオペレーターを実装し、execute
で xcom_pull
すればOKです。