使用Python与BigQuery进行交互的代码详解
目录
- 选择合适的 python 库
- 安装库
- 运行查询
- 使用 GoogleSQL 语法
- 使用旧版 SQL 语法
- 使用 BigQuery Storage API 加速数据下载
- 配置查询
- 参数化查询
- 将 pandas DataFrame 加载到 BigQuery 表中
- pandas-gbq 的局限性
- 解决连接池错误
选择合适的 Python 库
在使用 BigQuery 时,您可以根据自己的需求选择以下三个 Python 库:
- BigQuery DataFrame:通过服务器端处理,支持 Pandas 和 Scikit-learn API,适合数据处理和机器学习任务。
- pandas-gbq:客户端库,用于在 Python 中读写 BigQuery 数据,适合简单的数据处理和分析。
- google-cloud-bigquery:Google 维护的库,提供完整的 BigQuery API 功能,适合复杂的数据管理和分析。
安装库
要使用这些库,您需要安装以下包:
pip install --upgrade pandas-gbq 'google-cloud-bigquery[bqstorage,pandas]'
运行查询
使用 GoogleSQL 语法
以下示例展示了如何使用 pandas-gbq
和 google-cloud-bigquery
运行 GoogleSQL 查询:
pandas-gbq
import pandas sql = """ SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current` WHERE state = 'TX' LIMIT 100 """ # 使用标准 SQL 查询 df = pandas.read_gbq(sql, dialect="standard") # 指定项目 ID project_id = "your-project-id" df = pandas.read_gbq(sql, project_id=project_id, dialect="standard")
google-cloud-bigquery
from google.cloud import bigquery client = bigquery.Client() sql = """ SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current` WHERE state = 'TX' LIMIT 100 """ # 使用标准 SQL 查询 df = cphplient.query(sql).to_dataframe() # 指定项目 ID project_id = "your-project-id" df = client.query(sql, project=project_id).to_dataframe()
使用旧版 SQL 语法
如果需要使用旧版 SQL 语法,可以通过以下方式进行:
pandas-gbq
import pandas sql = """ SELECT name FROM [bigquery-public-data:usa_names.usa_1910_current] WHERE state = 'TX' LIMIT 100 """ df = pandas.read_gbq(sql, dialect="legacy")
google-cloud-bigquery
from google.cloud import bigquery client = bigquery.Client() sql = """ SELECT name FROM [bigquery-public-data:usa_names.usa_1910_current] WHERE state = 'TX' LIMIT 100 """ query_config = bigquery.QueryJobConfig(use_legacy_sql=True) df = client.query(sql, job_config=query_config).to_dataframe()
使用 BigQuery Storage API 加速数据下载
BigQuery Storage API 可以显著提高大型结果的下载速度。以下示例展示了如何使用此 API:
pandas-gbq
import pandas sql = "SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`" # 使用 BigQue编程ry Storage API 加速下载 df = pandas.read_gbq(sql, dialect="standard", use_bqstorage_api=True)
google-cloud-bigquery
from google.cloud import bigquery client = bigquery.Client() sql = "SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`" # 如果 BigQuery Storage API 已启用,则自动使用 df = client.query(sql).to_dataframe()
配置查询
参数化查询
以下示例展示了如何使用参数化查询:
pandas-gbq
import pandas sql = """ SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current` WHERE state = @state LIMIT @limit """ query_config = { "query": { "parameterMode": "NAMED", "queryParameters": [ { "name": "state", "parameterType": {"type": "STRING"}, "parameterValue": {"value": "TX"}, }, { "name": "limit", "parameterType": {"type": "INTEGER"}, "parameterValue": {"value": 100}, }, ], } } df = pandas.read_gbq(sql, configuration=query_config)
google-cloud-bigquery
from google.cloud import bigquery client = biphpgquery.Client() sql = """ SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current` WHERE state = @state LIMIT @limit """ query_config = bigquery.QueryJobConfig( query_parameters=[ bigquery.ScalarQueryParameter("state", "STRING", "TX"), bigquery.ScalarQueryParameter("limit", "INTEGER", 100), ] ) df = client.query(sql, job_config=query_config).to_dataframe()
将 pandas DataFrame 加载到 BigQuery 表中
以下示例展示了如何将 pandas DataFrame 加载到 BigQuery 表中:
pandas-gbq
import pandas df = pandas.DataFrame( { "my_string": ["a", "b", "c"], "my_int64": [1, 2, 3], "my_float64": [4.0, 5.0, 6.0], "my_timestamp": [ pandas.Timestamp("1998-09-04T16:03:14"), pandas.Timestamp("2010-09-13T12:03:45"), python pandas.Timestamp("2015-10-02T16:00:00"), ], } ) table_id = "my_dataset.new_table" df.to_gbq(table_id)
google-cloud-bigquery
from google.cloud import bigquery import pandas df = pandas.DataFrame( { "my_string": ["a", "b", "c"], "my_int64": [1, 2, 3], "my_float64": [4.0, 5.0, 6.0], "my_timestamp": [ pandas.Timestamp("1998-09-04T16:03:14"), pandas.Timestamp("2010-09-13T12:03:45"), pandas.Timestamp("2015-10-02T16:00:00"), ], } ) client = bigquery.Client() table_id = "my_dataset.new_table" # 确保正确的数据类型 job_config = bigquery.LoadJobConfig( schema=[ bigquery.SchemaField("my_string", "STRING"), ] ) job = client.load_table_from_dataframe(df, table_id, job_config=job_config) # 等待加载完成 job.result()
pandas-gbq 的局限性
- 数据集管理:不支持创建、更新或删除数据集。
- 数据格式支持:仅支持 CSV 格式,不支持嵌套值或数组值。
- 表管理:不支持列出表、复制表或删除表。
- 数据导出:不支持直接导出数据到 Cloud Storage。
解决连接池错误
如果遇到连接池错误,可以通过以下方式增加连接池大小:
import requests client = bigquery.Client() adapter = requests.adapters.HTTPAdapter(pool_connections=128, pool_maxsize=128, max_retries=3) client._http.mount("https://", adapter) client._http._auth_request.sessio编程客栈n.mount("https://", adapter)
以上就是使用Python与BigQuery进行交互的代码详解的详细内容,更多关于Python与BigQuery交互的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论