Spark
Streaming
Spark 는 죽었다깨어나도 배치다
•
DStream: RDD 기반의 스트리밍
◦
초창기 모델
•
Structued Streaming: DataFame 기반의 스트리밍
◦
아직 제약사항이 좀 있긴하지만, 운영환경에서 충분히 사용할만하고 계속해서 발전 중
•
대용량 스트리밍 데이터를 처리하기 위한 스트리밍
◦
(Kafka, Flume, HDFS, Kinesis, Twitter) ⇒ Spark Streaming ⇒ (HDFS, Databases, Dashboards)
•
Key Concepts
◦
Input DStream
▪
RDD 의 연속
◦
Transformations
▪
표준 RDD 의 작업 ( map, countByValue, reduceByKey, join 등 )
▪
상태 저장 작업 ( window, updateStateByKey 등 )
◦
Output Operations
Discretized Stream (DStream)
•
지속적으로 들어오는 데이터를 N초 간격으로 RDD 로 모아서 작은 배치 작업으로 처리함
•
데이터가 없으면 빈 RDD 를 만들어서 처리하기도함
•
배치 사이즈는 보통 500ms ~ 2s 정도로 처리함 ( latency 가 발생함 )
•
앞에 들어온 데이터를 모두 처리하고 난 후 다음 데이터를 처리함 ( 데이터의 순서가 보장되며, 데이터는 병렬적으로 처리되지 않음 )
•
Stateful
◦
window
▪
N초 단위로 분리된 RDD 들을 묶어서 1개의 Window 로 묶어서 한번에 연산을 처리할 수 있음 ( Window length, Sliding Interval )
▪
e.g) 최근 검색어 Top 5 (1시간) 을 개발
◦
updateStateByKey
▪
Key 를 기준으로 이전 시점에 처리했던 데이터를 업데이트함
실습
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(2))
// Input
val dstream_lines = ssc.socketTextStream("HOST", 9999)
// Transformation
val dstream_tsv = dstream_lines
.map(line => line.split("\t"))
.filter(arr => arr.length == 3)
.map(arr => ( arr(0), arr(1), arr(2) ))
val dstream_word_counts = dstream_tsv
.map(tuple => tuple._2)
.flatMap(document => document.split(" "))
.map(word => (word, 1))
.reduceByKey((v1, v2) => v1 + v2)
// Output
dstream_word_counts
.foreachRDD { (rdd, time) =>
rdd
.toDF("word", "count")
.select($"word", $"count".cast("long"))
.write
.format("csv")
.option("sep", ",")
.option("header", false)
.mode("append")
.saveAsTable("dstream_word_counts_foreachrdd")
}
ssc.start
Scala
복사
•
ssc.getState
◦
INITIALIZED: context 가 생성되었지만 시작되지 않은 상태
◦
ACTIVE: 시작된 상태
◦
STOPPED: 종료된 사앹
•
updateStateByKey
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(2))
// Input
val dstream_lines = ssc.socketTextStream("HOST", 9999)
// Transformation
val dstream_tsv = dstream_lines
.map(line => line.split("\t"))
.filter(arr => arr.length == 3)
.map(arr => ( arr(0), arr(1), arr(2) ))
val dstream_word_counts = dstream_tsv
.map(tuple => tuple._2)
.flatMap(document => document.split(" "))
.map(word => (word, 1))
.reduceByKey((v1, v2) => v1 + v2)
// Output
def updatefunction(newValues: Seq[Int], state: Option[Long]): Option[Long] = {
val newState = newValues.sum + state.getOrElse(0L)
Some(newState)
}
dstream_word_counts
.updateStateByKey(updateFunction)
.foreachRDD { (rdd, time) =>
rdd
.map(tuple => tuple._1 + "," + tuple._2)
.saveAsTextFile("hdfs://master:9000/data")
}
ssc.checkpoint("hdfs://master:9000/checkpoint")
ssc.start
Scala
복사
Structured Streaming
•
Structured Streaming 은 Engine 이 Transaction 을 처리해줌
•
Data Stream 으로 들어온 데이터들을 1개의 DataFrame 에 계속 추가하는 구조이다
◦
e.g) 쿼리를 실행할 때 마다 결과가 달라질 수 있다
◦
DStream 과는 조금 다른 컨셉을 가지고 있다
◦
새로운 데이터는 기존 테이블에 Append 되는 개념이다
•
논리적으로 데이터를 계속 쌓는다고 표현하지만, 실제 쿼리 결과를 Increment 하는 구현체이다
◦
SELECT * FROM ... 이런 형태의 코드는 작성이 불가능하다
•
Input Source 는 File Source, Kafka Source 만 제대로 지원하고, Socket Source, Rate Source 는 테스팅 용도로만 사용한다
•
메모리에 Sink 할 경우 Driver 의 메모리에 저장되기 때문에 디버깅 용도로만 사용해야 한다
실습
val df_stream_lines = spark
.readStream
.format("socket")
.option("host", "master-01")
.option("port", 9999)
.load()
val df_stream_tsv = df_stream_lines
.map(line => line.getAs[String](0).split("\t"))
.filter(arr => arr.length === 3)
.map(arr => ( arr(0), arr(1), arr(2) ))
.toDF("id", "document", "label")
val df_stream_word_counts = df_stream_tsv
.select("document")
.select(split($"document", " ").as("arr"))
.select(explode($"arr").as("word"))
.groupBy("word")
.agg(count("word").as("count"))
// File Write
val query = df_stream_tsv
.writeStream
.outputMode("append")
.option("checkpointLocation", "hdfs://master:9000/checkpoint")
.format("json")
.option("path", "hdfs://master:9000/data/df_stream_tsv.json")
.queryName("df_stream_tsv_json")
.start()
// Memory Write
val query2 = df_stream_tsv
.writeStream
.outputMode("update")
.format("memory")
.queryName("df_stream_word_counts_memory")
.start()
Scala
복사