Spark
Prerequisites
•
Kafka cluster (Kafka 2.8.0)
◦
Two topics (log, result); a topic-creation sketch follows below
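The two topics are assumed to exist before any query is started. One way to create them from Scala is the Kafka AdminClient (partition and replication counts below are placeholders, not values from the original; the kafka-topics.sh CLI works just as well):
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "server-01:9092,server-02:9092,server-03:9092")

val admin = AdminClient.create(adminProps)
// 3 partitions / replication factor 3 are placeholder values; adjust to the cluster.
admin.createTopics(java.util.Arrays.asList(
  new NewTopic("log", 3, 3.toShort),
  new NewTopic("result", 3, 3.toShort)
)).all().get()
admin.close()
Scala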
Streaming (+ Watermark)
•
A mechanism for dropping data that arrives later than a cutoff point in event time, instead of keeping it in state indefinitely (a worked example follows below)
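A worked example with made-up numbers, assuming the 10-minute watermark used in the hands-on code below: the cutoff is always derived from the largest event time seen so far, not from the wall clock.
// Illustrative only: how the watermark cutoff is derived (the timestamps are made up).
import java.sql.Timestamp

val maxEventTimeSeen = Timestamp.valueOf("2021-06-01 12:20:00")    // latest event time observed so far
val watermarkDelayMs = 10 * 60 * 1000L                             // "10 minutes"
val watermark        = new Timestamp(maxEventTimeSeen.getTime - watermarkDelayMs)  // 12:10:00

// A late record with event time 12:05 (< watermark) is dropped;
// a record with event time 12:15 (>= watermark) is still aggregated.
Scala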
Hands-on
•
Kafka Source
import org.apache.spark.sql.functions._   // to_timestamp, explode, split, count, window, ...
import spark.implicits._                  // $-notation and encoders (already in scope in spark-shell)

val df_stream_kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("subscribe", "log")
  .load()

val df_stream_value = df_stream_kafka
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select($"value")
  .as[String]

val df_stream_lines = df_stream_value
  .filter(_ != null)                      //--filter dirty data: drop null values
  .filter(!_.trim.equals(""))             //--filter dirty data: drop blank lines
  .map(_.split(","))
  .filter(_.length == 2)                  //--filter dirty data: keep only "eventTime,value" records
  .map(arr => (arr(0).trim, arr(1).trim))
  .toDF("eventTime", "value")
  .select($"value", to_timestamp($"eventTime", "yyyy-MM-dd HH:mm:ss").as("timestamp")) //--cast to TimestampType
  .as[(String, java.sql.Timestamp)]

val df_stream_word_counts = df_stream_lines
  .select(explode(split($"value", " ")).as("word")) //--split + explode into one word per row
  .groupBy("word")                                  //--group by word
  .agg(count("word").as("count"))                   //--aggregate running counts
Scala
df_stream_kafka.printSchema
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
df_stream_value.printSchema
root
|-- value: string (nullable = true)
df_stream_lines.printSchema
root
|-- value: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
Scala
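Before wiring up the Kafka sink, it can help to sanity-check the aggregation with a console sink. A minimal sketch (the query name and trigger interval here are arbitrary choices, not from the original):
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Debug-only sink: prints the running word counts to the driver console.
val query_word_counts_console = df_stream_word_counts
  .writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .queryName("word_counts_console")
  .start()

// Stop it once the output looks right, before starting the Kafka sink.
// query_word_counts_console.stop()
Scala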
•
Kafka Sink
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val query_word_counts_kafka = df_stream_word_counts
  .selectExpr("concat(word, '(', count, ')') as value")   //--Kafka sink expects a string "value" column
  .writeStream
  .outputMode("complete")
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("topic", "result")
  .option("checkpointLocation", "hdfs://master-01:9000/checkpoint/kafka_streaming")
  // .trigger(Trigger.ProcessingTime(10.seconds))         //--optional; this is what the imports above are for
  .queryName("word_counts_kafka")
  .start()
Scala
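To check what actually landed in the result topic, the topic can be read back as a batch DataFrame from the same session (broker and topic names as above):
// Batch read of everything currently in the "result" topic.
val df_result = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("subscribe", "result")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")

df_result.show(20, false)
Scala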
•
(Watermark) Kafka Source
val df_stream_kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("subscribe", "log")
  .load()

val df_stream_value = df_stream_kafka
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select($"value")
  .as[String]

val df_stream_lines = df_stream_value
  .filter(_ != null)                      //--filter dirty data: drop null values
  .filter(!_.trim.equals(""))             //--filter dirty data: drop blank lines
  .map(_.split(","))
  .filter(_.length == 2)                  //--filter dirty data: keep only "eventTime,value" records
  .map(arr => (arr(0).trim, arr(1).trim))
  .toDF("eventTime", "value")
  .select($"value", to_timestamp($"eventTime", "yyyy-MM-dd HH:mm:ss").as("timestamp")) //--cast to TimestampType
  .as[(String, java.sql.Timestamp)]

val df_stream_word_counts_window = df_stream_lines
  .select(explode(split($"value", " ")).as("word"), $"timestamp")
  .withWatermark("timestamp", "10 minutes")                                        //--drop data more than 10 minutes late
  .groupBy(window($"timestamp", "10 minutes", "5 minutes", "0 second"), $"word")   //--10-minute windows sliding every 5 minutes
  .agg(count("word").as("count"))
Scala
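The window column produced by the aggregation is a struct with start and end timestamps. If flat columns are easier to consume downstream, they can be pulled out like this (an optional sketch; the sink below keeps using the original window struct):
// Flatten the window struct into explicit start/end columns.
val df_stream_word_counts_window_flat = df_stream_word_counts_window
  .select(
    $"window.start".as("window_start"),
    $"window.end".as("window_end"),
    $"word",
    $"count")
Scala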
•
(Watermark) Kafka Sink
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val query_word_counts_window_kafka = df_stream_word_counts_window
  .selectExpr("concat(cast(window as string), ' : ', word, '(', count, ')') as value")
  .writeStream
  .outputMode("append")   //--the watermark only drops state in append/update mode; in complete mode all state is kept
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("topic", "result")
  .option("checkpointLocation", "hdfs://master-01:9000/checkpoint/kafka_streaming_window") //--must differ from the first query's checkpoint
  // .trigger(Trigger.ProcessingTime(10.seconds))          //--optional; this is what the imports above are for
  .queryName("word_counts_window_kafka")                   //--must differ from the first query's name
  .start()
Scala
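Both sinks write to the same result topic. Once started, the running queries can be inspected and stopped from the same session (a short sketch):
// List the active streaming queries in this session.
spark.streams.active.foreach(q => println(s"${q.name} : ${q.id}"))

// Block until a query terminates (e.g. to keep a spark-submit job alive):
// query_word_counts_window_kafka.awaitTermination()

// Stop a query gracefully when done.
query_word_counts_window_kafka.stop()
Scala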