เปิดใช้งาน Apache Airflow บน GCP ในไม่กี่คลิก

Pratya Thanwatthanakit
4 min readMar 13, 2020

--

หากเรามีจำเป็นต้องใช้งาน ETL , Automate workflow ในงาน Production แบบเร่งด่วน / ทันที Cloud Composer (Apache Airflow) ก็เป็นตัวเลือกหนึ่งที่น่าสนใจไม่น้อย เพราะเวลาและคนที่ใช้ไปในการ setup ถือว่าประหยัดไปมากๆ

ถ้ากรณีอยากทดสอบ Google ก็ยังมี Option ให้ทดสอบ + $300 free credit ให้ลองเล่น (แต่ก็ต้องแลกกับการใส่บัตรเครดิตไป)

รายละเอียดราคาแบบคร่าวๆ

ราคาที่คิดเป็นต่อ ชม. ซึ่งยกมาเฉพาะของ asia-norteast zone จะได้ดังนี้ ต่อ 1 ชุด

โดยทาง Google ได้ทำการ ยกตัวอย่าง การคำนวณมูลค่าที่ต้องจ่ายต่อเดือน จากสเปคขั้นต่ำ / us-central1 zone โดยสรุปได้ว่ายอดที่ต้องจ่าย คือ 76.06 $ ต่อเดือน

มาเริ่มต้นใช้งานกัน !!

เริ่มจากเข้าใช้งาน ที่ http://cloud.google.com/

จากนั้นเลือกไปเมนู BIG DATA > Composer
(Google จะเรียกเป็น Composer service)

หลังจากเปิดเข้ามาในหน้าจอของ Composer ให้กด CREATE เพื่อทำการสร้าง Composer ใหม่ขึ้นมา

จากนั้นเริ่มสร้าง Composer กัน โดยรายละเอียดการสร้าง จะมีดังนี้
Name : ชื่อของ Composer
Node count : จำนวนของ node ใน Kubernetes Engine Cluster ที่ใช้ในโปรเจคนี้ ขั้นต่ำ คือ 3 Node
Location : เลือก Location ตอนนี้ใกล้เราสุดจะเป็น asia-norteast (Tokyo)
Zone : เลือกโซน
Machine type : เลือกสเปคของ Cluster Node โดยเริ่มต้มที่ 1 vCPUs, 7.5 GB RAM
Disk size : ขนาดพื้นที่ที่ใช้เริ่มต้นที่ 20 GB
OAuth Scopes : Google API ที่ใช้ในโปรเจคถ้าว่างไว้ default จะเป็น https://www.googleapis.com/auth/cloud-platform
Service Account : เลือก Service Account ที่ใช่ในโปรเจค
Tag : ใส่ Tag เพื่อให้ง่ายในการค้นหา
Image Version : เลือก image airflow version
Python Version : เลือก Python version

จากนั้นกดปุ่ม Create แล้วก็รอพักใหญ่ๆ …
ถ้าเรียบร้อยก็ได้ Composer ที่พร้อมใช้แล้ว

ลองกดเข้าไปดู รายละเอียดด้านในจะเจอข้อมูลตามที่เราสร้างมา
และหลักๆที่เราสนใจ มี 2 ส่วนคือ
DAGs folder : Google Storage ที่เก็บ DAG File
Airflow web UI : web UI ของ airflow

My first DAG on Google Cloud Composer

มาทดสอบว่า Airflow ทำงานได้หรือไม่ โดย Copy โค้ดด้านล่าง และ Save เป็นไฟล์ชื่อ “airflow_monitoring.py” (จะตั้งชื่ออื่นก็ได้แล้วแต่ชอบนะครับ)

from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)

t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
dag=dag,
)
dag.doc_md = __doc__

t1.doc_md = """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
"""
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)

t1 >> [t2, t3]

จากนั้น Drag and drop file หรือ Upload File ลงไปใน dags Folder

เข้าไปเช็คที่ Airflow web UI จะพบว่ามี DAG task ขึ้นมา เป็นอันสำเร็จ !!

Additional

เมื่อใช้ไปซักพักจะพบปัญหาเรื่อง python package ที่ต้อง install เพิ่ม ตัว Composer ก็มี PYPI Packages เพื่อให้เราเพื่อ Python Package ได้อย่างสะดวกสบาย
โดยเข้าไปที่ Environment details > PYPI PACKAGES
จากนั้นก็ใส่ package name, version ตามรูปได้เลย

เมื่อ save แล้วต้องรอไปประมาณ​20นาที ต่อการ save เพื่อเพิ่ม package และสามารถใช้งาน package ใหม่ๆได้

ก็จบไปแล้วนะครับสำหรับบทความนี้ หวังว่าจะได้รับประโยชน์กันบ้างนะครับ ☺

--

--