Airflowで後続のOperatorに配列を渡す

Apache AirflowにおいてOperator間で値を渡すにはXCOMを使用しますが、 Airflow macroで文字列として取得する PythonOperatorでtask_instanceから取得する の2通りの方法があります。 しかし、例えば GoogleCloudStorageListOperatorでファイルのリストを取得 » GoogleCloudStorageToBigQueryOperator でリストされたファイルをBigQueryにロードする といったことをやりたい場合、XCOMからファイルのリストを配列として取得しコンストラクタに渡さなければならないためすこし工夫が必要になります。 本稿ではその実装について記載します。 NG 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 ... list_files = GoogleCloudStorageListOperator( task_id='list_files', bucket='my_bucket', prefix='path/to/file/', xcom_push=True, dag=dag ) gcs_to_bigquery = GoogleCloudStorageToBigQueryOperator( task_id='gcs_to_bigquery', bucket='my_bucket', source_objects="{{ ti.xcom_pull(task_ids='list_files') }}", destination_project_dataset_table='project:dataset.table', autodetect=True, dag=dag ) list_files >> gcs_to_bigquery ... ファイル名の配列がデシリアライズされた状態で source_objects に渡されてしまうため動作しません。 OK 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 ....

May 20, 2021

terraform非対応リソースをlocal-execで管理する

terraformに対応していないクラウドリソースを local-exec を用いてterraform化してみます。 今回はBigQueryのユーザー定義関数(UDF)でやってみます。 実装 さて早速。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 variable project{} resource "null_resource" "bigquery-udf" { <- #1 triggers = { query = "CREATE OR REPLACE FUNCTION my_dataset.TEST_FUNCTION(x INT64) AS (x + 1);" <- #2 } provisioner "local-exec" { <- #3 interpreter = ["bq", "query", "--use_legacy_sql=false", "--project_id=${var.project}"] <- #4 command = self.triggers.query on_failure = fail <- #5 } provisioner "local-exec" { when = destroy <- #6 interpreter = ["bq", "query", "--use_legacy_sql=false", "--project_id=${var....

September 14, 2020