
Big Data Analysis with Apache Spark (10)

Spark

Prerequisites

Kafka cluster (Kafka 2.8.0)
Two topics: log and result (a topic-creation sketch follows below)
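The topics can be created any way you like; below is a hypothetical sketch using the Kafka AdminClient from a Scala shell. The broker list, partition count, and replication factor are assumptions — adjust them to your cluster.

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

val adminProps = new Properties()
adminProps.put("bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")

val admin = AdminClient.create(adminProps)

//--create the source topic (log) and the sink topic (result)
admin.createTopics(java.util.Arrays.asList(
  new NewTopic("log", 3, 1.toShort),      //--3 partitions, replication factor 1 (assumed)
  new NewTopic("result", 3, 1.toShort)
)).all().get()

admin.close()
Scala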

Streaming ( + Watermark )

A watermark is how the engine decides, based on event time, which late-arriving data to discard instead of keeping it in state indefinitely.
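A minimal sketch of that rule, assuming a spark-shell session and the built-in rate test source: with a 10-minute watermark, the engine tracks the maximum event time it has seen, treats max event time minus 10 minutes as the watermark, and drops records whose event time falls behind it, so state for old windows can be cleaned up.

import org.apache.spark.sql.functions._
import spark.implicits._

//--built-in test source that emits rows of (timestamp, value)
val df_rate = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()

//--watermark = max(event time seen so far) - 10 minutes
//--rows whose timestamp is older than the watermark are dropped instead of updating old windows
val df_rate_counts = df_rate
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .count()
Scala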

Hands-on

Kafka Source
val df_stream_kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("subscribe", "log")
  .load()

val df_stream_value = df_stream_kafka
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select($"value")
  .as[String]

val df_stream_lines = df_stream_value
  .filter(_ != null)                         //--filter dirty data
  .filter(!_.trim.equals(""))                //--filter dirty data
  .map(_.split(","))
  .filter(_.length == 2)                     //--filter dirty data
  .map(arr => (arr(0).trim, arr(1).trim))
  .toDF("eventTime", "value")
  .select($"value", to_timestamp($"eventTime", "yyyy-MM-dd HH:mm:ss").as("timestamp"))  //--cast to TimestampType
  .as[(String, java.sql.Timestamp)]

val df_stream_word_counts = df_stream_lines
  .select(explode(split($"value", " ")).as("word"))  //--split + explode
  .groupBy("word")                                   //--groupBy
  .agg(count("word").as("count"))                    //--agg
Scala
df_stream_kafka.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

df_stream_value.printSchema
root
 |-- value: string (nullable = true)

df_stream_lines.printSchema
root
 |-- value: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
Scala
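The pipeline above expects each record value in the log topic to have the form "yyyy-MM-dd HH:mm:ss,<sentence>" (hence the split on ','). A hypothetical producer sketch for pushing a few test lines into the topic:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](producerProps)

//--value format: "<event time>,<text>"
producer.send(new ProducerRecord[String, String]("log", "2021-08-01 12:00:00,spark structured streaming with kafka"))
producer.send(new ProducerRecord[String, String]("log", "2021-08-01 12:01:00,spark streaming word count"))

producer.flush()
producer.close()
Scala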
Kafka Sink
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val query_word_counts_kafka = df_stream_word_counts
  .selectExpr("concat(word, '(', count, ')') as value")
  .writeStream
  .outputMode("complete")
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("topic", "result")
  .option("checkpointLocation", "hdfs://master-01:9000/checkpoint/kafka_streaming")
  .queryName("word_counts_kafka")
  .start()
Scala
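To check what the query wrote out, the result topic can be read back as a plain batch query (a sketch; kafka-console-consumer works just as well):

val df_result = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("subscribe", "result")
  .load()
  .selectExpr("CAST(value AS STRING)")

df_result.show(20, false)
Scala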
(Watermark) Kafka Source
val df_stream_kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("subscribe", "log")
  .load()

val df_stream_value = df_stream_kafka
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select($"value")
  .as[String]

val df_stream_lines = df_stream_value
  .filter(_ != null)                         //--filter dirty data
  .filter(!_.trim.equals(""))                //--filter dirty data
  .map(_.split(","))
  .filter(_.length == 2)                     //--filter dirty data
  .map(arr => (arr(0).trim, arr(1).trim))
  .toDF("eventTime", "value")
  .select($"value", to_timestamp($"eventTime", "yyyy-MM-dd HH:mm:ss").as("timestamp"))  //--cast to TimestampType
  .as[(String, java.sql.Timestamp)]

val df_stream_word_counts_window = df_stream_lines
  .select(explode(split($"value", " ")).as("word"), $"timestamp")
  .withWatermark("timestamp", "10 minutes")                                       //--drop data more than 10 minutes late
  .groupBy(window($"timestamp", "10 minutes", "5 minutes", "0 second"), $"word")  //--10-minute windows sliding every 5 minutes
  .agg(count("word").as("count"))
Scala
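Compared to the earlier word count, each output row now carries the window it belongs to. The schema of df_stream_word_counts_window looks roughly like this (shown approximately; nullability flags omitted):

df_stream_word_counts_window.printSchema
root
 |-- window: struct
 |    |-- start: timestamp
 |    |-- end: timestamp
 |-- word: string
 |-- count: long
Scala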
(Watermark) Kafka Sink
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

val query_word_counts_window_kafka = df_stream_word_counts_window
  .selectExpr("concat(cast(window as string), ' : ', word, '(', count, ')') as value")
  .writeStream
  .outputMode("update")   //--in complete mode the watermark has no effect (old windows are never dropped)
  .format("kafka")
  .option("kafka.bootstrap.servers", "server-01:9092,server-02:9092,server-03:9092")
  .option("topic", "result")
  .option("checkpointLocation", "hdfs://master-01:9000/checkpoint/kafka_streaming_window")  //--checkpoint separate from the first query
  .queryName("word_counts_window_kafka")
  .start()
Scala
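start() returns immediately; the StreamingQuery handle can be used to monitor or stop the job (a sketch):

//--current state of the query (trigger active? data available?)
query_word_counts_window_kafka.status

//--metrics of the most recent micro-batch (input rate, watermark, state rows, ...)
query_word_counts_window_kafka.lastProgress

//--stop the query when done; thanks to the checkpoint it can resume where it left off
// query_word_counts_window_kafka.stop()
Scala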