
[Dataproc] PySpark Job with Airflow (Composer): Get Data From MySQL

sonny.kim 2021. 11. 1. 12:31

UBUNTU CLIENT CONFIGURATION

export TEMPLATE_ID=workflow-mytest
export WORK_CLUSTER_NAME=cluster-mytest
export REGION=asia-northeast3
export BUCKET_NAME=jay-pyspark-mytest # also needed in the Airflow DAG
export PROJECT_ID=<GCP_PROJECT_ID> # also needed in the Airflow DAG
export PYTHON_FILE=pywork.py # must match the file name uploaded to GCS below
export STEP_ID=first_step # some name like "Get Data"

PYTHON CODE

$ vi pywork.py


import pymysql
import sys
import pandas as pd
from datetime import datetime, timedelta

DB_NAME = <DB_NAME>
DB_HOST = <DB_HOST>
DB_USER = <DB_USER>
DB_PASSWORD = <DB_PASSWORD>
DB_PORT = <DB_PORT>
YESTERDAY = datetime.strftime((datetime.now() - timedelta(1)), '%Y-%m-%d')
PATH = "gs://스토리지이름/" + YESTERDAY + "/"


def get_mysql_conn():
    try:
        conn = pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, port=DB_PORT)
        print('Connection success')
    except Exception as e:
        print(e)
        sys.exit()
    return conn

def close_sql_conn(conn):
    conn.close()
    print("MYSQL CONN CLOSED")

# Collect the table names and column names of the target database from information_schema
def get_cols(conn):
    # For MySQL
    query = """SELECT table_name as TABLE_NAME, GROUP_CONCAT(CONCAT('`', column_name, '`')) AS COLUMNS FROM information_schema.columns
            WHERE table_schema = 'classicmodels'  GROUP BY table_name;"""

    # For SQL Server, use this query instead
    # query = """SELECT TABLE_SCHEMA, TABLE_NAME,
    #         COLUMNS = STUFF(( SELECT ',[' + n.COLUMN_NAME + ']' FROM AdventureWorks2019.INFORMATION_SCHEMA.columns n WHERE m.TABLE_SCHEMA = n.TABLE_SCHEMA AND m.TABLE_NAME = n.TABLE_NAME FOR XML PATH(''), TYPE).value('.', 'NVARCHAR(MAX)'), 1, 1, '')
    #         FROM AdventureWorks2019.INFORMATION_SCHEMA.columns m WHERE m.TABLE_SCHEMA != 'dbo' group by TABLE_SCHEMA, TABLE_NAME ORDER BY m.TABLE_SCHEMA"""

    col_data = pd.read_sql(query, conn)
    return col_data

# Extract the data for yesterday's date
def get_ydate_data(conn, table_name, columns):
    query = """SELECT """ + columns + """ FROM """ + DB_NAME + """.""" + table_name + """;"""
    df_ydate_data = pd.read_sql(query, conn)
    # df_ydate_data.to_csv(PATH + DB_NAME + """_""" + table_name + ".csv", index=False)
    # Save the DataFrame to GCS in Parquet format
    df_ydate_data.to_parquet(PATH + DB_NAME + """_""" + table_name, engine="pyarrow", compression="gzip", index=False)

if __name__ == "__main__":
    conn = get_mysql_conn()
    col_data = get_cols(conn)

    # Loop over the collected table names and column names
    for idx, row in col_data.iterrows():
        try:
            get_ydate_data(conn, row["TABLE_NAME"], row["COLUMNS"])
        except Exception as e:
            print(e)
            sys.exit()

    close_sql_conn(conn)
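To confirm the job wrote what you expect, you can read one of the Parquet outputs back with pandas. A minimal sketch, assuming gcsfs and pyarrow are installed on the machine you run it from, and using a hypothetical table name "customers":

import pandas as pd
from datetime import datetime, timedelta

YESTERDAY = datetime.strftime((datetime.now() - timedelta(1)), '%Y-%m-%d')

# "customers" is an example table name; substitute one of your own tables
df = pd.read_parquet("gs://<BUCKET_NAME>/" + YESTERDAY + "/<DB_NAME>_customers", engine="pyarrow")
print(df.head())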

SAVE THE PYTHON CODE TO GCS (CREATE THE BUCKET IN ADVANCE)

gsutil cp pywork.py gs://$BUCKET_NAME/

CREATE THE WORKFLOW TEMPLATE

gcloud dataproc workflow-templates create $TEMPLATE_ID \
--region=$REGION

CREATE A MANAGED CLUSTER FOR RUNNING THE JOB

The pip-install initialization action is what reads the PIP_PACKAGES metadata and installs the packages on the cluster, so include it:

gcloud beta dataproc workflow-templates set-managed-cluster $TEMPLATE_ID \
--region $REGION \
--cluster-name $WORK_CLUSTER_NAME \
--image-version 1.4 \
--metadata 'PIP_PACKAGES=cx-Oracle numpy pandas PyMySQL PyMSSQL pyarrow Cython fsspec gcsfs' \
--initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/python/pip-install.sh

ATTACH THE JOB TO THE WORKFLOW TEMPLATE

gcloud dataproc workflow-templates add-job pyspark \
gs://$BUCKET_NAME/$PYTHON_FILE \
--step-id $STEP_ID \
--workflow-template $TEMPLATE_ID \
--region $REGION \
--properties spark.jars.packages='org.apache.spark:spark-avro_2.11:2.4.0'
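Before wiring the template into Composer, you can trigger it once to check that the managed cluster spins up and the job succeeds. A minimal sketch, assuming the google-cloud-dataproc Python client (dataproc_v1) is installed; the regional endpoint and resource-name format below follow the standard Dataproc API conventions:

from google.cloud import dataproc_v1

PROJECT_ID = "<GCP_PROJECT_ID>"
REGION = "asia-northeast3"
TEMPLATE_ID = "workflow-mytest"

# Workflow templates are regional resources, so point the client at the regional endpoint
client = dataproc_v1.WorkflowTemplateServiceClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)

name = f"projects/{PROJECT_ID}/regions/{REGION}/workflowTemplates/{TEMPLATE_ID}"
operation = client.instantiate_workflow_template(name=name)
operation.result()  # blocks until cluster create + job + cluster delete finish
print("Workflow finished")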

CREATE A COMPOSER ENVIRONMENT AND CONFIGURE IT

Airflow Web UI -> Admin -> Variables -> Create
Key: project_id
Val: <GCP_PROJECT_ID>

WRITE THE DAG AND UPLOAD IT TO THE COMPOSER BUCKET

"""Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
Spark Pi Job.

This DAG relies on an Airflow variable
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator
from airflow.utils.dates import days_ago

project_id = models.Variable.get("project_id")
TEMPLATE_ID = "위에서 생성한 워크플로우 템플릿 ID"
default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": project_id,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataprocInstantiateWorkflowTemplateOperator(
        # The task id of your job
        task_id="<your_task_id>",
        # The template id of your workflow
        template_id=TEMPLATE_ID,
        project_id=project_id,
        # The region for the template (must match the region the template was created in)
        region="asia-northeast3",
    )
Then upload the DAG to the Composer environment's DAGs folder:

gsutil cp composer-dataproc-dag.py gs://<AIRFLOW_BUCKET>/dags/
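You can also confirm the DAG file parses cleanly before uploading by loading it into a DagBag locally. A minimal sketch, assuming Airflow and the apache-airflow-providers-google package are installed in your local environment:

# Run from the directory that contains composer-dataproc-dag.py
from airflow.models import DagBag

bag = DagBag(dag_folder=".", include_examples=False)
print(bag.dags.keys())    # should include 'dataproc_workflow_dag'
print(bag.import_errors)  # should be an empty dict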

* Note: DataFrame to BigQuery

from google.cloud import bigquery
import pandas

df = pandas.DataFrame(
    {
        'my_string': ['a', 'b', 'c'],
        'my_int64': [1, 2, 3],
        'my_float64': [4.0, 5.0, 6.0],
        'my_timestamp': [
            pandas.Timestamp("1998-09-04T16:03:14"),
            pandas.Timestamp("2010-09-13T12:03:45"),
            pandas.Timestamp("2015-10-02T16:00:00")
        ],
    }
)

client = bigquery.Client()
table_id = 'my_dataset.new_table'

# Since string columns use the "object" dtype, pass in a (partial) schema
# to ensure the correct BigQuery data type.
job_config = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("my_string", "STRING"),
])

job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)

# Wait for the load job to complete (omitted here).
# job.result()
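Alternatively, the Parquet files that pywork.py already wrote to GCS can be loaded into BigQuery directly, without reading them back into a DataFrame. A minimal sketch with hypothetical dataset and table names; adjust the URI to match your bucket, date, and table:

from google.cloud import bigquery

client = bigquery.Client()

# Placeholder path matching the output layout of pywork.py;
# "customers" and "my_dataset.customers" are example names
uri = "gs://<BUCKET_NAME>/<YYYY-MM-DD>/<DB_NAME>_customers"
table_id = "my_dataset.customers"

# Parquet carries its own schema, so no explicit schema is needed
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)

job = client.load_table_from_uri(uri, table_id, job_config=job_config)
job.result()  # wait for the load to finish
print(client.get_table(table_id).num_rows, "rows loaded")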