Analysis/Big Data & Machine Learning

[GCP] Airflow + Python (Importing from mysql)

sonny.kim 2021. 11. 11. 13:21

UBUNTU CLIENT CONFIGURATION (coding and GCP control are done from a Linux client)

# Install gsutil
<https://cloud.google.com/storage/docs/gsutil_install>

gcloud auth activate-service-account --key-file=<pre-created service account key> (roles: Dataproc Editor, Storage Object Creator)
or
gcloud init 

export TEMPLATE_ID=
export WORK_CLUSTER_NAME=
export REGION=
export BUCKET_NAME=
export PROJECT_ID=
export STEP_ID=
export PYTHON_FILE=pywork.py

PYTHON CODE

$vi pywork.py

import pymysql
import sys
import pandas as pd
from datetime import datetime, timedelta
from google.cloud import storage
from google.cloud import bigquery
from google.oauth2 import service_account
import time

KEY_FILE = <service account key file> #needed to load parquet to BQ dataset
DB_NAME = <source database name>
DB_HOST = <source mysql host ip/name>
DB_USER = <db user name>
DB_PASSWORD = <db password>
DB_PORT = <db_port>
YESTERDAY = datetime.strftime((datetime.now() - timedelta(1)), '%Y-%m-%d')
BUCKET_NAME = <bucket_name> # the name should not start with "gs://" (Eg: BUCKET_NAME=test_bucket)

try:
    #storage_client = storage.Client.from_service_account_json(KEY_FILE)
    #storage_bucket = storage_client.get_bucket(BUCKET_NAME)
    bigquery_credentials = service_account.Credentials.from_service_account_file(KEY_FILE)
    bq_client = bigquery.Client(credentials = bigquery_credentials)
        
except Exception as e:
    print("Error in getting GCP clients: " + str(e))
    
def get_mysql_conn():
    try:
        conn = pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, port=DB_PORT)
    except Exception as e:
        print(e)
        sys.exit()
    return conn

def close_sql_conn(conn):
    conn.close()
    print("MYSQL CONN CLOSED")

# Collect the table names and column names of the target database from information_schema
def get_cols(conn):
    query = """SELECT table_name as TABLE_NAME, GROUP_CONCAT(CONCAT('`', column_name, '`')) AS COLUMNS FROM information_schema.columns
            WHERE table_schema = '""" + DB_NAME + """'  GROUP BY table_name;"""
    col_data = pd.read_sql(query, conn)
    return col_data

# Extract data for yesterday's date
def get_ydate_data(conn, table_name, columns):
    query = """SELECT """ + columns + """ FROM """ + DB_NAME + """.""" + table_name  + """;"""
    df_ydate_data = pd.read_sql(query, conn)
    parquet_to_bq(df_ydate_data, table_name)
    #csv_to_bq(df_ydate_data, table_name)

# create csv and load data to BQ   
def csv_to_bq(df, table_name):
    uri = "gs://" + BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + """_""" + table_name
    try:
        df.to_csv(uri + ".csv", index = False)
        #storage_bucket.blob(YESTERDAY + "/" + DB_NAME + """_""" + table_name + ".csv").upload_from_filename(table_name + ".csv")
        #jobConfig = bigquery.LoadJobConfig()
        #jobConfig.skip_leading_rows = 1
        #jobConfig.source_format = bigquery.SourceFormat.CSV
        #jobConfig.write_disposition = bigquery.WriteDisposition.WRITE_APPEND   
        #jobConfig.autodetect=True
        #datasetName = "cloocus_dp_test"
        #targetTable = table_name
        #uri = BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + """_""" + table_name + ".csv"
        #tableRef = bq_client.dataset(datasetName).table(targetTable)
        #bigqueryJob = bq_client.load_table_from_uri(uri, tableRef, job_config=jobConfig)
        print(table_name + " table created!!!")
    except Exception as e:
        print(e)
        sys.exit()

# create parquet and load to BQ        
def parquet_to_bq(df, table_name):
    uri = "gs://" + BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + """_""" + table_name
    try:
        df.to_parquet(uri + ".parquet", engine="pyarrow", compression="gzip")
        print(table_name + " saved")
        #storage_bucket.blob(YESTERDAY + "/" + DB_NAME + """_""" + table_name + ".parquet").upload_from_filename(table_name + ".parquet")
        #jobConfig = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET,)
        #jobConfig.write_disposition = bigquery.WriteDisposition.WRITE_APPEND   
        #jobConfig.autodetect=True
        #datasetName = "cloocus_dp_test"
        #targetTable = table_name
        #uri = BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + """_""" + table_name + ".parquet"
        #tableRef = bq_client.dataset(datasetName).table(targetTable)
        #bigqueryJob = bq_client.load_table_from_uri(uri, tableRef, job_config=jobConfig)
        #print(table_name + "table created!!!")
    except Exception as e:
        print("Create parquet error: " + str(e))
        sys.exit()
    
if __name__ == "__main__":
    start = time.time()
    print("started at: " + str(start))
    conn = get_mysql_conn()
    col_data = get_cols(conn)
    print(col_data)
    for idx, row in col_data.iterrows():
        print(idx)  
        try:
            get_ydate_data(conn, row["TABLE_NAME"], row["COLUMNS"])
        except Exception as e:
            print(e)
            sys.exit()

    close_sql_conn(conn)
    end = time.time()
    diff = end - start
    print("Took "  + str(diff))

SAVE THE PYTHON CODE TO GCS (create the bucket in advance)

gsutil cp pywork.py gs://$BUCKET_NAME/

CREATE THE WORKFLOW TEMPLATE

gcloud dataproc workflow-templates create $TEMPLATE_ID \
--region=$REGION \
--project=$PROJECT_ID

CREATE A MANAGED CLUSTER FOR RUNNING THE JOB

gcloud beta dataproc workflow-templates set-managed-cluster $TEMPLATE_ID \
--region $REGION \
--project=$PROJECT_ID \
--cluster-name $WORK_CLUSTER_NAME \
--metadata 'PIP_PACKAGES=cx-Oracle fsspec numpy pandas PyMySQL PyMSSQL pyarrow google-cloud-storage google-cloud-bigquery gcsfs' \
--image-version 1.4 \
--properties core:fs.defaultFS=gs://$BUCKET_NAME \
--initialization-actions=gs://goog-dataproc-initialization-actions-asia-northeast3/python/pip-install.sh

# Setting the default FS lets the dataframe be uploaded straight to GCS without creating a temp file
# Reference URL
<https://cloud.google.com/dataproc/docs/tutorials/python-configuration>
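
As a side note, the direct gs:// write in pywork.py also relies on pandas handing gs:// paths to fsspec/gcsfs (both are included in PIP_PACKAGES above). A minimal sketch, with a hypothetical bucket name:

import pandas as pd

# With fsspec + gcsfs installed, pandas writes straight to a gs:// URI,
# no local temp file needed. "test_bucket" is a hypothetical bucket name.
df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
df.to_parquet("gs://test_bucket/2021-11-10/sample_table.parquet",
              engine="pyarrow", compression="gzip")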

ATTACH THE JOB TO THE WORKFLOW TEMPLATE

gcloud dataproc workflow-templates add-job pyspark \
gs://$BUCKET_NAME/$PYTHON_FILE \
--step-id $STEP_ID \
--workflow-template $TEMPLATE_ID \
--region $REGION \
--properties spark.jars.packages='org.apache.spark:spark-avro_2.11:2.4.0' \
--project=$PROJECT_ID

CREATE COMPOSER AND CONFIGURE THE ENVIRONMENT

Airflow Web UI -> Admin -> Variables -> Create
Key: project_id
Val: <project ID>

WRITE THE DAG AND UPLOAD IT TO THE COMPOSER BUCKET

"""Example Airflow DAG that kicks off a Cloud Dataproc Template that runs a
Spark Pi Job.

This DAG relies on an Airflow variable:
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator
from airflow.utils.dates import days_ago

project_id = models.Variable.get("project_id")
TEMPLATE_ID = "<the workflow template ID created above>"
default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": project_id,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataprocInstantiateWorkflowTemplateOperator(
        # The task id of your job
        task_id=<workflow step (task)>,
        # The template id of your workflow
        template_id=TEMPLATE_ID,
        project_id=project_id,
        # The region for the template
        region="asia-northeast3",
    )
gsutil cp composer-dataproc-dag.py gs://<AIRFLOW_BUCKET>/dags/
  • Reference: DataFrame to BigQuery
from google.cloud import bigquery
import pandas

df = pandas.DataFrame(
    {
        'my_string': ['a', 'b', 'c'],
        'my_int64': [1, 2, 3],
        'my_float64': [4.0, 5.0, 6.0],
        'my_timestamp': [
            pandas.Timestamp("1998-09-04T16:03:14"),
            pandas.Timestamp("2010-09-13T12:03:45"),
            pandas.Timestamp("2015-10-02T16:00:00")
        ],
    }
)

client = bigquery.Client()
table_id = 'my_dataset.new_table'

# Since string columns use the "object" dtype, pass in a (partial) schema
# to ensure the correct BigQuery data type.
job_config = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("my_string", "STRING"),
])

job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)

# Wait for the load job to complete. (I omit this step)
# job.result()
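
If you do want to block until the load finishes and confirm the result, a short sketch using the same client, job, and table_id as above:

job.result()  # raises if the load job failed

table = client.get_table(table_id)
print("Loaded {} rows into {}".format(table.num_rows, table_id))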