[GCP] Airflow + Python (Importing from mysql)
sonny.kim
2021. 11. 11. 13:21
UBUNTU CLIENT CONFIGURATION (coding and GCP control are done from a Linux client)
# Install gsutil
<https://cloud.google.com/storage/docs/gsutil_install>
gcloud auth activate-service-account --key-file=<pre-created ACCOUNT KEY>   (required roles: Dataproc Editor, Storage Object Creator)
or
gcloud init
export TEMPLATE_ID=
export WORK_CLUSTER_NAME=
export REGION=
export BUCKET_NAME=
export PROJECT_ID=
export STEP_ID=
export PYTHON_FILE=pywork.py   # the script created below and uploaded to the bucket
PYTHON CODE
$vi pywork.py
import pymysql
import sys
import pandas as pd
from datetime import datetime, timedelta
from google.cloud import storage
from google.cloud import bigquery
from google.oauth2 import service_account
import time

KEY_FILE = <service account key file>    # needed to load parquet to BQ dataset
DB_NAME = <source database name>
DB_HOST = <source mysql host ip/name>
DB_USER = <db user name>
DB_PASSWORD = <db password>
DB_PORT = <db port>                      # must be an int for pymysql

YESTERDAY = datetime.strftime((datetime.now() - timedelta(1)), '%Y-%m-%d')
BUCKET_NAME = <bucket_name>              # must not start with "gs://" (e.g. BUCKET_NAME = "test_bucket")

try:
    # storage_client = storage.Client.from_service_account_json(KEY_FILE)
    # storage_bucket = storage_client.get_bucket(BUCKET_NAME)
    bigquery_credentials = service_account.Credentials.from_service_account_file(KEY_FILE)
    bq_client = bigquery.Client(credentials=bigquery_credentials)
except Exception as e:
    print("Error in getting GCP clients: " + str(e))

def get_mysql_conn():
    try:
        conn = pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, port=DB_PORT)
    except Exception as e:
        print(e)
        sys.exit()
    return conn

def close_sql_conn(conn):
    conn.close()
    print("MYSQL CONN CLOSED")

# Collect the table names and column names of the source database from information_schema
def get_cols(conn):
    query = """SELECT table_name AS TABLE_NAME, GROUP_CONCAT(CONCAT('`', column_name, '`')) AS COLUMNS
               FROM information_schema.columns
               WHERE table_schema = '""" + DB_NAME + """' GROUP BY table_name;"""
    col_data = pd.read_sql(query, conn)
    return col_data

# Extract yesterday's data
def get_ydate_data(conn, table_name, columns):
    query = """SELECT """ + columns + """ FROM """ + DB_NAME + """.""" + table_name + """;"""
    df_ydate_data = pd.read_sql(query, conn)
    parquet_to_bq(df_ydate_data, table_name)
    # csv_to_bq(df_ydate_data, table_name)

# Create a CSV file and load the data to BQ
def csv_to_bq(df, table_name):
    uri = "gs://" + BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + "_" + table_name
    try:
        df.to_csv(uri + ".csv", index=False)
        # storage_bucket.blob(YESTERDAY + "/" + DB_NAME + "_" + table_name + ".csv").upload_from_filename(table_name + ".csv")
        # jobConfig = bigquery.LoadJobConfig()
        # jobConfig.skip_leading_rows = 1
        # jobConfig.source_format = bigquery.SourceFormat.CSV
        # jobConfig.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        # jobConfig.autodetect = True
        # datasetName = "cloocus_dp_test"
        # targetTable = table_name
        # uri = BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + "_" + table_name + ".csv"
        # tableRef = bq_client.dataset(datasetName).table(targetTable)
        # bigqueryJob = bq_client.load_table_from_uri(uri, tableRef, job_config=jobConfig)
        print(table_name + " table created!!!")
    except Exception as e:
        print(e)
        sys.exit()

# Create a parquet file and load it to BQ
def parquet_to_bq(df, table_name):
    uri = "gs://" + BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + "_" + table_name
    try:
        df.to_parquet(uri + ".parquet", engine="pyarrow", compression="gzip")
        print(table_name + " saved")
        # storage_bucket.blob(YESTERDAY + "/" + DB_NAME + "_" + table_name + ".parquet").upload_from_filename(table_name + ".parquet")
        # jobConfig = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
        # jobConfig.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        # jobConfig.autodetect = True
        # datasetName = "cloocus_dp_test"
        # targetTable = table_name
        # uri = BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + "_" + table_name + ".parquet"
        # tableRef = bq_client.dataset(datasetName).table(targetTable)
        # bigqueryJob = bq_client.load_table_from_uri(uri, tableRef, job_config=jobConfig)
        # print(table_name + " table created!!!")
    except Exception as e:
        print("Create parquet error: " + str(e))
        sys.exit()

if __name__ == "__main__":
    start = time.time()
    print("started at: " + str(start))
    conn = get_mysql_conn()
    col_data = get_cols(conn)
    print(col_data)
    for idx, row in col_data.iterrows():
        print(idx)
        try:
            get_ydate_data(conn, row["TABLE_NAME"], row["COLUMNS"])
        except Exception as e:
            print(e)
            sys.exit()
    close_sql_conn(conn)
    end = time.time()
    diff = end - start
    print("Took " + str(diff))
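The commented-out lines above show where the GCS-to-BigQuery load would go. As a hedged sketch only (not the exact code used in this post), that step could look like the function below; it reuses bq_client, BUCKET_NAME, YESTERDAY and DB_NAME from pywork.py and assumes a hypothetical, pre-created dataset named my_dataset.

# Hedged sketch: load the parquet file written above from GCS into BigQuery.
# Assumes bq_client, BUCKET_NAME, YESTERDAY and DB_NAME from pywork.py and a
# hypothetical dataset "my_dataset" that already exists.
def load_parquet_to_bq(bq_client, table_name):
    uri = "gs://" + BUCKET_NAME + "/" + YESTERDAY + "/" + DB_NAME + "_" + table_name + ".parquet"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.PARQUET,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    table_id = "my_dataset." + table_name  # hypothetical dataset name
    load_job = bq_client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()  # wait for the load to finish
    print(table_name + " loaded into BigQuery")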
SAVE THE PYTHON CODE TO GCS (create the bucket in advance)
gsutil cp pywork.py gs://$BUCKET_NAME/
CREATE THE WORKFLOW TEMPLATE
gcloud dataproc workflow-templates create $TEMPLATE_ID \
    --region=$REGION \
    --project=$PROJECT_ID
CREATE A MANAGED CLUSTER TO RUN THE JOB
gcloud beta dataproc workflow-templates set-managed-cluster $TEMPLATE_ID \
    --region $REGION \
    --project=$PROJECT_ID \
    --cluster-name $WORK_CLUSTER_NAME \
    --metadata 'PIP_PACKAGES=cx-Oracle fsspec numpy pandas PyMySQL PyMSSQL pyarrow google-cloud-storage google-cloud-bigquery gcsfs' \
    --image-version 1.4 \
    --properties core:fs.defaultFS=gs://$BUCKET_NAME \
    --initialization-actions=gs://goog-dataproc-initialization-actions-asia-northeast3/python/pip-install.sh
# Setting the default FS so the DataFrame is uploaded straight to GCS without creating a temp file
# Reference URL
<https://cloud.google.com/dataproc/docs/tutorials/python-configuration>
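On the Python side, the direct writes to gs:// paths in pywork.py go through pandas' fsspec/gcsfs support (both installed via PIP_PACKAGES above). A minimal standalone sketch of that write path, assuming default GCP credentials on the cluster and a hypothetical bucket named my-test-bucket:

# Hedged sketch: write a DataFrame straight to GCS via pandas + gcsfs/pyarrow.
# "my-test-bucket" is a hypothetical bucket; credentials come from the
# environment (on Dataproc, the cluster's service account).
import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"]})
df.to_parquet("gs://my-test-bucket/sample/df.parquet", engine="pyarrow", compression="gzip")
df.to_csv("gs://my-test-bucket/sample/df.csv", index=False)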
ATTACH THE JOB TO THE WORKFLOW TEMPLATE
gcloud dataproc workflow-templates add-job pyspark \
    gs://$BUCKET_NAME/$PYTHON_FILE \
    --step-id $STEP_ID \
    --workflow-template $TEMPLATE_ID \
    --region $REGION \
    --properties spark.jars.packages='org.apache.spark:spark-avro_2.11:2.4.0' \
    --project=$PROJECT_ID
CREATE COMPOSER AND CONFIGURE THE ENVIRONMENT
Airflow Web UI -> Admin -> Variables -> Create
Key: project_id
Val: <your GCP project ID>
WRITE THE DAG AND UPLOAD IT TO THE COMPOSER BUCKET
"""Example Airflow DAG that kicks off the Cloud Dataproc workflow template created above.

This DAG relies on an Airflow variable
(https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html):
* project_id - Google Cloud Project ID to use for the Cloud Dataproc Template.
"""
import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import DataprocInstantiateWorkflowTemplateOperator
from airflow.utils.dates import days_ago

project_id = models.Variable.get("project_id")
TEMPLATE_ID = "<the workflow template ID created above>"

default_args = {
    # Tell Airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": project_id,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the Airflow DAG page
    "dataproc_workflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataprocInstantiateWorkflowTemplateOperator(
        # The task id of your job
        task_id="<the step (task) id of the workflow>",
        # The template id of your workflow
        template_id=TEMPLATE_ID,
        project_id=project_id,
        # The region for the template
        region="asia-northeast3",
    )
gsutil cp composer-dataproc-dag.py gs://<AIRFLOW_BUCKET>/dags/
- Reference: loading a pandas DataFrame into BigQuery
from google.cloud import bigquery
import pandas

df = pandas.DataFrame(
    {
        'my_string': ['a', 'b', 'c'],
        'my_int64': [1, 2, 3],
        'my_float64': [4.0, 5.0, 6.0],
        'my_timestamp': [
            pandas.Timestamp("1998-09-04T16:03:14"),
            pandas.Timestamp("2010-09-13T12:03:45"),
            pandas.Timestamp("2015-10-02T16:00:00")
        ],
    }
)
client = bigquery.Client()
table_id = 'my_dataset.new_table'

# Since string columns use the "object" dtype, pass in a (partial) schema
# to ensure the correct BigQuery data type.
job_config = bigquery.LoadJobConfig(schema=[
    bigquery.SchemaField("my_string", "STRING"),
])

job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)

# Wait for the load job to complete. (I omit this step)
# job.result()
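If you do want to block until the load finishes and sanity-check the result, a short follow-up using the same client and table_id would be:

job.result()  # wait for the load job to complete
table = client.get_table(table_id)
print("Loaded {} rows into {}".format(table.num_rows, table_id))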