Spark
Spark 이해하기
•
Hadoop MapReduce 대비 100배까지 성능 향상 ( 애플리케이션 비지니스 로직에 따라 많이 차이남 )
•
python 으로 코드를 작성할 경우 결도의 Python Process 에서 처리하고 결과값을 JVM 으로 가져오기 때문에 Performance 가 상대적으로 조금 느리다
어드민 WEB UI
•
포트는 수정이 가능하다
•
Standalone
◦
http://{IP}:8080
•
Worker
◦
http://{IP}:8081
•
Driver
◦
DAG 정보를 보여준다
◦
http://{IP}:4040
•
History
◦
http://{IP}:18080
Hadoop - on Disk Limitations
•
데이터를 직렬화 하고, 복제화하여 디스크에 저장하기 때문에 느리다
•
각 iteration (Map, Reduce) 을 수행할 때 마다 디스크에 작성해야 한다
•
Iterative Algorithms, Interactive Data Mining 에 대해서 굉장히 오래걸린다
Spark - In Memory + DAG
•
Iteration 을 수행할 때 자주 사용되는 데이터는 메모리에 캐시한다
◦
e.g) flatMap → filter → map → reduceByKey → ...
Spark vs Hadoop
•
100TiB 데이터를 기준으로 정렬을 수행
◦
Hadoop
▪
Execution Time: 72 min
▪
Node: 2,100
▪
Core: 50,400
◦
Spark
▪
Execution Time: 23 min
▪
Node: 206
▪
Core: 6,592
◦
10배 정도 적은 Node 로 3배 이상 빠르게 처리함
•
클라우드 환경에서 처리한다고 했을 때 Spark 가 3배 이상 비용이 저렴함
◦
수행시간이 짧고, Node 대수가 적기 때문에
•
알고리즘 처리에서 처리 성능 속도가 굉장히 많이 차이남 ( Iterative Algorithm in Memory )
•
Hadoop 와 비교하여 사용하기 편하다
val spark = new SparkContext(master, appName, (sparkHome), (jars))
val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
Scala
복사
•
Hadoop 대비 Spark 가 훨씬 다양한 API 를 제공한다
Spark Components
•
Spark Driver
◦
사용자가 작성한 애플리케이션
•
Spark Context
•
Spark Application
◦
Driver 와 Executor 들에서 실행되는 프로그램
Spark Context
•
Spark 프로그램의 기본은 Spark Context 를 만들어야 한다
•
Spark Context 를 통해서 Cluster Resource Manager 통신하여 Executor 를 할당 받는다
RDD
•
분산환경 컬렉션이지만, 로컬에서 하나의 객체처럼 사용하면 된다
•
Programming Interface
◦
Transformations
▪
항상 새로운 Dataset 을 생성함
▪
Lazy Evaluation
◦
Actions
▪
실제로 수행하여 데이터를 저장, 조회 등을 한다
◦
Persistence
▪
RDD 를 메모리 혹은 디스크에 캐시 ( Storage Level 을 사용하여 섞어서도 사용 가능 )
•
Lazy Evaluation + No Cache
◦
Wikipedia 데이터를 처리하는데 20초 정도 소요
◦
1TB 데이터를 처리하는데 170초 정도 소요
// Base RDD
val lines = saprk.textFile("hdfs://...")
// Transformed RDD
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t')(2))
// action #1
messages.filter(_.contains("foo")).count
// action #2
messages.filter(_.contains("bar")).count
Scala
복사
•
Lazy Evaluation + Cache
◦
Wikipedia 데이터를 처리하는데 2~3초 정도 소요
◦
1TB 데이터를 처리하는데 5 ~ 7초 정도 소요
// Base RDD
val lines = saprk.textFile("hdfs://...")
// Transformed RDD
val errors = lines.filter(_.startsWith("ERROR"))
val messages = errors.map(_.split('\t')(2))
// Message Cache
// Action 이 수행되기 전까지 cache 를 수행하지 않음
val cachedMsgs = messages.cache()
// action #1
cachedMsgs.filter(_.contains("foo")).count
// action #2
cachedMsgs.filter(_.contains("bar")).count
Scala
복사
•
Fault Toerance
◦
Lineage
▪
Transform 에 대한 시리지를 추적해서 장애난 서버가 처리하던 작업을 남은 서버가 처리하고 복구함
◦
Failure: Recompute lost data by lineage
DAG
•
reduce 의 경우 파티션의 개수가 변경된다 (Shuffle)
◦
파티션 개수를 정의하지 않으면 알아서 정의된다
•
Stage 는 Shuffle 이 일어나지 않는걸 기준으로 스테이지를 나눈다
sc.textFile("/wiki/pagecounts') // RDD[String]
.map(line => line.split("\t") // RDD[List[String]]
.map(R => (R[0], int(R[1])) // RDD[(String, Int)]
.reduceByKey(_+_, 3) // RDD[(String, Int)]
.collect() // Array[(String, Int)]
Scala
복사
•
Stage1
1.
Read HDFS Split
2.
Apply both the maps
3.
Start Partial Reduce
4.
Write Shuffle Data
•
Stage2
1.
Read shuffle data
2.
Final Reduce
3.
Send result to driver
Comining Processing Types
•
1개의 애플리케이션에서 다양한 언어를 사용할 수 있다
// Spark SQL
df.createOrReplaceTempView("historic_tweets")
val points = spark.sql("SELECT latitude, longitude from historic_tweets").cache()
// MLib
val kmeans = new KMeans().setK(10).setSeed(1L)
val model = kmeans.fit(points)
// Spark Streaming
TwitterUtils.createStream(ssc, None)
.map(t => (model.computeCost(t.location), 1))
.reduceByKeyAndWindow(_ + _, Seconds(60))
Scala
복사
Large-Scale Usage
•
Largest Cluster: 8,000 Nodes (Tecent)
•
Top Streaming Intake: 1TB/Hour (HHMI Janelia Farm)
•
Largest Single Job: 1PB ( Alibaba, Databricks)
•
2014 On-Disk Sort Record: Fatest Open Source Engine for storing a PB
Competitors
•
Apache Strom
•
Apache Flink
실습
Hadoop 실행
$ ./hadoop/sbin/start-all.sh
# HDFS WEB UI
$ curl http://localhost:50070
# Yarn WEB UI
$ curl http://localhost:8088
Shell
복사
Spark Standalone 실행
$ cp ./spark3/conf/workers.template ./spark3/conf/workers
$ vim ./spark3/conf/workers
$ vim ./spark3/conf/spark-env.sh
$ ./spark3/sbin/start-all.sh
# Spark Master WEB UI
$ curl http://localhost:8080
Shell
복사
•
workers
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# A Spark Worker will be started on each of the machines listed below.
worker-01
worker-02
worker-03
Shell
복사
•
spark-env.sh ( master, executor 모두 필요 )
# added by claude
SPARK_MASTER_PORT=7177
SPARK_MASTER_WEBUI_PORT=8180
SPARK_PUBLIC_DNS=${HOSTNAME}
SPARK_WORKER_PORT=40001
SPARK_WORKER_WEBUI_PORT=8181
JAVA_HOME=/home/ubuntu/jdk8
Shell
복사
Spark (Scala, Standalone) 실행
$ ./spark3/bin/spark-shell --master spark://master-01:7177 \
--executor-memory 2G \
--executor-cores 2 \
--total-executor-cores 6
scala> sc.master
res4: String = spark://master-01:7177
scala> sc.version
res5: String = 3.1.1
Shell
복사
Word Count
scala> val f = sc.textFile("README.md")
f: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wc: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25
scala> wc.collect.foreach(println)
(package,1)
(this,1)
(integration,1)
(Python,2)
(cluster.,1)
(its,1)
([run,1)
(There,1)
(general,2)
(have,1)
(pre-built,1)
(Because,1)
(YARN,,1)
(locally,2)
(changed,1)
(locally.,1)
...
scala> wc.saveAsTextFile("wc_out.obj")
scala> wc.saveAsTextFile("hdfs://matser-01:9000/wc_out.obj")
Scala
복사