a/ analytics note .jp

TECH · field log

e-Stat API Python クライアントの設計と実装 ─ retry/backoff・型安全・テスト戦略まで

日本の政府統計API(e-Stat)の3エンドポイントをPythonで型安全にラップするクライアントライブラリの設計を解説。exponential backoff、ロギング、pytestでのモックテスト戦略まで。

· 12 min read · #Python / #e-Stat / #API設計 / #pytest / #データ収集 / #requests · AI-assisted · reviewed Share on X はてブ Zennにクロスポスト

目次


はじめに

e-Stat は総務省統計局が提供する日本の政府統計ポータルです。API 仕様が公開されており、プログラムからデータを取得できます。国勢調査・労働力調査・消費者物価指数など、国内の主要統計を無料で取得できる API を公開しており、データエンジニアやアナリストにとって非常に価値のあるデータソースです。

e-Stat API の魅力を3点挙げると、まず無料かつ公的な点が挙げられます。申請フォームに必要事項を記入するだけで API キーを取得でき、商用利用も条件付きで認められています。次に時系列データが揃っている点で、国勢調査であれば過去数十年分の変化を追えます。最後に地域・産業・年齢など多軸のメタデータが整備されており、クロス集計が容易です。

一方で、そのまま素直に使おうとすると面倒な側面もあります。レスポンスの JSON 構造が XML 設計の名残で複雑にネストしており、@id$ といった特殊なキー名が登場します。公式ドキュメントにレートリミットの明示がなく、エラーが HTTP 200 で返ってくるケースがあるなど、独特の癖があります。

本記事では、これらの癖をすべて吸収する Python クライアントライブラリ の設計と実装を解説します。exponential backoff による自動リトライ、型安全な設計、pytest + responses によるモックテスト戦略まで、実プロジェクトで使える完全な実装を紹介します。


e-Stat API の仕様と癖

3つのエンドポイント

e-Stat API(REST 版 3.0)には主要な3エンドポイントがあります。データ収集の典型的なフローは「統計表を検索 → メタ情報で分類コードを確認 → 実データを取得」という順序になります。

エンドポイントHTTP メソッドパス役割
getStatsListGET/rest/3.0/app/json/getStatsList統計表一覧をキーワード・政府機関・統計分野等で検索する
getMetaInfoGET/rest/3.0/app/json/getMetaInfo統計表のメタ情報(分類コード・地域コード・時間軸等)を取得する
getStatsDataGET/rest/3.0/app/json/getStatsData実際の統計数値データを取得する

主なリクエストパラメータ

各エンドポイントで頻繁に使うパラメータをまとめます。

パラメータ名対象エンドポイント説明
appId全エンドポイントAPI キー(必須)
searchWordgetStatsList統計名・説明文のキーワード検索
statsDataIdgetMetaInfo / getStatsData統計表 ID(10桁の数字)
startPositiongetStatsDataページネーション開始位置(1始まり)
limitgetStatsList / getStatsData1リクエストで取得する最大件数
cdCat01getStatsData分類コードによる絞り込み

レスポンス構造の複雑さ

e-Stat API の最大の癖は JSON レスポンスの深いネスト構造です。もとの設計が XML ベースであるため、XML 属性が @ プレフィックスのキーとして、XML テキストノードが $ キーとして出てきます。

以下は getStatsData のレスポンス(抜粋)です。

{
  "GET_STATS_DATA": {
    "RESULT": {
      "STATUS": 0,
      "ERROR_MSG": "正常に終了しました。",
      "DATE": "2026-04-08T12:00:00.000+09:00"
    },
    "PARAMETER": {
      "STATS_DATA_ID": "0003411595",
      "START_POSITION": 1,
      "LIMIT": 10000
    },
    "STATISTICAL_DATA": {
      "TABLE_INF": {
        "@id": "0003411595",
        "STAT_NAME": { "@code": "00200521", "$": "国勢調査" },
        "GOV_ORG":   { "@code": "00200", "$": "総務省" },
        "TOTAL_NUMBER": 4200
      },
      "CLASS_INF": {
        "CLASS_OBJ": [
          {
            "@id": "cat01",
            "@name": "男女",
            "CLASS": [
              { "@code": "001", "@name": "総数", "@level": "1" },
              { "@code": "002", "@name": "男",   "@level": "2" },
              { "@code": "003", "@name": "女",   "@level": "2" }
            ]
          }
        ]
      },
      "DATA_INF": {
        "NOTE": { "@char": "*", "$": "統計値なし" },
        "VALUE": [
          { "@cat01": "001", "@area": "01000", "@time": "2020000000", "$": "5224614" },
          { "@cat01": "002", "@area": "01000", "@time": "2020000000", "$": "2502594" },
          { "@cat01": "003", "@area": "01000", "@time": "2020000000", "$": "2722020" }
        ]
      }
    }
  }
}

実際の数値は GET_STATS_DATA.STATISTICAL_DATA.DATA_INF.VALUE という深さ4階層のリストに格納されており、各要素の分類コードはすべて @cat01 のような属性スタイルのキーです。これをそのまま pandas の DataFrame に変換しようとすると、かなり煩雑なコードになります。

HTTP 200 エラー問題

もう一つの重大な癖が エラーレスポンスが HTTP 200 で返ってくる点です。たとえば API キーが不正な場合や、存在しない統計表 ID を指定した場合でも、HTTP ステータスコードは 200 で、RESULT.STATUS フィールドに 0 以外の値が入ります。

{
  "GET_STATS_LIST": {
    "RESULT": {
      "STATUS": 100,
      "ERROR_MSG": "APIキーが不正です。",
      "DATE": "2026-04-08T12:00:00.000+09:00"
    }
  }
}

requests.raise_for_status() だけではこのエラーを検出できないため、レスポンスボディを必ずパースしてステータスを確認する処理が必要になります。

レートリミット

公式ドキュメントにレートリミットの数値は明示されていませんが、1秒に1リクエスト程度のペースが推奨されています。大量のデータを取得する際はリクエスト間に time.sleep(1) を挟むか、exponential backoff で安全に処理しましょう。


プロジェクト構造

ここで紹介するクライアントは、データ収集パイプライン全体のコレクター層として設計しています。将来的に e-Stat 以外の API(気象庁 API、RESAS API 等)も同じインターフェースで追加できるよう、抽象基底クラスを用意しています。以下はプロジェクト構成の例です。ディレクトリ名は自由に変更してかまいません。

your-data-pipeline/
├── src/
│   └── collectors/
│       ├── __init__.py
│       ├── base.py              # 共通インターフェース(抽象基底クラス)
│       └── estat/
│           ├── __init__.py
│           └── client.py        # EStatClient 本体
├── tests/
│   ├── __init__.py
│   └── test_estat_client.py     # pytest テスト
├── .env                         # ローカル開発用(.gitignore に追加)
├── .env.example                 # リポジトリにコミットするテンプレート
└── requirements.txt             # 依存パッケージ一覧

requirements.txt は以下の通りです。

requests>=2.31.0
python-dotenv>=1.0.0
responses>=0.25.0   # テスト用モックライブラリ
pytest>=8.0.0
pytest-cov>=4.0.0

共通インターフェース(base.py)の設計

複数の API クライアントを作るプロジェクトでは、共通の抽象基底クラスを用意しておくと保守性が上がります。全コレクターが同じメソッドシグネチャを持つことで、依存性の注入やテスト時のモック差し替えが容易になります。

# src/collectors/base.py
from abc import ABC, abstractmethod
from typing import Any


class BaseCollector(ABC):
    """外部データ収集クライアントの共通インターフェース。

    新しい API クライアントはこのクラスを継承し、
    collect() と health_check() を必ず実装すること。
    """

    @abstractmethod
    def collect(self, **kwargs) -> dict[str, Any]:
        """データを収集して dict で返す。

        各サブクラスで「そのクライアントの最も基本的な収集操作」を実装する。
        e-Stat クライアントであれば get_stats_list() がこれに相当する。
        """
        ...

    @abstractmethod
    def health_check(self) -> bool:
        """API が正常に疎通できるかを確認する。

        Returns:
            True: 疎通OK
            False: 疎通不可(例外は握りつぶしてFalseを返す)
        """
        ...

ABCabstractmethod を使うことで、collect()health_check() を実装し忘れた場合にクラスのインスタンス化時点でエラーが発生します。実行時に初めて発覚するのではなく、開発中に問題を検出できる点が型安全設計のメリットです。


EStatClient の完全実装

以下が EStatClient の完全な実装です。設計上のポイントを先に整理しておきます。

設計上のポイント:

  • requests.Session を使い回すことで TCP コネクションを再利用してパフォーマンスを改善する
  • User-Agent ヘッダーを設定してリクエスト元を明示する
  • appId はログに出力しない(秘密情報の漏洩防止)
  • HTTP レベルのエラーと API レベルのエラーを分離して処理する
  • exponential backoff(2^attempt 秒)でリトライ間隔を指数的に増加させる
# src/collectors/estat/client.py
import os
import time
import logging
from typing import Any

import requests  # https://docs.python-requests.org/en/latest/
from dotenv import load_dotenv

from ..base import BaseCollector

load_dotenv()
logger = logging.getLogger(__name__)


class EStatAPIError(Exception):
    """e-Stat API 呼び出しエラー。

    HTTP レベルのエラーと API レベルのエラーの両方をラップする。
    呼び出し元でエラー内容を詳しくログに残せるよう、
    status_code / url / params を属性として保持する。
    """

    def __init__(
        self,
        message: str,
        status_code: int | None = None,
        url: str = "",
        params: dict | None = None,
    ):
        super().__init__(message)
        self.status_code = status_code
        self.url = url
        self.params = params or {}


class EStatClient(BaseCollector):
    """e-Stat REST API v3.0 クライアント。

    getStatsList / getMetaInfo / getStatsData の3エンドポイントをサポートする。
    HTTP エラーは exponential backoff でリトライし、
    API レベルエラー(HTTP 200 で返ってくる)も検出する。

    Args:
        app_id: e-Stat の API キー。省略時は環境変数 ESTAT_APP_ID を使用する。

    Examples:
        >>> client = EStatClient()  # ESTAT_APP_ID 環境変数から読み込む
        >>> result = client.get_stats_list(search_word="人口", limit=10)
    """

    BASE_URL = "https://api.e-stat.go.jp/rest/3.0/app/json"
    MAX_RETRIES = 3
    # これらのステータスコードはリトライ対象とする
    RETRY_STATUSES = {429, 500, 502, 503, 504}

    def __init__(self, app_id: str | None = None) -> None:
        self.app_id = app_id or os.environ["ESTAT_APP_ID"]
        self.session = requests.Session()
        self.session.headers.update(
            {"User-Agent": "analytics-note-estat-client/1.0"}
        )

    def _get(self, endpoint: str, params: dict[str, Any]) -> dict:
        """内部 GET メソッド。リトライと API レベルエラー検出を担う。

        Args:
            endpoint: エンドポイント名(例: "getStatsList")
            params: appId を除いたリクエストパラメータ

        Returns:
            パース済みの JSON レスポンス dict

        Raises:
            EStatAPIError: HTTP エラーまたは API レベルエラーが発生した場合
        """
        url = f"{self.BASE_URL}/{endpoint}"
        # appId をパラメータに追加する(ログには出力しない)
        full_params = {**params, "appId": self.app_id}
        log_params = {k: v for k, v in params.items()}  # appId を含まない

        for attempt in range(self.MAX_RETRIES):
            try:
                logger.debug(
                    "Request: endpoint=%s params=%s attempt=%d",
                    endpoint,
                    log_params,
                    attempt,
                )
                resp = self.session.get(url, params=full_params, timeout=30)
                resp.raise_for_status()

                data = resp.json()

                # e-Stat は HTTP 200 でもエラーを返すため、ボディを検査する
                self._check_api_error(data, url, log_params)

                logger.debug("Response OK: endpoint=%s", endpoint)
                return data

            except requests.HTTPError as e:
                status = resp.status_code
                if status not in self.RETRY_STATUSES or attempt == self.MAX_RETRIES - 1:
                    logger.error(
                        "HTTP error: status=%d url=%s params=%s",
                        status,
                        url,
                        log_params,
                    )
                    raise EStatAPIError(
                        str(e), status_code=status, url=url, params=log_params
                    ) from e
                wait = 2 ** attempt
                logger.warning(
                    "Retry %d/%d after %ds: status=%d",
                    attempt + 1,
                    self.MAX_RETRIES,
                    wait,
                    status,
                )
                time.sleep(wait)

            except requests.RequestException as e:
                # タイムアウト・接続エラー等もリトライ対象とする
                if attempt == self.MAX_RETRIES - 1:
                    logger.error(
                        "Request failed: url=%s params=%s error=%s",
                        url,
                        log_params,
                        e,
                    )
                    raise EStatAPIError(str(e), url=url, params=log_params) from e
                wait = 2 ** attempt
                logger.warning(
                    "Retry %d/%d after %ds: error=%s",
                    attempt + 1,
                    self.MAX_RETRIES,
                    wait,
                    e,
                )
                time.sleep(wait)

        # ここには到達しないが型チェッカーのために追加する
        raise EStatAPIError("Max retries exceeded", url=url, params=log_params)

    def _check_api_error(self, data: dict, url: str, params: dict) -> None:
        """HTTP 200 で返ってくる API レベルのエラーを検出する。

        e-Stat API は RESULT.STATUS が 0 以外の場合にエラーを示す。
        エンドポイントによってトップレベルキーが異なるため、
        既知の全キーを順番にチェックする。
        """
        top_level_keys = [
            "GET_STATS_LIST",
            "GET_META_INFO",
            "GET_STATS_DATA",
        ]
        for key in top_level_keys:
            result = data.get(key, {}).get("RESULT", {})
            if not result:
                continue
            try:
                status = int(result.get("STATUS", 0))
            except (ValueError, TypeError):
                continue
            if status != 0:
                message = result.get("ERROR_MSG", "Unknown API error")
                raise EStatAPIError(
                    f"API error (STATUS={status}): {message}",
                    url=url,
                    params=params,
                )

    def get_stats_list(
        self,
        search_word: str = "",
        limit: int = 100,
        **kwargs: Any,
    ) -> dict:
        """統計表一覧を検索する(getStatsList)。

        Args:
            search_word: 検索キーワード。空文字の場合は全件に近い結果が返る。
            limit: 1リクエストで取得する最大件数(最大 100000)。
            **kwargs: statsCode(統計分野コード)、surveyYears(調査年)等の追加パラメータ。

        Returns:
            getStatsList のレスポンス dict。

        Examples:
            >>> result = client.get_stats_list(search_word="労働力調査", limit=10)
            >>> tables = result["GET_STATS_LIST"]["DATALIST_INF"]["TABLE_INF"]
        """
        params: dict[str, Any] = {"searchWord": search_word, "limit": limit, **kwargs}
        return self._get("getStatsList", params)

    def get_meta_info(self, stats_data_id: str) -> dict:
        """統計表のメタ情報を取得する(getMetaInfo)。

        分類コード(cat01 等)・地域コード(area)・時間軸(time)の
        コードと名称の対応表が含まれる。getStatsData で絞り込む際に参照する。

        Args:
            stats_data_id: 統計表 ID(10桁の数字文字列)。

        Returns:
            getMetaInfo のレスポンス dict。
        """
        return self._get("getMetaInfo", {"statsDataId": stats_data_id})

    def get_stats_data(
        self,
        stats_data_id: str,
        start_position: int = 1,
        limit: int = 10000,
        **kwargs: Any,
    ) -> dict:
        """統計データを取得する(getStatsData)。

        1リクエストで取得できる上限は 100000 件。大量データは
        start_position でページネーションする(get_all_stats_data を推奨)。

        Args:
            stats_data_id: 統計表 ID。
            start_position: 取得開始位置(1始まり)。
            limit: 1リクエストで取得する最大件数。
            **kwargs: cdCat01(分類コード)、cdArea(地域コード)等の絞り込みパラメータ。

        Returns:
            getStatsData のレスポンス dict。
        """
        params: dict[str, Any] = {
            "statsDataId": stats_data_id,
            "startPosition": start_position,
            "limit": limit,
            **kwargs,
        }
        return self._get("getStatsData", params)

    def collect(self, **kwargs: Any) -> dict:
        """BaseCollector インターフェースの実装。getStatsList を呼び出す。"""
        return self.get_stats_list(**kwargs)

    def health_check(self) -> bool:
        """API との疎通確認を行う。

        「人口」キーワードで1件だけ取得し、正常にレスポンスが返ればTrueを返す。
        EStatAPIError が発生した場合は False を返す(例外は伝播させない)。
        """
        try:
            result = self.get_stats_list(search_word="人口", limit=1)
            return bool(result)
        except EStatAPIError:
            return False

exponential backoff の仕組み

_get() 内のリトライロジックを詳しく見ると、wait = 2 ** attempt で待機時間を計算しています。

attemptwait 時間
0 → 1回目リトライ1秒(2^0)
1 → 2回目リトライ2秒(2^1)
2 → 3回目リトライ4秒(2^2)

3回リトライしても解消しない場合は EStatAPIError を送出します。本番環境でより細かい制御が必要な場合は、tenacity ライブラリを使うとジッター付きの backoff を簡単に設定できます。


テスト戦略(pytest + responses)

外部 API のテストでは、実際の API を叩かずにモックで代替するのが原則です。responses ライブラリを使うと、requests ライブラリの HTTP 通信をデコレータ一つでモックできます。pytest と組み合わせてテストを構成します。

テストの観点

テストすべき観点を整理します。

テスト観点説明
正常系各エンドポイントが正常なレスポンスを返した場合
HTTP エラーリトライ503 等のリトライ対象ステータスで MAX_RETRIES 回失敗後に例外
API レベルエラーHTTP 200 だが STATUS != 0 の場合に EStatAPIError
タイムアウトrequests.Timeout が発生した場合のリトライ
health_check疎通OK/NG のそれぞれ
# tests/test_estat_client.py
import pytest
import responses as responses_mock
from requests.exceptions import Timeout

from src.collectors.estat.client import EStatClient, EStatAPIError

# テスト用のベース URL
BASE_URL = "https://api.e-stat.go.jp/rest/3.0/app/json"


# ──────────────────────────────────────────────
# フィクスチャ
# ──────────────────────────────────────────────

@pytest.fixture
def client() -> EStatClient:
    """テスト用クライアント(app_id は dummy を使用)"""
    return EStatClient(app_id="test-dummy-app-id")


# ──────────────────────────────────────────────
# getStatsList の正常系テスト
# ──────────────────────────────────────────────

@responses_mock.activate
def test_get_stats_list_success(client: EStatClient) -> None:
    """正常系: getStatsList がレスポンスを返す"""
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        json={
            "GET_STATS_LIST": {
                "RESULT": {"STATUS": 0, "ERROR_MSG": "正常に終了しました。"},
                "PARAMETER": {"SEARCH_WORD": "人口", "LIMIT": 10},
                "DATALIST_INF": {
                    "NUMBER": 1,
                    "TABLE_INF": [
                        {
                            "@id": "0003411595",
                            "STAT_NAME": {"@code": "00200521", "$": "国勢調査"},
                            "TABLE_NAME": {"$": "男女別人口"},
                        }
                    ],
                },
            }
        },
        status=200,
    )
    result = client.get_stats_list(search_word="人口", limit=10)
    assert "GET_STATS_LIST" in result
    tables = result["GET_STATS_LIST"]["DATALIST_INF"]["TABLE_INF"]
    assert len(tables) == 1
    assert tables[0]["@id"] == "0003411595"


# ──────────────────────────────────────────────
# リトライのテスト
# ──────────────────────────────────────────────

@responses_mock.activate
def test_retry_on_503_raises_after_max_retries(client: EStatClient) -> None:
    """503 エラーが MAX_RETRIES 回続いた場合に EStatAPIError を送出する"""
    for _ in range(EStatClient.MAX_RETRIES):
        responses_mock.add(
            responses_mock.GET,
            f"{BASE_URL}/getStatsList",
            status=503,
        )
    with pytest.raises(EStatAPIError) as exc_info:
        client.get_stats_list()
    assert exc_info.value.status_code == 503


@responses_mock.activate
def test_retry_succeeds_on_second_attempt(client: EStatClient) -> None:
    """1回目が503、2回目が200の場合はリトライで成功する"""
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        status=503,
    )
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        json={
            "GET_STATS_LIST": {
                "RESULT": {"STATUS": 0, "ERROR_MSG": "正常に終了しました。"},
                "DATALIST_INF": {"NUMBER": 0, "TABLE_INF": []},
            }
        },
        status=200,
    )
    # sleep をスキップするため monkeypatch でも良いが、ここでは動作確認のみ
    result = client.get_stats_list()
    assert "GET_STATS_LIST" in result


@responses_mock.activate
def test_no_retry_on_400(client: EStatClient) -> None:
    """400 Bad Request はリトライ対象外のため、即座に EStatAPIError を送出する"""
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        status=400,
    )
    with pytest.raises(EStatAPIError) as exc_info:
        client.get_stats_list()
    # リトライしないので呼び出し回数は1回
    assert len(responses_mock.calls) == 1
    assert exc_info.value.status_code == 400


# ──────────────────────────────────────────────
# API レベルエラーのテスト
# ──────────────────────────────────────────────

@responses_mock.activate
def test_api_level_error_detected(client: EStatClient) -> None:
    """HTTP 200 でも STATUS != 0 の場合に EStatAPIError を送出する"""
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        json={
            "GET_STATS_LIST": {
                "RESULT": {
                    "STATUS": 100,
                    "ERROR_MSG": "APIキーが不正です。",
                }
            }
        },
        status=200,
    )
    with pytest.raises(EStatAPIError, match="API error"):
        client.get_stats_list()


@responses_mock.activate
def test_api_level_error_contains_message(client: EStatClient) -> None:
    """EStatAPIError のメッセージに ERROR_MSG の内容が含まれる"""
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        json={
            "GET_STATS_LIST": {
                "RESULT": {
                    "STATUS": 200,
                    "ERROR_MSG": "指定された統計表IDが存在しません。",
                }
            }
        },
        status=200,
    )
    with pytest.raises(EStatAPIError) as exc_info:
        client.get_stats_list()
    assert "指定された統計表IDが存在しません" in str(exc_info.value)


# ──────────────────────────────────────────────
# health_check のテスト
# ──────────────────────────────────────────────

@responses_mock.activate
def test_health_check_returns_true_when_ok(client: EStatClient) -> None:
    """API 疎通OK の場合に health_check が True を返す"""
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        json={
            "GET_STATS_LIST": {
                "RESULT": {"STATUS": 0, "ERROR_MSG": "正常に終了しました。"},
                "DATALIST_INF": {"NUMBER": 1, "TABLE_INF": [{"@id": "123"}]},
            }
        },
        status=200,
    )
    assert client.health_check() is True


@responses_mock.activate
def test_health_check_returns_false_on_error(client: EStatClient) -> None:
    """API エラー時に health_check が False を返す(例外を伝播させない)"""
    responses_mock.add(
        responses_mock.GET,
        f"{BASE_URL}/getStatsList",
        json={
            "GET_STATS_LIST": {
                "RESULT": {"STATUS": 100, "ERROR_MSG": "APIキーが不正です。"}
            }
        },
        status=200,
    )
    assert client.health_check() is False

テスト実行と coverage 確認

# テストを実行する
pytest tests/ -v

# カバレッジレポートを生成する
pytest tests/ --cov=src --cov-report=term-missing --cov-report=html

responses ライブラリのデコレータ @responses.activate を付けると、そのテスト関数内で requests が行うすべての HTTP 通信がモックに差し替えられます。responses.add() で登録した以外の URL へのリクエストは ConnectionError になるため、テスト内で意図しない外部通信が発生した場合もすぐに気づけます。


ページネーション対応

getStatsData は1リクエストで取得できる件数に上限があります(デフォルト limit=10000、最大 100000)。大規模な統計表(数十万件規模)を全件取得するには、startPosition でページネーションする必要があります。

TOTAL_NUMBER フィールドに総件数が入っているため、これを使って全件取得が完了したかどうかを判定できます。

def get_all_stats_data(
    self,
    stats_data_id: str,
    **kwargs: Any,
) -> list[dict]:
    """startPosition でページングして統計データを全件取得する。

    TOTAL_NUMBER に達するまでリクエストを繰り返す。
    取得件数が多い場合はリクエスト間に自動で 1 秒のスリープを挟む。

    Args:
        stats_data_id: 統計表 ID。
        **kwargs: get_stats_data に渡す追加パラメータ(cdCat01 等)。

    Returns:
        全 VALUE レコードのリスト。各要素は dict(@cat01, @area, @time, $ 等のキー)。

    Examples:
        >>> values = client.get_all_stats_data("0003411595")
        >>> df = pd.DataFrame(values)
        >>> df.rename(columns={"$": "value", "@area": "area", "@time": "time"})
    """
    all_values: list[dict] = []
    position = 1
    limit = 10000
    total: int | None = None

    while True:
        data = self.get_stats_data(
            stats_data_id,
            start_position=position,
            limit=limit,
            **kwargs,
        )
        statistical_data = (
            data.get("GET_STATS_DATA", {})
                .get("STATISTICAL_DATA", {})
        )

        # 初回リクエストで総件数を取得する
        if total is None:
            try:
                total = int(
                    statistical_data.get("TABLE_INF", {}).get("TOTAL_NUMBER", 0)
                )
            except (ValueError, TypeError):
                total = 0
            logger.info(
                "get_all_stats_data: stats_data_id=%s total=%d",
                stats_data_id,
                total,
            )

        values: list[dict] = (
            statistical_data.get("DATA_INF", {}).get("VALUE", [])
        )

        if not values:
            logger.info(
                "get_all_stats_data: no more values at position=%d", position
            )
            break

        all_values.extend(values)
        logger.debug(
            "get_all_stats_data: fetched %d/%d records", len(all_values), total
        )

        if total and len(all_values) >= total:
            break

        position += limit
        # レートリミット対策として1秒待機する
        time.sleep(1)

    return all_values

pandas DataFrame への変換

取得したレコードをそのまま pandas.DataFrame に渡すと、@cat01 @area @time $ がそのままカラム名になります。可読性のためにリネームしておくと便利です。

import pandas as pd
from src.collectors.estat.client import EStatClient

client = EStatClient()

# 国勢調査(統計表ID: 0003411595)の全件取得
values = client.get_all_stats_data("0003411595")

df = pd.DataFrame(values)
# @ プレフィックスを除去してカラム名を整理する
df = df.rename(columns={
    "$":      "value",
    "@cat01": "sex_code",
    "@area":  "area_code",
    "@time":  "time_code",
})
# 数値列を変換する("*" は統計値なしを示す文字列)
df["value"] = pd.to_numeric(df["value"], errors="coerce")

print(df.head())
#   sex_code area_code   time_code     value
# 0      001     01000  2020000000  5224614.0
# 1      002     01000  2020000000  2502594.0
# 2      003     01000  2020000000  2722020.0

環境設定と secrets 管理

.env.example

API キーや GCP の認証情報は .env ファイルで管理し、.gitignore に追加してリポジトリにコミットしないことが鉄則です。.env.example はリポジトリにコミットするテンプレートファイルで、値は空にしておきます。

# .env.example
# ─────────────────────────────────────────────
# e-Stat API 設定
# ─────────────────────────────────────────────
# e-Stat APIキーを設定する
# 取得先: https://www.e-stat.go.jp/api/
ESTAT_APP_ID=

# ─────────────────────────────────────────────
# GCP 設定(パイプライン使用時)
# ─────────────────────────────────────────────
GCS_BUCKET=
GCP_PROJECT=
# GOOGLE_APPLICATION_CREDENTIALS=path/to/service-account.json

環境変数の検証

python-dotenvload_dotenv().env ファイルを読み込むだけで、必須変数の存在確認はしません。本番環境では起動時に必須変数を検証するコードを追加しておくと安全です。

# src/config.py
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    """アプリケーション設定。起動時に環境変数を検証する。"""

    estat_app_id: str
    gcs_bucket: str
    gcp_project: str

    @classmethod
    def from_env(cls) -> "Config":
        """環境変数から設定を読み込む。必須変数が未設定の場合は ValueError を送出する。"""
        missing = []
        estat_app_id = os.environ.get("ESTAT_APP_ID", "")
        gcs_bucket = os.environ.get("GCS_BUCKET", "")
        gcp_project = os.environ.get("GCP_PROJECT", "")

        if not estat_app_id:
            missing.append("ESTAT_APP_ID")
        if not gcs_bucket:
            missing.append("GCS_BUCKET")
        if not gcp_project:
            missing.append("GCP_PROJECT")

        if missing:
            raise ValueError(
                f"必須環境変数が設定されていません: {', '.join(missing)}\n"
                ".env.example を参考に .env を作成してください。"
            )

        return cls(
            estat_app_id=estat_app_id,
            gcs_bucket=gcs_bucket,
            gcp_project=gcp_project,
        )

ロギングの設定

EStatClientlogging.getLogger(__name__) を使っているため、呼び出し側でロガーを設定するだけで詳細なログを有効化できます。

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# または特定のロガーだけ DEBUG にする
logging.getLogger("src.collectors.estat").setLevel(logging.DEBUG)

logger.debug()appId を渡さないようにしている点は、ログ管理ツール(Cloud Logging 等)にシークレットが流出するのを防ぐための重要な設計判断です。


まとめ

本記事では、e-Stat API の3エンドポイントを Python で型安全にラップするクライアントライブラリの設計と実装を解説しました。

設計上の重要ポイント

ポイント実装内容
HTTP 200 エラー対応_check_api_error() で RESULT.STATUS を検査する
Exponential backoff2 ** attempt 秒の待機でリトライする(最大3回)
秘密情報の保護appId はログに出力しない
型安全型ヒント・ABC による抽象インターフェースで設計する
テストresponses ライブラリで HTTP 通信をモックする
ページネーションTOTAL_NUMBER を参照して全件取得ループを制御する

e-Stat API は、正しくラッパーを作れば公的統計データを快適に利用できる優れたデータソースです。本記事で紹介した BaseCollector の抽象インターフェースを活用することで、気象庁 API や RESAS API など他の公的 API クライアントも同じパターンで横展開できます。

ページネーション対応の get_all_stats_data() と、分類コードを取得する get_meta_info() を組み合わせることで、分析に即使える構造化データを自動収集するパイプラインを構築できます。次のステップとして、取得したデータを GCS や BigQuery に書き出すストレージ層の追加を検討してみてください。

関連記事

F/ この記事の設計を反映しているプロダクト: FlowAgent

see →
an

analytics note — editor

AI とデータ分析の実装ログを毎週編集。設計判断と運用のつまずきを、再現できる形で残すことを大切にしています。