데이터 엔지니어(첫번째 이야기)
1. Introduction to Data Engineering
데이터 사이언티스트 : 데이터를 정리하고 다루고 구성
데이터 엔지니어 : 데이터베이스 및 대규모 처리 시스템과 같은 아키텍쳐 개발, 구성, 테스트 및 유지 관리
역할
데이터 엔지니어 | 데이터 사이언티스트 |
데이터를 수집하기 위한 프로세스 설정 | 통계 모델링 |
확장 가능한 데이터 아키텍처를 개발 | 기계 학습을 사용한 예측 모델 |
손상된 데이터 정리 | 데이터 패턴 마이닝 |
클라우드 기술 | 비즈니스 프로세스 모니터링 |
데이터 수집을 간소화 | 데이터의 통계적 특이치 정리 |
데이터 엔지니어의 기업으로 살펴본 역할 및 툴
유의할 점은 때로는 외부 API나 원시 파일 형태로 데이터를 가져와야 한다.
스케쥴러 : ex) Apache airflow 작업 예정, 작업 순서
클라우드 컴퓨팅 분야의 주요 기업
1) Amazon Web Service(AWS)
2) Microsoft Azure
3) Google Cloud Platform
클라우드 서비스
Storage | Compute | Database |
Amazon S3 | Google Compute Engine | Azure SQL Database |
Google Cloud Storage | Amazon EC2 | Amazon RDS |
Azure Blob Storage | Azure Virtual Machines | Google Cloud SQL |
2. Data engineering toolbox
1) 데이터베이스란 ?
빠른 검색을 위해 구성된 대규모 데이터 집합
2) 데이터베이스와 파일 시스템의 차이점
데이터베이스 관리 시스템이 검색, 복제와 같은 복잡한 데이터 작업을 추상화함. 파일 시스템의 경우 이러한 기능을 호스트하지 않음.
3) 정형 및 비정형 데이터
정형 데이터는 도식적이며, 잘 정의된 구조이다. ex) 관계형 데이터베이스의 표형 데이터
비정형 데이터는 도식적이지 못하며 파일처럼 보인다. ex) 사진, 비디오
* 정형 데이터와 비정형 데이터의 경계에 반정형 데이터가 많이 존재한다. ex ) 구조화된 JSON 데이터
4) SQL vs NoSQL
SQL 데이터베이스를 관계형 데이터베이스라 함. 스키마가 존재. ex) mysql , postgre
*데이터베이스 스키마는 관계 및 속성을 정의.
NoSQL은 비관계형 데이터베이스로 종종 구조화되지 못하고, 스키마가 없음
ex) Redis : key - value (caching) , MongoDB - document (JSON objects)
고객 및 주문에 대한 데이터베이스 스키마
# 판다스로 SQL 읽기 first_name, last_name을 select하는데 last_name, first_name 순으로 오름차순 정리
data = pd.read_sql("""
SELECT first_name, last_name FROM "Customer"
ORDER BY last_name, first_name
""", db_engine)
# Show the first 3 rows of the DataFrame
print(data.head(3))
# Show the info of the DataFrame
print(data.info())
# pandas로 SQL 읽고 테이블 JOIN
data = pd.read_sql("""
SELECT * FROM "Customer"
INNER JOIN "Order"
ON "Order"."customer_id"="Customer"."id"
""", db_engine)
# Show the id column of data
print(data.id)
7) 병렬 컴퓨팅이란?
빅데이터 처리시에 수행되는 작업을 여러개의 하위 작업으로 나눔 -> 하위 작업을 여러 시스템에 배포
왜 사용하는가? 메모리 관리 및 처리 능력
* 장점 : 한 컴퓨터의 메모리에 모든 데이터를 로드하는 대신 데이터를 분할하고 다른 컴퓨터의 메모리에 하위 집합을 로드할 수 있음. 즉, 컴퓨터당 메모리 공간이 적게 사용되며 RAM 프로세서 메모리에 들어갈 수 있음
-> 우선순위
* 단점 : 처리 요구사항이 적거나 처리 장치가 적은 경우 병목현상이 발생할 수 있음. 이를 parallel slowdown이라 함.
8) 다중 처리
동일한 컴퓨터의 여러 코어에 작업을 분산하는 풀(pool) - API
# Function to apply a function over multiple cores
@print_timing
def parallel_apply(apply_func, groups, nb_cores):
with Pool(nb_cores) as p:
results = p.map(apply_func, groups)
return pd.concat(results)
# Parallel apply using 1 core
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 1)
# Parallel apply using 2 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 2)
# Parallel apply using 4 cores
parallel_apply(take_mean_age, athlete_events.groupby('Year'), 4)
9) dask
패키지로 추상화 계층을 제공함.
import dask.dataframe as dd
# Set the number of partitions
athlete_events_dask = dd.from_pandas(athlete_events, npartitions=4)
# Calculate the mean Age per Year
print(athlete_events_dask.groupby('Year').Age.mean().compute())
10) 병렬 컴퓨팅 프레임워크
하둡 : Apache software foundation 에서 관리하는 오픈 소스 프로젝트 모음
오픈소스 - 1. HDFS : 분산 파일 시스템, 파일이 여러 컴퓨터에 존재함 -> 요즘 Amazon S3
2. MapReduce : 프로그램이 작업을 하위 작업으로 분할하여 여러 장치 간의 워크로드와 데이터를 분산.
처리 단위는 클러스터 내 여러 컴퓨터들
작성이 어려움 -> Hive
3. Hive : Hive의 SQL 변형을 사용하여 구조화된 방식으로 쿼리. 다른 데이터 프로세싱 툴과 통합
스파크 : 병렬 연산 프레임워크 , 데이터 처리 작업을 컴퓨터 클러스터에 분산
MapReduce 기반 시스템은 작업 간에 고가의 디스크 쓰기가 필요하지만 스파크는 메모리에 가능한 많은 프로 세싱 -> Apache Software Foundation에서 프로젝트를 유지 관리
아키텍쳐는 분산 데이터셋 RDD : 네이밍된 열이 존재하지 않음.
PySpark - python 인터페이스 Pandas DataFrame 과 비슷
Hive의 SQL을 추상화하는 대신 DataFrame을 추상화
# Print the type of athlete_events_spark
print(type(athlete_events_spark))
# Print the schema of athlete_events_spark
print(athlete_events_spark.printSchema())
# Group by the Year, and find the mean Age
print(athlete_events_spark.groupBy('Year').mean('Age'))
# Group by the Year, and find the mean Age
print(athlete_events_spark.groupBy('Year').mean('Age').show())
PySpark 실습 - 4개의 스레드에서 실행되는 local spark instance로 작업
11) Workflow 스케쥴링 프레임워크
1) linux's cron, 2) spotify's Luigi 3) Apache Airflow
cron과 같이 간단한 툴이 존재하지만 csv와 api에서 데이터를 함께 결합하는 작업일 경우, 이는 충분치 못함.
DAG : 비순환 그래프 사용, 사이클이 없음.
Airflow - 파이썬을 이용하여 복잡한 파이프라인을 구축하는 DAG를 만들고 테스트 가능
# Create the DAG object
dag = DAG(dag_id="car_factory_simulation",
default_args={"owner": "airflow","start_date": airflow.utils.dates.days_ago(2)},
schedule_interval="0 * * * *")
# Task definitions
assemble_frame = BashOperator(task_id="assemble_frame", bash_command='echo "Assembling frame"', dag=dag)
place_tires = BashOperator(task_id="place_tires", bash_command='echo "Placing tires"', dag=dag)
assemble_body = BashOperator(task_id="assemble_body", bash_command='echo "Assembling body"', dag=dag)
apply_paint = BashOperator(task_id="apply_paint", bash_command='echo "Applying paint"', dag=dag)
# Complete the downstream flow
assemble_frame.set_downstream(place_tires)
assemble_frame.set_downstream(assemble_body)
assemble_body.set_downstream(apply_paint)
3. Extract, Transform and Load (ETL)
1) 데이터 추출 : 영구 스토리지(Amazon S3의 파일, SQL 데이터베이스 등)에서 메모리로 데이터를 추출.
1. 텍스트 파일, 플랫 파일, JSON 데이터
2. 데이터베이스
웹 서비스와 같은 응용프로그램이 사용하는 데이터베이스는 일반적으로 많은 트랜잭션을 가지도록 최적화
* 트랜잭션은 일반적으로 행 또는 레코드를 데이터베이스에 변경하거나 삽입
OLTP : 행별로 데이터를 추가하는 행 지향적, 온라인 트랜잭션 처리
OLAP : 분석에 최적화된 열 지향적 , 온라인 분석 처리
import requests
# Fetch the Hackernews post
resp = requests.get("https://hacker-news.firebaseio.com/v0/item/16222426.json")
# Print the response parsed as JSON
print(resp.json())
# Assign the score of the test to post_score
post_score = resp.json()["score"]
print(post_score)
Read from a database
# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
query = "SELECT * FROM {}".format(tablename)
return pd.read_sql(query, db_engine)
# Connect to the database using the connection URI
# username, password, host, port, database
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)
# Extract the film table into a pandas DataFrame
extract_table_to_pandas("film", db_engine)
# Extract the customer table into a pandas DataFrame
extract_table_to_pandas("customer", db_engine)
2) 데이터 변환
1. split
# Get the rental rate column as a string
rental_rate_str = film_df.rental_rate.astype("str")
# Split up and expand the column
rental_rate_expanded = rental_rate_str.str.split(".", expand=True)
# Assign the columns to film_df
film_df = film_df.assign(
rental_rate_dollar=rental_rate_expanded[0],
rental_rate_cents=rental_rate_expanded[1],
)
2. join
# Use groupBy and mean to aggregate the column
ratings_per_film_df = rating_df.groupby('film_id').mean('film_id')
# Join the tables using the film_id column
film_df_with_ratings = film_df.join(
ratings_per_film_df,
film_df.film_id==ratings_per_film_df.film_id
)
# Show the 5 first results
print(film_df_with_ratings.show(5))
3) 데이터 분석
온라인 트랜잭션 처리 또는 OLTP를 위해 애플리케이션 데이터베이스 최적화
온라인 분석 처리 도는 OLAP를 위해 분석 데이터베이스 최적화
- 분석 쿼리가 대부분 표의 작은 열 하위 집합에 대한것이라 열에 대한 쿼리가 빠름, 행 지향 시스템에서는 사용하지 않는 행을 건너뛰는 작업에 시간 낭비 발생
4) MPP 데이터베이스
- ETL 프로세스가 끝나면 대상 데이터베이스로 이동 가능. 이를 대규모 병렬 처리 데이터베이스라 함. 분석용 최적화된 칼럼 지향 데이터베이스 분산 방식
- 쿼리는 단일 컴퓨팅 노드에서 실행되는 것이 아니라 하위 작업으로 분할되어 여러 노드에 분산됨.
ex) Amazon Redshift, Azure SQL Data Warehouse, Google BigQuery
Redshift - AWS의 S3에 파일을 쓰고 Redshift에 복사 쿼리를 보내는게 좋음. MPP 데이터베이스는 컬럼형 저장소 형 식을 사용하는 파일에서 데이터를 잘 로드함.
csv는 좋은 옵션이 아니며 데이터프레임에서 .to_parquet() , PySpark에서 .write_parquet() 사용하여 Postgre를 이용하여 Redshift에 연결
SQL 연결 URI를 이용하여 S3에서 Redshift로 연결
PostgreSQL - 변환 단계의 결과를 로드
데이터 파이프라인이 데이터를 추출하고 이를 변환하여 로드
pandas 데이터프레임에서 .to_sql()사용. 테이블이 이미 존재하는 경우에 전략 : 실패, 교체, 추가
# Write the pandas DataFrame to parquet
film_pdf.to_parquet("films_pdf.parquet")
# Write the PySpark DataFrame to parquet
film_sdf.write.parquet("films_sdf.parquet")
# Finish the connection URI
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine_dwh = sqlalchemy.create_engine(connection_uri)
# Transformation step, join with recommendations data
film_pdf_joined = film_pdf.join(recommendations)
# Finish the .to_sql() call to write to store.film
film_pdf_joined.to_sql("film", db_engine_dwh, schema="store", if_exists="replace")
# Run the query to fetch the data
pd.read_sql("SELECT film_id, recommended_film_ids FROM store.film", db_engine_dwh)
5) ETL 스케쥴링
postgre 추출 : extract_table_to_df(), SQL 테이블을 팬더 데이터 프레임으로 만듬
변환 : split_columns_transform()
로드 : load_df_into_dwh()
Apache Airflow - 순환 그래프를 python
DAG를 만듬 -> schedul_interval 를 이용하여 DAG를 실행해야 하는 시기를 정의
ex) 크론 식 : 공백으로 구분된 다섯개의 문자열 ( 분 시간 일 월 요일 )
.set_downstream() 메서드를 사용하여 작업 간의 업스트림 또는 다운스트림 설정 가능
wait_for_this_task 완료 후 etl_task 실행
# Define the ETL function
def etl():
film_df = extract_film_to_pandas()
film_df = transform_rental_rate(film_df)
load_dataframe_to_film(film_df)
# Define the ETL task using PythonOperator
etl_task = PythonOperator(task_id='etl_film',
python_callable=etl,
dag=dag)
# Set the upstream to wait_for_table and sample run etl()
etl_task.set_upstream(wait_for_table)
etl()
dag.py -> mv dags 폴더
매 자정에 실행되는 스케쥴링 확인
과거 스키마를 짜서 ETL(extract - transfrom - load) 즉, 데이터 추출/스키마에 따라 변형/저장 했던 방대한 데이터 웨어하우스를 기반으로 하던 방식에서 스키마의 형태에 분리됨에 따라 데이터 레이크에 우선 싣고 이후에 가공하는 ELT(extract - load - transform)가 현재 데이터 관리의 흐름.
1. AWS 스토리지 서비스인 S3에 우선 데이터를 모두 다 넣고 (데이터 레이크의 형식. 정리되지 않음) 그 이후 정제
2. AWS Redshift에서 1차 정제를하고 Athena, RDS, DynamoDB에서 최종 2차 정제
3. 정제된 데이터를 통해 분석/시각화는 AWS SageMaker, QuickSight(시각화) 등을 활용해도 되고 다른 툴을 사용해도 된다.