도찐개찐

[Python] 스파크(spark) 본문

PYTHON/데이터분석

[Python] 스파크(spark)

도개진 2022. 12. 26. 12:03

데이터 분석 환경

분석 환경은 주로 엔지니어 및 회사 고유의 상황에 따라 결정된다. 분석가는 환경적/구조적 특성과 제한점 등 여러 사항을 고려하여 분석을 진행한다. 특히 데이터 수집 과정을 분석 목적에 맞게 최적화 하는 등의 목적을 위해 분석가가 환경 및 구조에 관여하기도 한다. 물론, 분석가가 주도적으로 처음부터 환경을 설정하고 구조를 쌓아올라가는 경우도 있지만 이는 일반적인 상황이라고 보기 어렵다.

분석가가 좋은 성과를 내기 위해서는 분석 환경을 잘 이해/활용하고 때로는 (분석 관점에 맞게) 개선점을 엔지니어에게 전달하는 등 역할이 필요하다. 따라서 (실무는 엔지니어가 진행하더라도) 환경/시스템적 요소에 대한 이해와 지속적인 관여 역시 분석가의 역할이기도 하다.

스파크 소개

최근 비정형 데이터의 생성과 매우 큰 사이즈 등의 이슈로 기존 RDBS에서 하둡/스파크를 도입하는 추세이다. 비록 RDBS만큼 즉각적 생성/수정/변경 등은 어렵지만, Spark나 하둡을 이용할 경우 분산 저장 및 처리를 통해 빠른 분석 진행이 가능하다. 최근에는 하둡 보다 분석 친화적인 스파크를 주로 사용해 분석하는 추세이다. 스파크가 Pyspark이나 SparkR 같은 다양한 분석 API를 제공하고 있기 때문이다. 참고로 하둡은 Java, Spark는 원래 스칼라 기반이다.

Source: Nimisha Sharath Sharma

  • SparkSQL
  • DataFrame API
  • MLlib
  • Python

스파크 RDD, DataFrame, Lazy execution

스파크에서 다루는 주요 데이터 타입은 RDD(Resilient Distributed Datasets)와 DataFrame이다. 기존 하둡에서는 디스크에서 데이터 I/O가 발생하는 반면, 스파크는 RAM에서 발생하게 설정할 수 있으므로 속도에서 비약적인 차이가 발생한다. 최근에는 RDD보다 DataFrame을 이용하는 추세이며(RDBS의 테이블이나 Pandas Dataframe과 유사하기 때문), Spark의 특징인 Lazy execution을 통해 보다 효율적인 처리/분석이 가능하다.

Lazy Execution은 함수를 Transform, Action 으로 구분해 Action 일 경우에만 실제 실행이 발생하는 것을 의미한다. 매번 결과를 갖고 오지 않고, 필요한 경우에만 RAM을 통해 데이터 I/O가 발생하므로 분석 작업 속도가 매우 높아진다. Spark에서 데이터 분석을 하는 경우, 매우 큰 사이즈의 데이터를 다루는 경우가 많아 이러한 매커니즘은 매우 중요한 장점으로 작용한다. (다행히 Transform 단계라도 에러를 내보내므로 Action 단계에서 제대로 결과가 나왔는지 걱정할 필요는 없다)

RDD

  • Distribute collection of JVM objects
  • Funtional Operators (map, filter, reduceByKey, ect)

Source: Research Computing

DataFrame

  • Distribute collection of Row objects
  • Expression-based operations and UDFs
  • Logical plans and optimizer
  • Fast/efficient internal reprenstations

Source: Duchess france

Lazy Execution

  • Transfrom: filter, select, drop, join, dropDuplicates, distinct, withColumn, pivot, get_json_object, sample
  • Action: count, collect, show, head, take

Source: Birendra Kumar Sahu

Spark 데이터 추출 및 전처리

# import modules
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

# read the csv with library
df = sqlContext.read.format('com.databricks.spark.csv')\
                    .options(header='true', inferSchema='true')\
                    .load('/Users/woowahan/Documents/Python/DS_Ext_School/tutorial_01/doc_use_log.csv')

# convert the df to tmp table (as if it's in database)
df.registerTempTable("df_tmp")

# extract data from table with sql
df1 = sqlContext.sql("select ismydoc, actiontype, sessionid, datetime from df_tmp where ismydoc = true")

## Lazy Execution
df2 = sqlContext.sql("select * from df_tmp")

df2_pdf = df2.select("sessionid", "ext").filter(" ext == 'PDF' or ext = 'DOC'").dropDuplicates().cache()
df2.distinct().count()

df2_min_date = df2.groupby("sessionid").agg(min("datetime").alias("min_date"))
df2_min_date.show()

df2_join = df2_pdf.join(df2_min_date, "sessionid", "left")
df2_join.show()

df2_join1 = df2_join.groupby("min_date", "ext").agg(count("sessionid").alias("cnt"))

df2_join1.describe().show()

# Pandas
df2_pd = df2.toPandas()
df2_pd.head()
df2_pd.describe()

스파크 Modules & 머신러닝

스파크가 최근에 각광을 받게 된 배경에는 스파크가 제공하는 모듈도 영향을 미쳤다. 스파크는 분산처리프레임 위에 Spark Streaming, SparkSQL, MLlib, GraphX와 같은 모듈을 제공하여 실시간 수집부터 데이터 추출/전처리, 머신러닝 및 그래프 분석까지 하나의 흐름에 가능하도록 개발되었다. 각 모듈의 특성을 살펴보자.

  • Spark SQL: Spark Wrapper 함수에 SQL 쿼리를 넣어 추출/전처리/분석이 쉽게 가능하도록 지원
  • MLlib: 머신러닝 알고리즘 제공 (코드 예시)
  • Spark Streaming: 실시간 데이터 처리
  • GraphX: 그래프 분석 라이브러리

위 4개의 모듈 중에 분석가가 많이 사용하는 것은 Spark SQL과 Mllib이다. 아래 예시 코드를 보자.

# import modules
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, stddev_samp

# read datafiles
df = sqlContext.read.format('com.databricks.spark.csv')\
                    .options(header='true', inferSchema='true')\
                    .load('/Users/woowahan/Documents/Python/DS_Ext_School/tutorial_01/Default.csv')\
                    .drop("_c0")\
                    .cache()

# transform categorical values to int
strIdx = StringIndexer(inputCol = "student", outputCol = "studentIdx")

# one-hot encoding
encode = OneHotEncoder(inputCol = "studentIdx", outputCol = "studentclassVec")

# transform categorical values to int
label_StrIdx = StringIndexer(inputCol = "default", outputCol = "label")

# set stages for pipeline
stages = [strIdx, encode, label_StrIdx]

# columns
numCols = ['income', 'balance']
for c in numCols:
    df = df.withColumn(c + "Scaled", col(c)/df.agg(stddev_samp(c)).first()[0])

# set inputs and append it to the stage
inputs = ["studentclassVec", "incomeScaled", "balanceScaled"]
assembler = VectorAssembler(inputCols = inputs, outputCol = "features")
stages += [assembler]

# create pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
dataset = pipelineModel.transform(df)

# cross validation and fit models
(train, test) = dataset.randomSplit([0.7, 0.3], seed = 14)
lr = LogisticRegression(labelCol = "label", featuresCol = "features", maxIter=10)

lrModel = lr.fit(train)
predictions = lrModel.transform(test)
predictions.show()

# evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions) # AUC

# grid search for parametor tuning
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
evaluator.evaluate(predictions)

출처

728x90

'PYTHON > 데이터분석' 카테고리의 다른 글

[Python] 선형회귀  (2) 2022.12.26
인공지능 개념  (0) 2022.12.26
[Python] folium(지도시각화)  (0) 2022.12.26
[Python] plot 한글 사용  (0) 2022.12.26
[Python] 판다스(pandas)  (0) 2022.12.26
Comments