0. Google BigQuery
BigQuery is a highly scalable, cost-effective serverless enterprise data warehouse.
It builds a logical data warehouse over object storage (data stored as objects with metadata and a unique identifier), spreadsheet data, and managed columnar storage (optimized for quickly retrieving data columns in analytics applications), and analyzes both batch data and continuously generated streaming data.
The official page covers the benefits, features, use cases, and most of the other general questions you might have.
Among the advantages listed there, the one that stands out is that BigQuery keeps initial setup and query turnaround short, which shortens the build-and-use cycle. In particular, petabyte-scale data (for a sense of scale, a petabyte is roughly the amount of data YouTube stored per month as of 2012) can be explored with SQL queries, and a single SQL query can reportedly process 7 terabytes (a hundred billion rows in a table) in under 30 seconds. This makes it easy to ingest streaming data and visualize it as statistics.
These characteristics strike me as the biggest advantage for loading and analyzing both the data we already hold and the real-time data we will need to process going forward. Because tools for loading, transforming, and visualizing data integrate with BigQuery, I expect we can operate efficiently as we build the platform BizSpring will provide.
1. Python + BigQuery
from google.cloud import bigquery
# Create a "Client" object
client = bigquery.Client()
# Construct a reference to the "hacker_news" dataset
dataset_ref = client.dataset("hacker_news", project="bigquery-public-data")
# API request - fetch the dataset
dataset = client.get_dataset(dataset_ref)
# List all the tables in the "hacker_news" dataset
tables = list(client.list_tables(dataset))
# Print names of all tables in the dataset (there are four!)
for table in tables:
    print(table.table_id)
# Construct a reference to the "full" table
table_ref = dataset_ref.table("full")
# API request - fetch the table
table = client.get_table(table_ref)
# Print information on all the columns in the "full" table in the "hacker_news" dataset
table.schema
# Preview the first five lines of the "full" table
client.list_rows(table, max_results=5).to_dataframe()
# Preview the first five entries in the "by" column of the "full" table
client.list_rows(table, selected_fields=table.schema[:1], max_results=5).to_dataframe()
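As a small variation on the call above, selected_fields can also pick columns by name instead of slicing the schema by position. A minimal sketch, assuming (as the comment above does) that the "full" table's schema includes a "by" column:
# Select schema fields by name instead of by position
wanted_fields = [field for field in table.schema if field.name == "by"]
client.list_rows(table, selected_fields=wanted_fields, max_results=5).to_dataframe()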
2. SELECT, FROM & WHERE
from google.cloud import bigquery
# Create a "Client" object
client = bigquery.Client()
# Construct a reference to the "openaq" dataset
dataset_ref = client.dataset("openaq", project="bigquery-public-data")
# API request - fetch the dataset
dataset = client.get_dataset(dataset_ref)
# List all the tables in the "openaq" dataset
tables = list(client.list_tables(dataset))
# Print names of all tables in the dataset (there's only one!)
for table in tables:
    print(table.table_id)
## global_air_quality
# Construct a reference to the "global_air_quality" table
table_ref = dataset_ref.table("global_air_quality")
# API request - fetch the table
table = client.get_table(table_ref)
# Preview the first five lines of the "global_air_quality" table
client.list_rows(table, max_results=5).to_dataframe()
# Query to select all the items from the "city" column where the "country" column is 'US'
query = """
SELECT city
FROM `bigquery-public-data.openaq.global_air_quality`
WHERE country = 'US'
"""
query = """
SELECT *
FROM `bigquery-public-data.openaq.global_air_quality`
WHERE country = 'US'
"""
# Query to get the score and title columns from every row where the type column has value "job"
query = """
SELECT score, title
FROM `bigquery-public-data.hacker_news.full`
WHERE type = "job"
"""
# Create a QueryJobConfig object to estimate size of query without running it
dry_run_config = bigquery.QueryJobConfig(dry_run=True)
# API request - dry run query to estimate costs
dry_run_query_job = client.query(query, job_config=dry_run_config)
print("This query will process {} bytes.".format(dry_run_query_job.total_bytes_processed))
## This query will process 429036722 bytes.
# Only run the query if it's less than 1 MB
ONE_MB = 1000*1000
safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=ONE_MB)
# Set up the query (will only run if it's less than 1 MB)
safe_query_job = client.query(query, job_config=safe_config)
# API request - try to run the query, and return a pandas DataFrame
safe_query_job.to_dataframe()
## ERROR! The query exceeds the 1 MB limit, so BigQuery cancels it without running.
# Only run the query if it's less than 1 GB
ONE_GB = 1000*1000*1000
safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=ONE_GB)
# Set up the query (will only run if it's less than 1 GB)
safe_query_job = client.query(query, job_config=safe_config)
# API request - try to run the query, and return a pandas DataFrame
job_post_scores = safe_query_job.to_dataframe()
# Print average score for job posts
job_post_scores.score.mean()
## 1.874250078939059
3. GROUP BY, HAVING & COUNT
GROUP BY takes the name of one or more columns, and treats all rows with the same value in that column as a single group when you apply aggregate functions like COUNT().
HAVING is used in combination with GROUP BY to ignore groups that don't meet certain criteria.
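As a quick illustration before the full example below, here is a bare-bones GROUP BY + HAVING query (a sketch; the author column is assumed to exist in the hacker_news.comments table used next):
# Count comments per author, keeping only authors with more than 10,000 comments
prolific_query = """
SELECT author, COUNT(1) AS NumPosts
FROM `bigquery-public-data.hacker_news.comments`
GROUP BY author
HAVING COUNT(1) > 10000
"""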
from google.cloud import bigquery
# Create a "Client" object
client = bigquery.Client()
# Construct a reference to the "hacker_news" dataset
dataset_ref = client.dataset("hacker_news", project="bigquery-public-data")
# API request - fetch the dataset
dataset = client.get_dataset(dataset_ref)
# Construct a reference to the "comments" table
table_ref = dataset_ref.table("comments")
# API request - fetch the table
table = client.get_table(table_ref)
# Preview the first five lines of the "comments" table
client.list_rows(table, max_results=5).to_dataframe()
# Number of posts per parent comment, written with an alias (NumPosts) for readability
query_improved = """
SELECT parent, COUNT(1) AS NumPosts
FROM `bigquery-public-data.hacker_news.comments`
GROUP BY parent
HAVING COUNT(1) > 10
"""
safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**10)
query_job = client.query(query_improved, job_config=safe_config)
# API request - run the query, and convert the results to a pandas DataFrame
improved_df = query_job.to_dataframe()
# Print the first five rows of the DataFrame
improved_df.head()
4. ORDER BY
ORDER BY is usually the last clause in your query, and it sorts the results returned by the rest of your query.
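Results sort in ascending order by default; adding DESC after the column reverses the order. A minimal sketch (the pets table and its columns are hypothetical, shown only for the syntax):
# Hypothetical table: sort rows by the Animal column, descending
example_query = """
SELECT ID, Name, Animal
FROM `hypothetical-project.dataset.pets`
ORDER BY Animal DESC
"""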
from google.cloud import bigquery
# Create a "Client" object
client = bigquery.Client()
# Construct a reference to the "nhtsa_traffic_fatalities" dataset
dataset_ref = client.dataset("nhtsa_traffic_fatalities", project="bigquery-public-data")
# API request - fetch the dataset
dataset = client.get_dataset(dataset_ref)
# Construct a reference to the "accident_2015" table
table_ref = dataset_ref.table("accident_2015")
# API request - fetch the table
table = client.get_table(table_ref)
# Preview the first five lines of the "accident_2015" table
client.list_rows(table, max_results=5).to_dataframe()
# Query to find out the number of accidents for each day of the week
query = """
SELECT COUNT(consecutive_number) AS num_accidents,
EXTRACT(DAYOFWEEK FROM timestamp_of_crash) AS day_of_week
FROM `bigquery-public-data.nhtsa_traffic_fatalities.accident_2015`
GROUP BY day_of_week
ORDER BY num_accidents DESC
"""
# Set up the query (cancel the query if it would use too much of
# your quota, with the limit set to 1 GB)
safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**9)
query_job = client.query(query, job_config=safe_config)
# API request - run the query, and convert the results to a pandas DataFrame
accidents_by_day = query_job.to_dataframe()
# Print the DataFrame
accidents_by_day
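One caveat when reading the result: in BigQuery, EXTRACT(DAYOFWEEK ...) returns 1 for Sunday through 7 for Saturday. A small pandas sketch (assuming the DataFrame above) maps the numbers to names:
# Map BigQuery's DAYOFWEEK numbers (1 = Sunday ... 7 = Saturday) to names
day_names = {1: 'Sun', 2: 'Mon', 3: 'Tue', 4: 'Wed', 5: 'Thu', 6: 'Fri', 7: 'Sat'}
accidents_by_day['day_name'] = accidents_by_day['day_of_week'].map(day_names)
accidents_by_day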
5. AS & WITH
On its own, AS is a convenient way to clean up the data returned by your query. It's even more powerful when combined with WITH in what's called a "common table expression".
A common table expression (or CTE) is a temporary table that you return within your query. CTEs are helpful for splitting your queries into readable chunks, and you can write queries against them.
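In its simplest form a CTE looks like this (a sketch with a hypothetical pets table; the WITH clause defines a temporary table Seniors that the final SELECT then queries):
# Hypothetical example: define a CTE, then select from it
example_query = """
WITH Seniors AS
(
    SELECT ID, Name
    FROM `hypothetical-project.dataset.pets`
    WHERE Years_old > 5
)
SELECT ID
FROM Seniors
"""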
from google.cloud import bigquery
# Create a "Client" object
client = bigquery.Client()
# Construct a reference to the "crypto_bitcoin" dataset
dataset_ref = client.dataset("crypto_bitcoin", project="bigquery-public-data")
# API request - fetch the dataset
dataset = client.get_dataset(dataset_ref)
# Construct a reference to the "transactions" table
table_ref = dataset_ref.table("transactions")
# API request - fetch the table
table = client.get_table(table_ref)
# Preview the first five lines of the "transactions" table
client.list_rows(table, max_results=5).to_dataframe()
# Query to select the number of transactions per date, sorted by date
query_with_CTE = """
WITH time AS
(
SELECT DATE(block_timestamp) AS trans_date
FROM `bigquery-public-data.crypto_bitcoin.transactions`
)
SELECT COUNT(1) AS transactions,
trans_date
FROM time
GROUP BY trans_date
ORDER BY trans_date
"""
# Set up the query (cancel the query if it would use too much of
# your quota, with the limit set to 10 GB)
safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**10)
query_job = client.query(query_with_CTE, job_config=safe_config)
# API request - run the query, and convert the results to a pandas DataFrame
transactions_by_date = query_job.to_dataframe()
# Print the first five rows
transactions_by_date.head()
transactions_by_date.set_index('trans_date').plot()
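The plot call uses pandas' matplotlib backend; inside a notebook the chart renders inline, but in a plain script the figure also needs to be shown explicitly (a small sketch):
# Outside a notebook, render the figure explicitly
import matplotlib.pyplot as plt
transactions_by_date.set_index('trans_date').plot()
plt.show()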
6. JOIN
JOIN combines information from two tables by matching rows where a column in one table equals a column in the other. With an INNER JOIN, only rows that find a match in both tables appear in the result.
from google.cloud import bigquery
# Create a "Client" object
client = bigquery.Client()
# Construct a reference to the "github_repos" dataset
dataset_ref = client.dataset("github_repos", project="bigquery-public-data")
# API request - fetch the dataset
dataset = client.get_dataset(dataset_ref)
# Construct a reference to the "licenses" table
licenses_ref = dataset_ref.table("licenses")
# API request - fetch the table
licenses_table = client.get_table(licenses_ref)
# Preview the first five lines of the "licenses" table
client.list_rows(licenses_table, max_results=5).to_dataframe()
# Construct a reference to the "sample_files" table
files_ref = dataset_ref.table("sample_files")
# API request - fetch the table
files_table = client.get_table(files_ref)
# Preview the first five lines of the "sample_files" table
client.list_rows(files_table, max_results=5).to_dataframe()
# Query to determine the number of files per license, sorted by number of files
query = """
SELECT L.license, COUNT(1) AS number_of_files
FROM `bigquery-public-data.github_repos.sample_files` AS sf
INNER JOIN `bigquery-public-data.github_repos.licenses` AS L
ON sf.repo_name = L.repo_name
GROUP BY L.license
ORDER BY number_of_files DESC
"""
# Set up the query (cancel the query if it would use too much of
# your quota, with the limit set to 10 GB)
safe_config = bigquery.QueryJobConfig(maximum_bytes_billed=10**10)
query_job = client.query(query, job_config=safe_config)
# API request - run the query, and convert the results to a pandas DataFrame
file_count_by_license = query_job.to_dataframe()
# Print the DataFrame
file_count_by_license
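A hedged variation on the query above: INNER JOIN keeps only files whose repo_name also appears in the licenses table, while a LEFT JOIN would keep every file and return NULL for license where nothing matches.
# Same query with a LEFT JOIN: unmatched files are kept, with license = NULL
query_left = """
SELECT L.license, COUNT(1) AS number_of_files
FROM `bigquery-public-data.github_repos.sample_files` AS sf
LEFT JOIN `bigquery-public-data.github_repos.licenses` AS L
ON sf.repo_name = L.repo_name
GROUP BY L.license
ORDER BY number_of_files DESC
"""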