Apache AirflowにおいてOperator間で値を渡すにはXCOMを使用しますが、

  • Airflow macroで文字列として取得する
  • PythonOperatorでtask_instanceから取得する

の2通りの方法があります。

しかし、例えば

GoogleCloudStorageListOperatorでファイルのリストを取得 » GoogleCloudStorageToBigQueryOperator でリストされたファイルをBigQueryにロードする

といったことをやりたい場合、XCOMからファイルのリストを配列として取得しコンストラクタに渡さなければならないためすこし工夫が必要になります。 本稿ではその実装について記載します。

NG

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
...

list_files = GoogleCloudStorageListOperator(
    task_id='list_files',
    bucket='my_bucket',
    prefix='path/to/file/', 
    xcom_push=True,
    dag=dag
)

gcs_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bigquery',
    bucket='my_bucket',
    source_objects="{{ ti.xcom_pull(task_ids='list_files') }}",
    destination_project_dataset_table='project:dataset.table',
    autodetect=True,
    dag=dag
)

list_files >> gcs_to_bigquery

...

ファイル名の配列がデシリアライズされた状態で source_objects に渡されてしまうため動作しません。

OK

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
...

class CustomGcsToBigQueryOperator(GoogleCloudStorageToBigQueryOperator):
    def __init__(self, *args, **kwargs):
        super().__init__(
            source_objects=[],
            *args,
            **kwargs
            )
        self.source_objects_task_id = kwargs['source_objects_task_id']

    def execute(self, context):
        self.source_objects=context['ti'].xcom_pull(task_ids=self.source_objects_task_id)
        super().execute(context)

list_files = GoogleCloudStorageListOperator(
    task_id='list_files',
    bucket='my_bucket',
    prefix='path/to/file/', 
    xcom_push=True,
    dag=dag
)

gcs_to_bigquery = CustomGcsToBigQueryOperator(
    task_id='gcs_to_bigquery',
    bucket='my_bucket',
    source_objects_task_id='list_files',
    destination_project_dataset_table='project:dataset.table',
    autodetect=True,
    dag=dag
)

list_files >> gcs_to_bigquery

...

GoogleCloudStorageToBigQueryOperator を継承したカスタムオペレーターを実装し、executexcom_pull すればOKです。