【Python】ローカルPythonからBigQueryを操作する

GCP

BigQueryはGoogle Cloud Platform(GCP)で提供されるデータウェアハウスサービスです。GCPでは各種サービスにAPIが用意されており、外部システム、アプリケーションと連携することができます。

今回はローカルのPython環境からBigQueryにAPI連携する方法を紹介します。

Python側の準備

Pythonパッケージのインストール

PythonからBigQueryに接続するためにgoogle-cloud-bigqueryライブラリをインストールします。

pip、Anacondaでのインストールコマンドは以下の通りです。

//pip
pip install google-cloud-bigquery

//conda
conda install -c conda-forge google-cloud-bigquery

GCP側の準備

Cloud Platform Projectの作成

APIで連携する先のプロジェクトをGCP上に作成しておく必要があります。作成方法については以下記事を参照ください。

認証設定

外部からAPI連携する際には認証設定を行う必要があります。以下にそのステップを紹介します。

1. APIアクセスの有効化

まずはAPIアクセスを有効化します。Google Cloudにアクセスし、左側のメニューバーから[APIとサービス] > [有効なAPIとサービス]を選択します。

有効なAPIとサービス

続いて、[APIとサービスの有効化]を選択します。

APIとサービスの有効化

するとAPIライブラリの画面が表示されます。今回はBigQueryのAPIを有効化したいので、検索窓からBigQuery APIを検索します。

BigQuery API

BigQuery APIの画面から「有効にする」ボタンを押します。これでAPIが有効化されます。

「有効にする」を押す

2. サービスアカウントの作成

続いて、APIアクセスのためのサービスアカウントを作成します。左側のメニューバーから[APIとサービス] > [認証情報]を選択します。

左側のメニューバーから[APIとサービス] > [認証情報]を選択

画面上部の「認証情報を作成」から「サービスアカウント」を選択します。

サービスアカウントを作成

アカウント作成画面に移動するので、作成するアカウント名を設定します。アカウントIDはデフォルトでアカウント名に合わせて表示されますが、任意変更も可能です。

サービスアカウントのロール等設定することも可能ですが、こちらは任意設定項目です。必要に応じてロールを指定するようにしてください。

今回はBigQuery管理者ロールを割り当てます。設定したら下部の「完了」ボタンを押下します。これにてサービスアカウントの作成は完了です。

BigQuery管理者ロールを割り当てる

3. アカウントキーの発行

サービスアカウントを作成すると、認証情報画面でアカウントが作成されたことを確認できます。最後に、アカウントの認証キーを発行します。このキーを元にクライアントの認証を行います。

認証情報画面から先ほど作成したアカウントを選択します。

サービスアカウントを選択

「キー」タブに移動し、[鍵を追加] > [新しい鍵を作成]を選択します。

「キー」タブに移動し、[鍵を追加] > [新しい鍵を作成]を選択

キーのタイプとしてJSONまたはP12を選択できますが、今回はJSON形式を指定します。すると、ファイルがダウンロードされます。このファイルがクライアント側の認証情報になります。無くしたり、流出したりしないように保管してください。

※認証情報が流出して悪用されると、大量課金等トラブルになりかねません。ご注意ください

ここまでで事前設定は完了です。お疲れさまでした。

Pythonからの接続・操作

ようやくここからPythonでコーディングしていきます。今回はBigQuery連携してデータを取得し、結果をPandasデータフレームに格納します。

まずはコード全体です。

from google.cloud import bigquery
from google.oauth2 import service_account

import pandas as pd


# ダウンロードした認証ファイル(.json)のフルパスを指定
key_path = "Path/to/secret_key.json"
credentials = service_account.Credentials.from_service_account_file(key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"])
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

#実行クエリ
query = """
        -- This query shows a list of the daily top Google Search terms.
        -- These search terms are in the top 25 in the US each day.
        SELECT
            refresh_date AS Day
            , term AS Top_Term                          
            , rank 
        FROM
            `bigquery-public-data.google_trends.top_terms` 
        WHERE
            -- Choose only the top term each day.
            rank = 1                                
            -- Filter to the last 2 weeks.
            AND refresh_date >= DATE_SUB(CURRENT_DATE (), INTERVAL 2 WEEK) 
        GROUP BY
            Day
            , Top_Term
            , rank 
        ORDER BY
            Day DESC                                    
    """

#ジョブ実行
query_job = client.query(query)  # Make an API request.

#データフレームに格納
df = (query_job
      .result()
      .to_dataframe()
    )

パッケージのインポート

今回はgoogle.cloudのbigqueryと、認証メソッドとしてgooogle.oauth2のservice_accountをインポートします。データフレームを扱うことから、pandasもインポートしておきます。

from google.cloud import bigquery
from google.oauth2 import service_account

import pandas as pd

接続設定

続いてBigQueryとの接続オブジェクトを作成します。

key_pathには先ほどダウンロードした認証情報のJSONファイルを指定します。

# ダウンロードした認証ファイル(.json)のフルパスを指定
key_path = "Path/to/secret_key.json"
credentials = service_account.Credentials.from_service_account_file(key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"])
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

クエリの実行

クエリを作成し、実行します。

クエリは文字列で作成します。基本的にはSQLと同じ構文です。テーブルにはテーブルIDを指定します。テーブルIDはBigQueryのSQLワークスペースで各テーブルの[詳細]から確認できます。

テーブルIDの確認方法

client.query(クエリ)でクエリを実行し、結果がclientに格納されます。

#実行クエリ
query = """
        -- This query shows a list of the daily top Google Search terms.
        -- These search terms are in the top 25 in the US each day.
        SELECT
            refresh_date AS Day
            , term AS Top_Term                          
            , rank 
        FROM
            `bigquery-public-data.google_trends.top_terms` 
        WHERE
            -- Choose only the top term each day.
            rank = 1                                
            -- Filter to the last 2 weeks.
            AND refresh_date >= DATE_SUB(CURRENT_DATE (), INTERVAL 2 WEEK) 
        GROUP BY
            Day
            , Top_Term
            , rank 
        ORDER BY
            Day DESC                                    
    """

#ジョブ実行
query_job = client.query(query)  # Make an API request.

結果のデータフレーム化

最後に実行結果をデータフレームに変換します。先ほど実行結果を格納したquery_jobからresult()メソッドで結果セットを取得できます。さらに、to_dataframe()とすることでデータフレームに変換することができます。

#データフレームに格納
df = (query_job
      .result()
      .to_dataframe()
    )
dfの中身

まとめ

ローカルPython環境からBigQueryに接続してクエリを実行する方法を紹介しました。APIの認証設定等、実際のアプリケーション運用ではもう少し細かく設定が必要になりますが、大きな流れは本記事で紹介した通りです。

クラウドでビッグデータを管理し、ローカルシステムと連携するようなケースはこれからもっと増えていくと思います。利用される場合は、本記事が参考になれば幸いです。

ではでは👋