[Dataproc] PySpark Job with Airflow (Composer): Get Data From MySQL
sonny.kim
2021. 11. 1. 12:31
UBUNTU CLIENT CONFIGURATION
export TEMPLATE_ID=workflow-mytest
export WORK_CLUSTER_NAME=cluster-mytest
export REGION=asia-northeast3
export BUCKET_NAME=jay-pyspark-mytest # also needed in the Airflow DAG
export PROJECT_ID=<GCP_PROJECT_ID> # also needed in the Airflow DAG
export PYTHON_FILE=pywork.py
export STEP_ID=first_step #Some name like "Get Data"
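The gcloud commands below assume the project is already set and the Dataproc and Composer APIs are enabled; if not, a quick sketch:
gcloud config set project $PROJECT_ID
gcloud services enable dataproc.googleapis.com composer.googleapis.com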
PYTHON CODE
$vi pywork.py
import sys
from datetime import datetime, timedelta

import pandas as pd
import pymysql

# Connection settings (placeholders)
DB_NAME = "<DB_NAME>"
DB_HOST = "<DB_HOST>"
DB_USER = "<DB_USER>"
DB_PASSWORD = "<DB_PASSWORD>"
DB_PORT = <DB_PORT>  # must be an int (MySQL default is 3306)

YESTERDAY = datetime.strftime((datetime.now() - timedelta(1)), '%Y-%m-%d')
PATH = "gs://<BUCKET_NAME>/" + YESTERDAY + "/"  # target GCS bucket for the extracted data

def get_mysql_conn():
    try:
        conn = pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, port=DB_PORT)
        print('Connection success')
    except Exception as e:
        print(e)
        sys.exit()
    return conn

def close_sql_conn(conn):
    conn.close()
    print("MYSQL CONN CLOSED")

# Collect the table names and column names of the target database from information_schema
def get_cols(conn):
    # for MySQL
    query = """SELECT table_name AS TABLE_NAME, GROUP_CONCAT(CONCAT('`', column_name, '`')) AS COLUMNS
               FROM information_schema.columns
               WHERE table_schema = '""" + DB_NAME + """' GROUP BY table_name;"""
    col_data = pd.read_sql(query, conn)
    # For SQL Server, the equivalent query would be:
    # query = """SELECT TABLE_SCHEMA, TABLE_NAME,
    #            COLUMNS = STUFF((SELECT ',[' + n.COLUMN_NAME + ']'
    #                             FROM AdventureWorks2019.INFORMATION_SCHEMA.columns n
    #                             WHERE m.TABLE_SCHEMA = n.TABLE_SCHEMA AND m.TABLE_NAME = n.TABLE_NAME
    #                             FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 1, '')
    #            FROM AdventureWorks2019.INFORMATION_SCHEMA.columns m
    #            WHERE m.TABLE_SCHEMA != 'dbo'
    #            GROUP BY TABLE_SCHEMA, TABLE_NAME ORDER BY m.TABLE_SCHEMA"""
    return col_data

# Dump one table and store it under yesterday's date folder in GCS
def get_ydate_data(conn, table_name, columns):
    query = """SELECT """ + columns + """ FROM """ + DB_NAME + """.""" + table_name + """;"""
    df_ydate_data = pd.read_sql(query, conn)
    # df_ydate_data.to_csv(PATH + DB_NAME + "_" + table_name + ".csv", index=False)
    # Save the DataFrame to GCS in Parquet format
    df_ydate_data.to_parquet(PATH + DB_NAME + "_" + table_name, engine="pyarrow", compression="gzip", index=False)

if __name__ == "__main__":
    conn = get_mysql_conn()
    col_data = get_cols(conn)
    # Loop over the collected table names and column names
    for idx, row in col_data.iterrows():
        try:
            get_ydate_data(conn, row["TABLE_NAME"], row["COLUMNS"])
        except Exception as e:
            print(e)
            sys.exit()
    close_sql_conn(conn)
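Optionally, the script can be sanity-checked outside Dataproc before uploading it; a rough sketch, assuming a local Python 3 environment with network access to the MySQL host and write access to the GCS bucket:
pip install pymysql pandas pyarrow fsspec gcsfs
python3 pywork.py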
SAVE THE PYTHON CODE TO GCS (create the bucket in advance)
gsutil cp pywork.py gs://$BUCKET_NAME/
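The bucket must exist before the copy; if it does not, it can be created first (region and name come from the variables above):
gsutil mb -l $REGION gs://$BUCKET_NAME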
CREATE THE WORKFLOW TEMPLATE
gcloud dataproc workflow-templates create $TEMPLATE_ID \
--region=$REGION
CREATE A MANAGED CLUSTER FOR RUNNING THE JOB
Note that the PIP_PACKAGES metadata only takes effect together with the pip-install.sh initialization action, so both flags go in one command:
gcloud beta dataproc workflow-templates set-managed-cluster $TEMPLATE_ID \
--region $REGION \
--cluster-name $WORK_CLUSTER_NAME \
--image-version 1.4 \
--metadata 'PIP_PACKAGES=cx-Oracle numpy pandas PyMySQL PyMSSQL pyarrow Cython fsspec gcsfs' \
--initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/python/pip-install.sh
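The template configuration can be verified at this point (and again after adding the job):
gcloud dataproc workflow-templates describe $TEMPLATE_ID --region $REGION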
ATTACH THE JOB TO THE WORKFLOW TEMPLATE
gcloud dataproc workflow-templates add-job pyspark \
gs://$BUCKET_NAME/$PYTHON_FILE \
--step-id $STEP_ID \
--workflow-template $TEMPLATE_ID \
--region $REGION \
--properties spark.jars.packages='org.apache.spark:spark-avro_2.11:2.4.0'
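Before wiring the template into Composer, it can be tested by instantiating it directly; this creates the managed cluster, runs the PySpark step, and deletes the cluster afterwards:
gcloud dataproc workflow-templates instantiate $TEMPLATE_ID --region=$REGION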
CREATE A COMPOSER ENVIRONMENT AND CONFIGURE IT
Airflow Web UI -> Admin -> Variables -> Create
Key: project_id
Val: your GCP project ID (the PROJECT_ID set above)
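The same variable can also be set from the client through the Composer CLI passthrough; a sketch, assuming an Airflow 2.x environment and a Composer environment named my-composer-env (placeholder):
gcloud composer environments run my-composer-env --location $REGION variables set -- project_id $PROJECT_ID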
WRITE THE DAG AND UPLOAD IT TO THE COMPOSER BUCKET
$vi composer-dataproc-dag.py
"""Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
Spark Pi Job.
This DAG relies on an Airflow variable
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
"""
import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator
from airflow.utils.dates import days_ago
project_id = models.Variable.get("project_id")
TEMPLATE_ID = "<TEMPLATE_ID>"  # the workflow template ID created above, e.g. workflow-mytest
default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": project_id,
}
# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataprocInstantiateWorkflowTemplateOperator(
        # The task id of your job
        task_id="<TASK_ID>",  # any name for this task in the DAG
        # The template id of your workflow
        template_id=TEMPLATE_ID,
        project_id=project_id,
        # The region of the template (must match the region it was created in, here asia-northeast3)
        region="asia-northeast3",
    )
gsutil cp composer-dataproc-dag.py gs://<AIRFLOW_BUCKET>/dags/
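Once the Composer scheduler has picked up the file, the DAG can also be triggered manually from the client (again assuming Airflow 2.x; my-composer-env is a placeholder), and progress followed in the Airflow web UI:
gcloud composer environments run my-composer-env --location $REGION dags trigger -- dataproc_workflow_dag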
* Reference: DataFrame to BigQuery
from google.cloud import bigquery
import pandas
df = pandas.DataFrame(
    {
        'my_string': ['a', 'b', 'c'],
        'my_int64': [1, 2, 3],
        'my_float64': [4.0, 5.0, 6.0],
        'my_timestamp': [
            pandas.Timestamp("1998-09-04T16:03:14"),
            pandas.Timestamp("2010-09-13T12:03:45"),
            pandas.Timestamp("2015-10-02T16:00:00")
        ],
    }
)
client = bigquery.Client()
table_id = 'my_dataset.new_table'
# Since string columns use the "object" dtype, pass in a (partial) schema
# to ensure the correct BigQuery data type.
job_config = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("my_string", "STRING"),
])
job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)
# Wait for the load job to complete (omitted in this example):
# job.result()
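As a related note, the Parquet files that pywork.py writes to GCS can also be loaded into BigQuery directly, without going through a DataFrame; a sketch with the bq CLI (dataset, table, and object path are placeholders):
bq load --source_format=PARQUET <DATASET>.<TABLE> "gs://<BUCKET_NAME>/<YYYY-MM-DD>/<DB_NAME>_<TABLE_NAME>"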