Blog

Apache Spark 을 이용한 빅데이터 분석 (9)

Spark

Streaming

Spark 는 죽었다깨어나도 배치다
DStream: RDD 기반의 스트리밍
초창기 모델
Structued Streaming: DataFame 기반의 스트리밍
아직 제약사항이 좀 있긴하지만, 운영환경에서 충분히 사용할만하고 계속해서 발전 중
대용량 스트리밍 데이터를 처리하기 위한 스트리밍
(Kafka, Flume, HDFS, Kinesis, Twitter) ⇒ Spark Streaming ⇒ (HDFS, Databases, Dashboards)
Key Concepts
Input DStream
RDD 의 연속
Transformations
표준 RDD 의 작업 ( map, countByValue, reduceByKey, join 등 )
상태 저장 작업 ( window, updateStateByKey 등 )
Output Operations

Discretized Stream (DStream)

지속적으로 들어오는 데이터를 N초 간격으로 RDD 로 모아서 작은 배치 작업으로 처리함
데이터가 없으면 빈 RDD 를 만들어서 처리하기도함
배치 사이즈는 보통 500ms ~ 2s 정도로 처리함 ( latency 가 발생함 )
앞에 들어온 데이터를 모두 처리하고 난 후 다음 데이터를 처리함 ( 데이터의 순서가 보장되며, 데이터는 병렬적으로 처리되지 않음 )
Stateful
window
N초 단위로 분리된 RDD 들을 묶어서 1개의 Window 로 묶어서 한번에 연산을 처리할 수 있음 ( Window length, Sliding Interval )
e.g) 최근 검색어 Top 5 (1시간) 을 개발
updateStateByKey
Key 를 기준으로 이전 시점에 처리했던 데이터를 업데이트함

실습

import org.apache.spark.streaming._ val ssc = new StreamingContext(sc, Seconds(2)) // Input val dstream_lines = ssc.socketTextStream("HOST", 9999) // Transformation val dstream_tsv = dstream_lines .map(line => line.split("\t")) .filter(arr => arr.length == 3) .map(arr => ( arr(0), arr(1), arr(2) )) val dstream_word_counts = dstream_tsv .map(tuple => tuple._2) .flatMap(document => document.split(" ")) .map(word => (word, 1)) .reduceByKey((v1, v2) => v1 + v2) // Output dstream_word_counts .foreachRDD { (rdd, time) => rdd .toDF("word", "count") .select($"word", $"count".cast("long")) .write .format("csv") .option("sep", ",") .option("header", false) .mode("append") .saveAsTable("dstream_word_counts_foreachrdd") } ssc.start
Scala
ssc.getState
INITIALIZED: context 가 생성되었지만 시작되지 않은 상태
ACTIVE: 시작된 상태
STOPPED: 종료된 사앹
updateStateByKey
import org.apache.spark.streaming._ val ssc = new StreamingContext(sc, Seconds(2)) // Input val dstream_lines = ssc.socketTextStream("HOST", 9999) // Transformation val dstream_tsv = dstream_lines .map(line => line.split("\t")) .filter(arr => arr.length == 3) .map(arr => ( arr(0), arr(1), arr(2) )) val dstream_word_counts = dstream_tsv .map(tuple => tuple._2) .flatMap(document => document.split(" ")) .map(word => (word, 1)) .reduceByKey((v1, v2) => v1 + v2) // Output def updatefunction(newValues: Seq[Int], state: Option[Long]): Option[Long] = { val newState = newValues.sum + state.getOrElse(0L) Some(newState) } dstream_word_counts .updateStateByKey(updateFunction) .foreachRDD { (rdd, time) => rdd .map(tuple => tuple._1 + "," + tuple._2) .saveAsTextFile("hdfs://master:9000/data") } ssc.checkpoint("hdfs://master:9000/checkpoint") ssc.start
Scala

Structured Streaming

Structured Streaming 은 Engine 이 Transaction 을 처리해줌
Data Stream 으로 들어온 데이터들을 1개의 DataFrame 에 계속 추가하는 구조이다
e.g) 쿼리를 실행할 때 마다 결과가 달라질 수 있다
DStream 과는 조금 다른 컨셉을 가지고 있다
새로운 데이터는 기존 테이블에 Append 되는 개념이다
논리적으로 데이터를 계속 쌓는다고 표현하지만, 실제 쿼리 결과를 Increment 하는 구현체이다
SELECT * FROM ... 이런 형태의 코드는 작성이 불가능하다
Input Source 는 File Source, Kafka Source 만 제대로 지원하고, Socket Source, Rate Source 는 테스팅 용도로만 사용한다
메모리에 Sink 할 경우 Driver 의 메모리에 저장되기 때문에 디버깅 용도로만 사용해야 한다

실습

val df_stream_lines = spark .readStream .format("socket") .option("host", "master-01") .option("port", 9999) .load() val df_stream_tsv = df_stream_lines .map(line => line.getAs[String](0).split("\t")) .filter(arr => arr.length === 3) .map(arr => ( arr(0), arr(1), arr(2) )) .toDF("id", "document", "label") val df_stream_word_counts = df_stream_tsv .select("document") .select(split($"document", " ").as("arr")) .select(explode($"arr").as("word")) .groupBy("word") .agg(count("word").as("count")) // File Write val query = df_stream_tsv .writeStream .outputMode("append") .option("checkpointLocation", "hdfs://master:9000/checkpoint") .format("json") .option("path", "hdfs://master:9000/data/df_stream_tsv.json") .queryName("df_stream_tsv_json") .start() // Memory Write val query2 = df_stream_tsv .writeStream .outputMode("update") .format("memory") .queryName("df_stream_word_counts_memory") .start()
Scala