a/ analytics note .jp

DATA · field log

e-Stat × DuckDB × BigQuery × dbt で作る政府統計データ基盤 ─ 探索から本番パイプラインまで

政府統計API(e-Stat)をDuckDBでその場探索し、BigQuery+dbtでbronze/silver/martの3層データ基盤に積み上げるアーキテクチャを設計から実装まで解説します。

· 14 min read · #DuckDB / #BigQuery / #dbt / #e-Stat / #データ基盤 / #Python / #SQL · AI-assisted · reviewed Share on X はてブ Zennにクロスポスト

目次


なぜ政府統計データが面白いか

政府統計データは、データ分析プロダクトに外部信号を加える最もコスパの高いリソースのひとつです。 無料・商用利用可・高品質という三拍子が揃った e-Stat(政府統計の総合窓口)を活用することで、自社データだけでは見えなかったマクロトレンドとの相関を掘り起こせます。

e-Stat は総務省統計局が運営するポータルサイトで、e-Stat API を通じてプログラムからデータを取得できます。e-Stat は、国勢調査・住民基本台帳・産業統計など数千のデータセットを REST API 経由で提供しています。APIキーの取得は無料で、商用利用も許可されており、レスポンスは JSON または XML で受け取れます。利用規約(CC BY 4.0 相当)に従う限り、SaaS プロダクトの機能に組み込むことも問題ありません。

具体的にどんな活用シーンがあるか。例えば、EC サイトの売上モデルに都道府県別人口・年齢構成を説明変数として加える、採用プロダクトに地域別就業者数トレンドを重ねる、不動産分析に人口移動統計を紐付けるといったケースが考えられます。こうした外部信号は、自社の行動ログだけでは再現できない文脈を与えてくれます。

本記事では、e-Stat API をデータ基盤に取り込む具体的なアーキテクチャとして DuckDB による即席探索 → Python クライアント → GCS/BigQuery bronze → dbt silver/mart という流れを設計から実装まで解説します。実際に手を動かせるコードを多く含めていますので、そのまま試してみてください。


DuckDB で e-Stat API を直接クエリする(探索フェーズ)

新しいデータソースを本番パイプラインに組み込む前に、まず DuckDB でその場探索するのが最速アプローチです。 DuckDB は read_text() 関数で HTTP エンドポイントを直接クエリでき、JSON の解析まで SQL 一本で完結します。ローカルに追加のミドルウェアをインストールする必要はありません。

e-Stat API の概要

e-Stat REST API が提供する主要エンドポイントは以下の3つです。本記事ではすべてのエンドポイントを扱います。

エンドポイント用途主なパラメータ
getStatsList統計表の一覧を取得searchWord, limit, statsCode
getMetaInfo統計表のメタ情報(分類・属性)を取得statsDataId
getStatsData統計数値を取得statsDataId, cdArea, cdTime

API キーは e-Stat アカウント登録 から無料で発行できます。発行後、appId パラメータとして各リクエストに付与します。

DuckDB で統計表一覧をクエリする

DuckDB CLI(またはJupyterのDuckDB接続)を起動し、以下を実行します。ESTAT_APP_ID にはご自身のアプリIDをセットしてください。

-- アプリIDをセッション変数に格納
SET VARIABLE estat_app_id = 'YOUR_APP_ID_HERE';

-- e-Stat API getStatsList を直接クエリ(検索ワード「人口」)
SELECT
  json_extract(value, '$.@id')::VARCHAR            AS stats_data_id,
  json_extract(value, '$.STAT_NAME."$"')::VARCHAR  AS stat_name,
  json_extract(value, '$.TITLE."$"')::VARCHAR      AS title,
  json_extract(value, '$.SURVEY_DATE')::VARCHAR    AS survey_date,
  json_extract(value, '$.OPEN_DATE')::VARCHAR      AS open_date
FROM (
  SELECT unnest(
    json_extract(
      read_text(
        'https://api.e-stat.go.jp/rest/3.0/app/json/getStatsList?appId='
        || getvariable('estat_app_id')
        || '&searchWord=人口&limit=100'
      )::JSON,
      '$.GET_STATS_LIST.DATALIST_INF.TABLE_INF'
    )
  ) AS value
)
LIMIT 20;

このクエリは e-Stat API にリクエストを送り、返ってきた JSON をその場で展開して表形式に変換します。結果から興味深い stats_data_id を見つけたら、次は getMetaInfo でどんな分類軸があるか確認します。

-- 特定統計表のメタ情報を確認
SELECT
  json_extract(value, '$.@code')::VARCHAR  AS class_code,
  json_extract(value, '$.@name')::VARCHAR  AS class_name,
  json_extract(value, '$.@description')::VARCHAR AS description
FROM (
  SELECT unnest(
    json_extract(
      read_text(
        'https://api.e-stat.go.jp/rest/3.0/app/json/getMetaInfo?appId='
        || getvariable('estat_app_id')
        || '&statsDataId=0003412313'  -- 国勢調査の例
      )::JSON,
      '$.GET_META_INFO.METADATA_INF.CLASS_INF.CLASS_OBJ'
    )
  ) AS value
);

実際のデータ値を取得する

統計表の構造が把握できたら getStatsData で数値を取得します。

-- 都道府県別・年次の人口データを取得
SELECT
  json_extract(value, '$.@area')::VARCHAR   AS area_code,
  json_extract(value, '$.@time')::VARCHAR   AS time_code,
  json_extract(value, '$.@cat01')::VARCHAR  AS category_code,
  json_extract(value, '$."$"')::DOUBLE      AS stat_value
FROM (
  SELECT unnest(
    json_extract(
      read_text(
        'https://api.e-stat.go.jp/rest/3.0/app/json/getStatsData?appId='
        || getvariable('estat_app_id')
        || '&statsDataId=0003412313'
        || '&metaGetFlg=N'
        || '&cntGetFlg=N'
        || '&limit=1000'
      )::JSON,
      '$.GET_STATS_DATA.STATISTICAL_DATA.DATA_INF.VALUE'
    )
  ) AS value
)
LIMIT 50;

DuckDB 探索のメリットは、仮説検証が SQL 一本で完結する点です。「この統計表は使えそうか」「どのフィールドが必要か」を確認するだけなら、Python スクリプトをゼロから書く必要はありません。探索で見当を付けてから、本番パイプラインの設計に進みましょう。


アーキテクチャ設計: L0〜L4 の5層

データ基盤全体を5つの論理層に分けることで、探索・収集・変換・提供の責務を分離します。 なぜ5層なのか。データパイプラインで最もコストがかかるのは「やり直し」です。生データを直接加工してしまうと、変換ロジックを変えたいときに API からの再取得が必要になります。層を分けておけば、特定の層だけ差し替えても全体への影響を局所化でき、やり直しのコストを最小化できます。

L0: Source
    e-Stat API(getStatsList / getMetaInfo / getStatsData)

L1: Raw / Bronze
    GCS raw archive(JSON ファイル)
    BigQuery bronze テーブル(生 JSON 文字列)

L2: Silver
    dbt で型変換・正規化
    dim_estat_stat / dim_estat_area / fact_estat_value

L3: Mart
    dbt で集計・結合
    mart_population_prefecture_year 等

L4: Application
    BI ツール(Looker Studio 等)
    プロダクト外部信号 API

各層の設計方針

ストレージ変換の種類変更頻度
L0 Sourcee-Stat APIなしAPI 仕様変更時
L1 BronzeGCS + BigQueryなし(生データ保存)収集バッチ更新時
L2 SilverBigQuery型変換・NULL 処理・正規化スキーマ変更時
L3 MartBigQuery集計・JOIN・ビジネスロジック要件変更時
L4 Application各ツール固有表示ロジックのみ頻繁

Bronze に生データをそのまま保存しておく設計(いわゆる「生ログ永久保存」)は重要です。e-Stat の API レスポンス仕様が変わっても、Bronze の JSON から Silver を再生成できます。dbt--full-refresh と組み合わせることで、過去データを含めた再変換が可能になります。

リポジトリ境界の設計

Hub リポジトリ(analytics_note)には docs/ADR のみを置き、実装は Lab リポジトリ(external-signal)に分離します。この境界を守ることで、Hub は方針・ドキュメント管理に専念でき、Lab は実験的な変更を安全に進められます。

analytics_note/                  # Hub リポジトリ
  docs/adr/005-estat-external-signal.md
  docs/external-data-sources.md

external-signal/                 # Lab リポジトリ
  src/
    collectors/estat/            # e-Stat APIクライアント
    loaders/bigquery/            # BigQuery ローダー
  dbt/
    models/
      bronze/                    # Bronze ビュー(ソース定義)
      silver/                    # Silver テーブル
      mart/                      # Mart テーブル
  infra/
    terraform/                   # GCS バケット・BQ データセット
  tests/

Python e-Stat クライアントの設計

型安全・リトライ・ロギングを備えた e-Stat クライアントを設計することで、本番パイプラインの信頼性を確保します。 探索フェーズの DuckDB クエリは便利ですが、定期実行する本番バッチには Python クライアントが適しています。

クライアント実装

# src/collectors/estat/client.py
from __future__ import annotations

import os
import time
import logging
from typing import Any

import requests

logger = logging.getLogger(__name__)


class EStatAPIError(Exception):
    """e-Stat API 固有のエラー"""
    pass


class EStatClient:
    """e-Stat REST API v3.0 クライアント

    環境変数 ESTAT_APP_ID からアプリIDを読み込む。
    リトライは指数バックオフで最大3回試行する。
    """

    BASE_URL = "https://api.e-stat.go.jp/rest/3.0/app/json"
    MAX_RETRIES = 3
    RATE_LIMIT_SLEEP = 1.0  # e-Stat 推奨: 1秒に1リクエスト

    def __init__(self) -> None:
        self.app_id = os.environ["ESTAT_APP_ID"]
        self.session = requests.Session()
        self.session.headers.update({"Accept": "application/json"})

    def _get(self, endpoint: str, params: dict[str, Any]) -> dict:
        """共通 GET リクエスト(リトライ付き)"""
        url = f"{self.BASE_URL}/{endpoint}"
        params = {"appId": self.app_id, **params}

        for attempt in range(self.MAX_RETRIES):
            try:
                resp = self.session.get(url, params=params, timeout=30)
                resp.raise_for_status()
                data = resp.json()
                # e-Stat 独自のエラーレスポンスを検出
                status = data.get("GET_STATS_LIST", {}).get("RESULT", {}).get("STATUS")
                if status and status != 0:
                    msg = data.get("GET_STATS_LIST", {}).get("RESULT", {}).get("ERROR_MSG", "")
                    raise EStatAPIError(f"API error status={status}: {msg}")
                return data
            except (requests.RequestException, EStatAPIError) as exc:
                if attempt == self.MAX_RETRIES - 1:
                    logger.error(
                        "Request failed after %d attempts: url=%s params=%s error=%s",
                        self.MAX_RETRIES, url, params, exc,
                    )
                    raise
                wait = 2 ** attempt
                logger.warning("Retrying in %ds (attempt %d): %s", wait, attempt + 1, exc)
                time.sleep(wait)

        raise RuntimeError("Unreachable")  # mypy 向け

    def get_stats_list(
        self,
        search_word: str | None = None,
        stats_code: str | None = None,
        limit: int = 100,
        start_position: int = 1,
    ) -> dict:
        """統計表一覧を取得する(getStatsList)"""
        params: dict[str, Any] = {"limit": limit, "startPosition": start_position}
        if search_word:
            params["searchWord"] = search_word
        if stats_code:
            params["statsCode"] = stats_code
        result = self._get("getStatsList", params)
        time.sleep(self.RATE_LIMIT_SLEEP)
        return result

    def get_meta_info(self, stats_data_id: str) -> dict:
        """統計表のメタ情報を取得する(getMetaInfo)"""
        result = self._get("getMetaInfo", {"statsDataId": stats_data_id})
        time.sleep(self.RATE_LIMIT_SLEEP)
        return result

    def get_stats_data(
        self,
        stats_data_id: str,
        cd_area: str | None = None,
        cd_time: str | None = None,
        limit: int = 10000,
    ) -> dict:
        """統計データを取得する(getStatsData)"""
        params: dict[str, Any] = {
            "statsDataId": stats_data_id,
            "metaGetFlg": "N",
            "cntGetFlg": "N",
            "limit": limit,
        }
        if cd_area:
            params["cdArea"] = cd_area
        if cd_time:
            params["cdTime"] = cd_time
        result = self._get("getStatsData", params)
        time.sleep(self.RATE_LIMIT_SLEEP)
        return result

設計上のポイント

指数バックオフの実装意図について説明します。e-Stat API は公共サービスであり、特定の時間帯に高負荷になることがあります。2 ** attempt の待機時間(1秒 → 2秒 → 4秒)により、一時的なエラーから自動回復しつつサーバーへの負荷を抑えられます。

RATE_LIMIT_SLEEP = 1.0 は e-Stat の利用ガイドラインに沿った設定です。大量の統計表を収集する際はこの値を守り、不必要な高速アクセスは避けてください。APIの制限に引っかかると全リクエストが503を返すようになるため、後続の全バッチに影響が及びます。


GCS raw アーカイブと BigQuery bronze テーブル

生データを GCS にアーカイブしてから BigQuery に取り込む2段階設計は、データ損失リスクを最小化するベストプラクティスです。 GCS が「真実の源泉」となるため、BigQuery 側に不具合が起きても再ロードできます。

GCS パーティション設計

gs://<project>-estat-raw/
  estat/
    raw/
      2026/
        04/
          08/
            0003412313.json   # stats_data_id がファイル名
            0003412314.json
            ...
      2026/
        04/
          09/
            ...

日付でパーティションすることで、特定日のデータを再収集した際に上書きするだけで済みます。stats_data_id をファイル名にすることで、BigQuery からの逆引きも容易になります。

BigQuery データセットとテーブル構成

-- bronze データセット作成(Terraform 管理推奨)
CREATE SCHEMA IF NOT EXISTS `{project}.bronze`
  OPTIONS (
    location = 'asia-northeast1',
    description = 'e-Stat raw JSON archive'
  );

-- bronze.estat_stats_data テーブル
CREATE TABLE IF NOT EXISTS `{project}.bronze.estat_stats_data`
(
  ingested_at    TIMESTAMP  NOT NULL,
  stats_data_id  STRING     NOT NULL,
  raw_json       STRING     NOT NULL  -- JSON 文字列をそのまま保存
)
PARTITION BY DATE(ingested_at)
CLUSTER BY stats_data_id
OPTIONS (
  partition_expiration_days = NULL,  -- 無期限保持
  description = 'e-Stat getStatsData の生JSONアーカイブ'
);

-- bronze.estat_stats_list テーブル(統計表一覧)
CREATE TABLE IF NOT EXISTS `{project}.bronze.estat_stats_list`
(
  ingested_at   TIMESTAMP  NOT NULL,
  search_word   STRING,
  raw_json      STRING     NOT NULL
)
PARTITION BY DATE(ingested_at)
OPTIONS (
  description = 'e-Stat getStatsList の生JSONアーカイブ'
);

BigQuery ローダー実装

# src/loaders/bigquery.py
from __future__ import annotations

import json
import logging
from datetime import datetime, timezone

from google.cloud import bigquery

logger = logging.getLogger(__name__)


class BQLoader:
    """BigQuery へのデータロードを担当するクラス"""

    def __init__(self, project: str | None = None, dataset: str = "bronze") -> None:
        self.client = bigquery.Client(project=project)
        self.dataset = dataset

    def insert_raw(
        self,
        table_name: str,
        data: dict,
        extra_fields: dict | None = None,
    ) -> int:
        """JSON データを bronze テーブルにストリーミング挿入する"""
        table_ref = f"{self.client.project}.{self.dataset}.{table_name}"
        row = {
            "ingested_at": datetime.now(tz=timezone.utc).isoformat(),
            "raw_json": json.dumps(data, ensure_ascii=False),
            **(extra_fields or {}),
        }
        errors = self.client.insert_rows_json(table_ref, [row])
        if errors:
            logger.error("BigQuery insert errors: %s", errors)
            raise RuntimeError(f"BigQuery insert failed: {errors}")
        logger.info("Inserted 1 row to %s", table_ref)
        return 1

dbt silver モデルの設計

Silver 層の責務は「型変換・NULL 処理・正規化」に絞り込み、ビジネスロジックを持ち込まないことが重要です。 Bronze の生 JSON から構造化テーブルを生成し、Mart 層が安心して使えるデータを提供します。

dbt プロジェクト構成

dbt/
  dbt_project.yml
  profiles.yml        # .gitignore 対象
  models/
    sources.yml       # bronze テーブルのソース定義
    silver/
      dim_estat_stat.sql
      dim_estat_area.sql
      fact_estat_value.sql
      schema.yml
    mart/
      mart_population_prefecture_year.sql
      schema.yml
  tests/
  macros/

sources.yml の定義

# dbt/models/sources.yml
version: 2

sources:
  - name: bronze
    database: "{{ env_var('DBT_PROJECT_ID') }}"
    schema: bronze
    tables:
      - name: estat_stats_data
        description: "e-Stat getStatsData の生JSONアーカイブ"
        columns:
          - name: ingested_at
            description: "取り込み日時(UTC)"
          - name: stats_data_id
            description: "e-Stat 統計表ID"
          - name: raw_json
            description: "APIレスポンスの生JSON文字列"
      - name: estat_stats_list
        description: "e-Stat getStatsList の生JSONアーカイブ"

dim_estat_stat.sql(統計表マスタ)

-- dbt/models/silver/dim_estat_stat.sql
{{ config(
    materialized='table',
    partition_by={
        'field': 'ingested_date',
        'data_type': 'date'
    },
    cluster_by=['stats_data_id']
) }}

WITH source AS (
    SELECT
        stats_data_id,
        raw_json,
        ingested_at,
        DATE(ingested_at) AS ingested_date,
        -- 最新取り込みのみ残す
        ROW_NUMBER() OVER (
            PARTITION BY stats_data_id
            ORDER BY ingested_at DESC
        ) AS rn
    FROM {{ source('bronze', 'estat_stats_data') }}
    WHERE stats_data_id IS NOT NULL
),

parsed AS (
    SELECT
        stats_data_id,
        JSON_VALUE(raw_json, '$.GET_STATS_DATA.STATISTICAL_DATA.TABLE_INF.@id')
            AS stats_data_id_from_json,
        JSON_VALUE(raw_json, '$.GET_STATS_DATA.STATISTICAL_DATA.TABLE_INF.STAT_NAME."$"')
            AS stat_name,
        JSON_VALUE(raw_json, '$.GET_STATS_DATA.STATISTICAL_DATA.TABLE_INF.TITLE."$"')
            AS title,
        JSON_VALUE(raw_json, '$.GET_STATS_DATA.STATISTICAL_DATA.TABLE_INF.SURVEY_DATE')
            AS survey_date,
        JSON_VALUE(raw_json, '$.GET_STATS_DATA.STATISTICAL_DATA.TABLE_INF.OPEN_DATE')
            AS open_date,
        ingested_at,
        ingested_date
    FROM source
    WHERE rn = 1
)

SELECT *
FROM parsed
WHERE stat_name IS NOT NULL

fact_estat_value.sql(統計数値ファクト)

Silver 層で最も重要なのはファクトテーブルです。e-Stat の VALUE 配列を展開し、各観測値を1行にします。

-- dbt/models/silver/fact_estat_value.sql
{{ config(
    materialized='incremental',
    unique_key=['stats_data_id', 'area_code', 'time_code', 'category_code'],
    partition_by={
        'field': 'ingested_date',
        'data_type': 'date'
    },
    cluster_by=['stats_data_id', 'area_code']
) }}

WITH raw_values AS (
    SELECT
        stats_data_id,
        DATE(ingested_at) AS ingested_date,
        -- BigQuery の JSON_QUERY_ARRAY で配列展開
        value_row
    FROM {{ source('bronze', 'estat_stats_data') }},
    UNNEST(
        JSON_QUERY_ARRAY(raw_json, '$.GET_STATS_DATA.STATISTICAL_DATA.DATA_INF.VALUE')
    ) AS value_row

    {% if is_incremental() %}
    WHERE DATE(ingested_at) > (SELECT MAX(ingested_date) FROM {{ this }})
    {% endif %}
)

SELECT
    stats_data_id,
    JSON_VALUE(value_row, '$.@area')                       AS area_code,
    JSON_VALUE(value_row, '$.@time')                       AS time_code,
    JSON_VALUE(value_row, '$.@cat01')                      AS category_code,
    SAFE_CAST(JSON_VALUE(value_row, '$."$"') AS FLOAT64)   AS stat_value,
    ingested_date
FROM raw_values
WHERE JSON_VALUE(value_row, '$."$"') IS NOT NULL
  AND JSON_VALUE(value_row, '$."$"') != '-'  -- e-Stat の欠損値表現

schema.yml でデータ品質テストを定義

# dbt/models/silver/schema.yml
version: 2

models:
  - name: fact_estat_value
    description: "e-Stat 統計数値ファクトテーブル(silver 層)"
    columns:
      - name: stats_data_id
        tests:
          - not_null
      - name: area_code
        tests:
          - not_null
      - name: stat_value
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

dbt mart モデル(都道府県別人口推移)

Mart 層はビジネス課題に直結した集計テーブルを提供します。 Silver のファクト・ディメンションを JOIN して、プロダクトや BI ツールが使いやすい形に整形します。

dim_estat_area.sql(地域マスタ)

-- dbt/models/silver/dim_estat_area.sql
{{ config(materialized='table') }}

WITH area_meta AS (
    SELECT DISTINCT
        JSON_VALUE(class_obj, '$.@code')  AS area_code,
        JSON_VALUE(class_obj, '$.@name')  AS area_name,
        JSON_VALUE(class_obj, '$.@level') AS area_level,
        DATE(ingested_at) AS ingested_date
    FROM {{ source('bronze', 'estat_stats_data') }},
    UNNEST(
        JSON_QUERY_ARRAY(
            raw_json,
            '$.GET_STATS_DATA.STATISTICAL_DATA.CLASS_INF.CLASS_OBJ'
        )
    ) AS class_obj_array,
    UNNEST(
        JSON_QUERY_ARRAY(class_obj_array, '$.CLASS')
    ) AS class_obj
    WHERE JSON_VALUE(class_obj_array, '$.@name') = '地域'
)

SELECT
    area_code,
    area_name,
    area_level,
    -- 都道府県コード(2桁)を付与
    CASE
        WHEN LENGTH(area_code) = 5 THEN LEFT(area_code, 2)
        ELSE area_code
    END AS pref_code,
    MAX(ingested_date) AS latest_ingested_date
FROM area_meta
WHERE area_code IS NOT NULL
GROUP BY 1, 2, 3, 4

mart_population_prefecture_year.sql

-- dbt/models/mart/mart_population_prefecture_year.sql
{{ config(
    materialized='table',
    cluster_by=['area_code', 'year']
) }}

WITH population_fact AS (
    SELECT
        f.area_code,
        -- e-Stat の time_code は "2020000000" 形式なので年を抽出
        CAST(LEFT(f.time_code, 4) AS INT64) AS year,
        f.stat_value                         AS population
    FROM {{ ref('fact_estat_value') }} f
    -- 国勢調査の stats_data_id を直接指定(または category_code でフィルタ)
    WHERE f.category_code = '001'  -- 総人口
      AND f.stat_value IS NOT NULL
),

with_area AS (
    SELECT
        p.area_code,
        d.area_name,
        d.pref_code,
        p.year,
        p.population
    FROM population_fact p
    JOIN {{ ref('dim_estat_area') }} d
        ON p.area_code = d.area_code
    WHERE d.area_level = '2'  -- 都道府県レベル
)

SELECT
    pref_code,
    area_code,
    area_name,
    year,
    SUM(population) AS population
FROM with_area
GROUP BY 1, 2, 3, 4
ORDER BY year DESC, pref_code

プロダクト外部信号としての活用

このマートテーブルを BigQuery から直接クエリして、プロダクトの機能に組み込む例を示します。

# プロダクト API: 都道府県コードを受け取り人口推移を返す
from google.cloud import bigquery

def get_population_trend(pref_code: str, from_year: int = 2015) -> list[dict]:
    """都道府県コードに対応する人口推移を返す"""
    client = bigquery.Client()
    query = """
        SELECT year, population
        FROM `{project}.mart.mart_population_prefecture_year`
        WHERE pref_code = @pref_code
          AND year >= @from_year
        ORDER BY year
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("pref_code", "STRING", pref_code),
            bigquery.ScalarQueryParameter("from_year", "INT64", from_year),
        ]
    )
    rows = client.query(query.format(project="your-project"), job_config=job_config)
    return [{"year": row.year, "population": row.population} for row in rows]

このような形で Mart テーブルをラップする API を作ると、フロントエンドやデータサイエンスチームが自由に利用できます。BigQuery の行レベルセキュリティと組み合わせることで、アクセス制御も細かく設定できます。


Cloud Run ingest エントリポイント

Cloud Run をインジェスト処理のエントリポイントにすることで、サーバーレスかつスケーラブルなパイプラインを構成できます。 Cloud Scheduler からの HTTP POST でトリガーし、Cloud Run がコレクターとローダーを呼び出す設計です。

Flask アプリ実装

# src/entrypoint.py
from __future__ import annotations

import logging
import os

from flask import Flask, Response, jsonify, request

from collectors.estat.client import EStatClient, EStatAPIError
from loaders.bigquery import BQLoader

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)


@app.route("/health", methods=["GET"])
def health() -> Response:
    return jsonify({"status": "ok"})


@app.route("/ingest/stats-list", methods=["POST"])
def ingest_stats_list() -> Response:
    """統計表一覧を収集して bronze に書き込む"""
    body = request.get_json(silent=True) or {}
    keyword = body.get("keyword", "人口")
    limit = int(body.get("limit", 100))

    try:
        client = EStatClient()
        loader = BQLoader()
        data = client.get_stats_list(search_word=keyword, limit=limit)
        loader.insert_raw(
            "estat_stats_list",
            data,
            extra_fields={"search_word": keyword},
        )
        return jsonify({"status": "ok", "keyword": keyword})
    except EStatAPIError as exc:
        logger.error("e-Stat API error: %s", exc)
        return jsonify({"status": "error", "message": str(exc)}), 502
    except Exception as exc:
        logger.exception("Unexpected error: %s", exc)
        return jsonify({"status": "error", "message": "internal error"}), 500


@app.route("/ingest/stats-data", methods=["POST"])
def ingest_stats_data() -> Response:
    """指定統計表のデータを収集して bronze に書き込む"""
    body = request.get_json(silent=True) or {}
    stats_data_id = body.get("statsDataId")

    if not stats_data_id:
        return jsonify({"status": "error", "message": "statsDataId is required"}), 400

    try:
        client = EStatClient()
        loader = BQLoader()
        data = client.get_stats_data(
            stats_data_id=stats_data_id,
            cd_area=body.get("cdArea"),
            cd_time=body.get("cdTime"),
        )
        loader.insert_raw(
            "estat_stats_data",
            data,
            extra_fields={"stats_data_id": stats_data_id},
        )
        return jsonify({"status": "ok", "stats_data_id": stats_data_id})
    except EStatAPIError as exc:
        logger.error("e-Stat API error: %s", exc)
        return jsonify({"status": "error", "message": str(exc)}), 502
    except Exception as exc:
        logger.exception("Unexpected error: %s", exc)
        return jsonify({"status": "error", "message": "internal error"}), 500


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8080))
    app.run(host="0.0.0.0", port=port, debug=False)

Dockerfile

FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/

ENV PYTHONPATH=/app/src

CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "2", "entrypoint:app"]

Cloud Scheduler 設定(Terraform)

# infra/terraform/scheduler.tf
resource "google_cloud_scheduler_job" "ingest_population" {
  name      = "estat-ingest-population"
  schedule  = "0 3 * * 1"  # 毎週月曜 3:00 JST
  time_zone = "Asia/Tokyo"

  http_target {
    uri         = "${google_cloud_run_service.ingest.status[0].url}/ingest/stats-list"
    http_method = "POST"
    body        = base64encode(jsonencode({ keyword = "人口", limit = 100 }))

    oidc_token {
      service_account_email = google_service_account.scheduler.email
    }
  }
}

運用上の注意点

実本番環境で安定稼働させるために把握しておくべき注意点をまとめます。

e-Stat API のレートリミット

e-Stat の利用規約では「過度なアクセスを避けること」と定めています。実装上の推奨は以下のとおりです。

項目推奨値理由
リクエスト間隔1秒以上サーバー負荷軽減
同時並列リクエスト1(直列)レートリミット回避
バッチ実行時間帯深夜〜早朝ピーク時間を避ける
limit パラメータ最大10,0001リクエストの取得上限

統計表によってはデータ件数が10,000件を超える場合があります。その際は startPosition パラメータを使ってページングしてください。

認証情報の管理

# .env(必ず .gitignore に追加)
ESTAT_APP_ID=your_app_id_here
GOOGLE_CLOUD_PROJECT=your-project-id

# .gitignore
.env
.env.local
credentials/

Cloud Run にシークレットを渡す際は、直接環境変数に書かず Secret Manager を使います。

# Secret Manager に登録
gcloud secrets create estat-app-id --data-file=- <<< "YOUR_APP_ID"

# Cloud Run にシークレットをマウント
gcloud run services update estat-ingest \
  --set-secrets "ESTAT_APP_ID=estat-app-id:latest"

段階的な開発フロー

Step 1: DuckDB で探索
  ↓ 使えそうなデータセットを特定
Step 2: Python クライアントで小規模取得
  ↓ 1〜2個の統計表を試す
Step 3: bronze への蓄積(ローカル BigQuery Emulator)
  ↓ スキーマを確認
Step 4: dbt silver を手動実行
  ↓ データ品質テストをパス
Step 5: Cloud Run + Scheduler で定期実行
  ↓ 本番安定稼働

DuckDB 探索をスキップして最初から Cloud Run で書くと、「このデータは使えない」と分かった時点でパイプライン全体を作り直す羽目になります。探索フェーズに1〜2時間かけておくと、実装フェーズの手戻りを大幅に減らせます。

dbt でのデータ品質管理

# dbt/models/mart/schema.yml
version: 2

models:
  - name: mart_population_prefecture_year
    description: "都道府県別・年次人口マートテーブル"
    columns:
      - name: pref_code
        tests:
          - not_null
          - relationships:
              to: ref('dim_estat_area')
              field: pref_code
      - name: year
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 1980
              max_value: 2030
      - name: population
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0

dbt のテストを CI に組み込み、dbt test が失敗したら Mart テーブルを更新しない設計にしておくと、BI ツールに壊れたデータが流れることを防げます。


まとめと今後の展開

本記事では、e-Stat API をデータ基盤に取り込む一連の流れを解説しました。各フェーズのポイントを振り返ります。

フェーズツール主な成果物
探索DuckDB使えるデータセットの特定
収集Python + Cloud Runbronze テーブルへの生JSON蓄積
変換dbt (silver)型安全・正規化済みの dim/fact テーブル
集計dbt (mart)プロダクト・BI 向けの集計テーブル
提供BigQuery API外部信号 API・BI 接続

次のステップ

Mart の拡充として、都道府県別人口以外にも以下のマートテーブルを追加することが考えられます。

  • mart_employment_industry_year(産業別就業者数)
  • mart_birth_rate_prefecture_year(都道府県別出生率)
  • mart_household_income_distribution(所帯収入分布)

他省庁APIへの横展開も重要な次のステップです。国土交通省の不動産取引価格情報 API、厚生労働省の求人倍率データなど、同様のパターンで取り込めるデータソースが複数あります。EStatClient のクラス構造を継承・拡張することで、新しいデータソースのクライアントをスムーズに追加できます。

# 横展開の例: 国交省 不動産取引価格 API
class MLITClient(BaseGovDataClient):
    BASE_URL = "https://www.land.mlit.go.jp/webland/api"
    # EStatClient と共通のリトライ・ロギング機構を継承

このパターンの本質は「探索と本番を分離する」ことです。DuckDB という使い捨ての探索環境を持つことで、本番パイプラインを汚染せずに新しいデータソースを試せます。外部データを躊躇なく試し、価値があると分かったものだけを本番に昇格させる文化が、データ活用を加速させます。

政府統計データは整備されたデータの宝庫であり、多くの企業でまだ活用されていない競争優位の源泉です。本記事のアーキテクチャを出発点に、自社プロダクトに合わせた外部信号データ基盤を構築してみてください。

関連記事

F/ この記事の設計を反映しているプロダクト: FlowAgent

see →
an

analytics note — editor

AI とデータ分析の実装ログを毎週編集。設計判断と運用のつまずきを、再現できる形で残すことを大切にしています。