S3にアップロードされたファイルをfluentdでBigQueryにinsertする際、S3キー名に応じてテーブルを振り分けるサンプルを掲載します。 ここではフォーマットはs3://my-bucket/{BigQueryデータセット名}/{テーブル名}/{uuid}.csv.gz とします。

fluent.conf

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<source>
  tag s3
  @type s3
  s3_bucket my-bucket
  s3_region ap-northeast-1
  <sqs>
    queue_name my-queue
  </sqs>
</source>
<match s3>
  @type rewrite_tag_filter
  <rule>
    key s3_key
    pattern ^(.+?)/(.+?)/.+\.gz$
    tag bigquery.$1.$2
  </rule>
</match>
<filter bigquery.hoge.fuga>
    @type parser
    key_name message
    <parse>
        @type csv
        keys id,family_name,first_name
        null_empty_string true
    </parse>
</filter>
<filter bigquery.foo.bar>
    @type parser
    key_name message
    <parse>
        @type csv
        keys column_1,column_2
        null_empty_string true
    </parse>
</filter>
<match bigquery.*>
  @type bigquery_insert
  project my-project
  dataset ${tag[1]}
  table ${tag[2]}
  auth_method application_default
  fetch_schema true
  <buffer tag>
    @type memory
  </buffer>
</match>

解説

source

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
<source>
  tag s3
  @type s3
  s3_bucket my-bucket
  s3_region ap-northeast-1
  add_object_metadata true
  <sqs>
    queue_name my-queue
  </sqs>
</source>

以下のファイルをアップロードしたとします。

1
1, tanaka, taro

add_object_metadata オプションにより、以下の形にtransformされます。

1
tag:s3 {"message":"1,tanaka,taro", "s3_bucket":"my-bucket", "s3_key":"hoge/fuga/uuid.csv.gz"}

rewrite_tag_filter

1
2
3
4
5
6
7
8
<match s3>
  @type rewrite_tag_filter
  <rule>
    key s3_key
    pattern ^(.+?)/(.+?)/.+\.gz$
    tag bigquery.$1.$2
  </rule>
</match>

s3_keyを正規表現で展開し、タグが書き換えられます。

1
tag:bigquery.hoge.fuga {"message":"1,tanaka,taro", "s3_bucket":"my-bucket", "s3_key":"hoge/fuga/uuid.csv.gz"}

filter

1
2
3
4
5
6
7
8
9
<filter bigquery.hoge.fuga>
    @type parser
    key_name message
    <parse>
        @type csv
        keys id,family_name,first_name
        null_empty_string true
    </parse>
</filter>

message以外のメタデータを捨て、csvデータをパースします。

1
tag:bigquery.hoge.fuga {"id":"1", "family_name":"tanaka", "first_name":"taro"}

bigquery_insert

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
<match bigquery.*>
  @type bigquery_insert
  project my-project
  dataset ${tag[1]}
  table ${tag[2]}
  auth_method application_default
  fetch_schema true
  <buffer tag>
    @type memory
  </buffer>
</match>

タグからdatasetとtableを取得し、対象のテーブルにレコードをstreaming insertします。 tagプレースホルダを使用するためには <buffer tag> の宣言が必要になります。

my-project:hoge.fuga

idfamily_namefirst_name
1tanakataro