Blog

Apache Spark 을 이용한 빅데이터 분석 (4)

Spark

Spark 이해하기

Hadoop MapReduce 대비 100배까지 성능 향상 ( 애플리케이션 비지니스 로직에 따라 많이 차이남 )
python 으로 코드를 작성할 경우 결도의 Python Process 에서 처리하고 결과값을 JVM 으로 가져오기 때문에 Performance 가 상대적으로 조금 느리다

어드민 WEB UI

포트는 수정이 가능하다
Standalone
http://{IP}:8080
Worker
http://{IP}:8081
Driver
DAG 정보를 보여준다
http://{IP}:4040
History
http://{IP}:18080

Hadoop - on Disk Limitations

데이터를 직렬화 하고, 복제화하여 디스크에 저장하기 때문에 느리다
각 iteration (Map, Reduce) 을 수행할 때 마다 디스크에 작성해야 한다
Iterative Algorithms, Interactive Data Mining 에 대해서 굉장히 오래걸린다

Spark - In Memory + DAG

Iteration 을 수행할 때 자주 사용되는 데이터는 메모리에 캐시한다
e.g) flatMap → filter → map → reduceByKey → ...

Spark vs Hadoop

100TiB 데이터를 기준으로 정렬을 수행
Hadoop
Execution Time: 72 min
Node: 2,100
Core: 50,400
Spark
Execution Time: 23 min
Node: 206
Core: 6,592
10배 정도 적은 Node 로 3배 이상 빠르게 처리함
클라우드 환경에서 처리한다고 했을 때 Spark 가 3배 이상 비용이 저렴함
수행시간이 짧고, Node 대수가 적기 때문에
알고리즘 처리에서 처리 성능 속도가 굉장히 많이 차이남 ( Iterative Algorithm in Memory )
Hadoop 와 비교하여 사용하기 편하다
val spark = new SparkContext(master, appName, (sparkHome), (jars)) val file = spark.textFile("hdfs://...") val counts = file.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) counts.saveAsTextFile("hdfs://...")
Scala
Hadoop 대비 Spark 가 훨씬 다양한 API 를 제공한다

Spark Components

Spark Driver
사용자가 작성한 애플리케이션
Spark Context
Spark Application
Driver 와 Executor 들에서 실행되는 프로그램

Spark Context

Spark 프로그램의 기본은 Spark Context 를 만들어야 한다
Spark Context 를 통해서 Cluster Resource Manager 통신하여 Executor 를 할당 받는다

RDD

분산환경 컬렉션이지만, 로컬에서 하나의 객체처럼 사용하면 된다
Programming Interface
Transformations
항상 새로운 Dataset 을 생성함
Lazy Evaluation
Actions
실제로 수행하여 데이터를 저장, 조회 등을 한다
Persistence
RDD 를 메모리 혹은 디스크에 캐시 ( Storage Level 을 사용하여 섞어서도 사용 가능 )
Lazy Evaluation + No Cache
Wikipedia 데이터를 처리하는데 20초 정도 소요
1TB 데이터를 처리하는데 170초 정도 소요
// Base RDD val lines = saprk.textFile("hdfs://...") // Transformed RDD val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.split('\t')(2)) // action #1 messages.filter(_.contains("foo")).count // action #2 messages.filter(_.contains("bar")).count
Scala
Lazy Evaluation + Cache
Wikipedia 데이터를 처리하는데 2~3초 정도 소요
1TB 데이터를 처리하는데 5 ~ 7초 정도 소요
// Base RDD val lines = saprk.textFile("hdfs://...") // Transformed RDD val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.split('\t')(2)) // Message Cache // Action 이 수행되기 전까지 cache 를 수행하지 않음 val cachedMsgs = messages.cache() // action #1 cachedMsgs.filter(_.contains("foo")).count // action #2 cachedMsgs.filter(_.contains("bar")).count
Scala
Fault Toerance
Lineage
Transform 에 대한 시리지를 추적해서 장애난 서버가 처리하던 작업을 남은 서버가 처리하고 복구함
Failure: Recompute lost data by lineage

DAG

reduce 의 경우 파티션의 개수가 변경된다 (Shuffle)
파티션 개수를 정의하지 않으면 알아서 정의된다
Stage 는 Shuffle 이 일어나지 않는걸 기준으로 스테이지를 나눈다
sc.textFile("/wiki/pagecounts') // RDD[String] .map(line => line.split("\t") // RDD[List[String]] .map(R => (R[0], int(R[1])) // RDD[(String, Int)] .reduceByKey(_+_, 3) // RDD[(String, Int)] .collect() // Array[(String, Int)]
Scala
Stage1
1.
Read HDFS Split
2.
Apply both the maps
3.
Start Partial Reduce
4.
Write Shuffle Data
Stage2
1.
Read shuffle data
2.
Final Reduce
3.
Send result to driver

Comining Processing Types

1개의 애플리케이션에서 다양한 언어를 사용할 수 있다
// Spark SQL df.createOrReplaceTempView("historic_tweets") val points = spark.sql("SELECT latitude, longitude from historic_tweets").cache() // MLib val kmeans = new KMeans().setK(10).setSeed(1L) val model = kmeans.fit(points) // Spark Streaming TwitterUtils.createStream(ssc, None) .map(t => (model.computeCost(t.location), 1)) .reduceByKeyAndWindow(_ + _, Seconds(60))
Scala

Large-Scale Usage

Largest Cluster: 8,000 Nodes (Tecent)
Top Streaming Intake: 1TB/Hour (HHMI Janelia Farm)
Largest Single Job: 1PB ( Alibaba, Databricks)
2014 On-Disk Sort Record: Fatest Open Source Engine for storing a PB

Competitors

Apache Strom
Apache Flink

실습

Hadoop 실행

$ ./hadoop/sbin/start-all.sh # HDFS WEB UI $ curl http://localhost:50070 # Yarn WEB UI $ curl http://localhost:8088
Shell

Spark Standalone 실행

$ cp ./spark3/conf/workers.template ./spark3/conf/workers $ vim ./spark3/conf/workers $ vim ./spark3/conf/spark-env.sh $ ./spark3/sbin/start-all.sh # Spark Master WEB UI $ curl http://localhost:8080
Shell
workers
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # A Spark Worker will be started on each of the machines listed below. worker-01 worker-02 worker-03
Shell
spark-env.sh ( master, executor 모두 필요 )
# added by claude SPARK_MASTER_PORT=7177 SPARK_MASTER_WEBUI_PORT=8180 SPARK_PUBLIC_DNS=${HOSTNAME} SPARK_WORKER_PORT=40001 SPARK_WORKER_WEBUI_PORT=8181 JAVA_HOME=/home/ubuntu/jdk8
Shell

Spark (Scala, Standalone) 실행

$ ./spark3/bin/spark-shell --master spark://master-01:7177 \ --executor-memory 2G \ --executor-cores 2 \ --total-executor-cores 6 scala> sc.master res4: String = spark://master-01:7177 scala> sc.version res5: String = 3.1.1
Shell

Word Count

scala> val f = sc.textFile("README.md") f: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24 scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) wc: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25 scala> wc.collect.foreach(println) (package,1) (this,1) (integration,1) (Python,2) (cluster.,1) (its,1) ([run,1) (There,1) (general,2) (have,1) (pre-built,1) (Because,1) (YARN,,1) (locally,2) (changed,1) (locally.,1) ... scala> wc.saveAsTextFile("wc_out.obj") scala> wc.saveAsTextFile("hdfs://matser-01:9000/wc_out.obj")
Scala