Spark
Structured Streaming
• Data is processed by the timestamp carried in each incoming record (event time), regardless of the system clock.
• A window slices data on the basis of start time ≤ event time < end time. (The engine's documentation describes this boundary incorrectly.)
• To keep past (late) data out of an aggregation, use a watermark; see the sketch below.
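◦ A minimal PySpark sketch of these semantics, assuming a local socket source with includeTimestamp; the host, port, and window/watermark durations are illustrative, not part of this cluster setup:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window

spark = SparkSession.builder.appName("watermark-sketch").getOrCreate()

# Hypothetical socket source; "includeTimestamp" adds an event-time
# column named "timestamp" to each record.
lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .option("includeTimestamp", "true") \
    .load()

# Records arriving more than 10 minutes behind the watermark are dropped;
# each 5-minute window covers [start, end), i.e. start <= event time < end.
counts = lines \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "5 minutes")) \
    .count()

query = counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
Python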
Prerequisite
• Python 3.7
Prepare
• all nodes
$ sudo python3.7 -m pip install -U numpy pandas scikit-learn matplotlib hdfs
$ sudo python3.7 -m pip list
Shell
• master node
$ wget -O $HOME/spark3/spark-streaming-twitter_2.12-2.4.0.jar https://repo1.maven.org/maven2/org/apache/bahir/spark-streaming-twitter_2.12/2.4.0/spark-streaming-twitter_2.12-2.4.0.jar
$ wget -O $HOME/spark3/spark-tags_2.12-3.1.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.1.1/spark-tags_2.12-3.1.1.jar
$ wget -O $HOME/spark3/twitter4j-stream-4.0.6.jar https://repo1.maven.org/maven2/org/twitter4j/twitter4j-stream/4.0.6/twitter4j-stream-4.0.6.jar
$ wget -O $HOME/spark3/twitter4j-core-4.0.6.jar https://repo1.maven.org/maven2/org/twitter4j/twitter4j-core/4.0.6/twitter4j-core-4.0.6.jar
$ wget -O $HOME/spark3/gson-2.8.5.jar https://repo1.maven.org/maven2/com/google/code/gson/gson/2.8.5/gson-2.8.5.jar
Shell
• Add the five jars above to the Zeppelin Spark interpreter settings (e.g., as artifacts under the interpreter's Dependencies section)
Practice
• Twitter authentication module
object Utils extends Serializable {
  import twitter4j.auth.OAuthAuthorization
  import twitter4j.conf.ConfigurationBuilder

  def getAuth = {
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) =
      Array("{{ consumerKey }}", "{{ consumerSecret }}", "{{ accessToken }}", "{{ accessTokenSecret }}")
    val builder: ConfigurationBuilder = new ConfigurationBuilder()
    builder.setOAuthConsumerKey(consumerKey)
    builder.setOAuthConsumerSecret(consumerSecret)
    builder.setOAuthAccessToken(accessToken)
    builder.setOAuthAccessTokenSecret(accessTokenSecret)
    Some(new OAuthAuthorization(builder.build()))
  }
}
Scala
• Collecting Twitter data
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import com.google.gson.Gson

val Array(outputDirectory, numTweetsToCollect, intervalSecs, partitionsEachInterval) =
  Array("hdfs://master-01:9000/data/twitter/tweets_", "2000", "5", "1")

val ssc = new StreamingContext(sc, Seconds(intervalSecs.toLong))
val tweetInputDStream = TwitterUtils.createStream(ssc, Utils.getAuth)

// Serialize each status to JSON, one Gson instance per partition
val tweetStream = tweetInputDStream.mapPartitions { iter =>
  val gson = new Gson()
  iter.map(gson.toJson(_))
}

var numTweetsCollected = 0L
tweetStream.foreachRDD { (rdd, time) =>
  val count = rdd.count()
  if (count > 0) {
    rdd.coalesce(partitionsEachInterval.toInt)
      .saveAsTextFile(outputDirectory + time.milliseconds.toString)
    numTweetsCollected += count
    if (numTweetsCollected >= numTweetsToCollect.toLong) {
      ssc.stop(false)  // stop streaming once enough tweets are collected; keep the SparkContext
    }
  }
  println(s"${numTweetsCollected} tweets collected")
}
ssc.start()  // start the receiver; collection stops itself via ssc.stop above
Scala
• Reading the data
import pyspark

df_tweet = spark \
    .read \
    .json("hdfs://master-01:9000/data/twitter/tweets_*") \
    .persist(pyspark.StorageLevel.MEMORY_ONLY)
df_tweet.createOrReplaceTempView("df_tweet")
Python
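◦ With the temp view registered, a quick look at the language mix confirms the load; this query is an added sketch, not part of the original notebook:
# Tweet counts per language in the collected sample.
spark.sql("""
    SELECT lang, COUNT(*) AS cnt
    FROM df_tweet
    GROUP BY lang
    ORDER BY cnt DESC
""").show(10, truncate=False)
Python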
• Training
from pyspark.sql.functions import *
from pyspark.sql.types import Row
from pyspark.ml.feature import HashingTF
from pyspark.ml.clustering import KMeans, KMeansModel

tweet_path, model_path, num_cluster, num_iteration = \
    "hdfs://master-01:9000/data/twitter/tweets_*", "hdfs://master-01:9000/model/twitter/kmeans", 100, 50

df_tweet = spark.read.json(tweet_path)
rdd_texts = df_tweet.rdd.map(lambda row: row['text'])

# Split each tweet into character n-grams (bigrams)
n = 2
rdd_terms = rdd_texts.map(lambda text: [text[i:i+n] for i in range(len(text) - (n - 1))])
df_terms = rdd_terms.map(lambda x: Row(terms=x)).toDF()

# Hash the n-grams into a fixed-size feature vector
hashingTF = HashingTF(inputCol="terms", outputCol="features", numFeatures=1000)
df_features = hashingTF.transform(df_terms)
df_features.cache()

kmeans = KMeans() \
    .setFeaturesCol("features") \
    .setPredictionCol("prediction") \
    .setK(num_cluster) \
    .setInitMode("k-means||") \
    .setMaxIter(num_iteration) \
    .setSeed(11)
model = kmeans.fit(df_features)
model.write().overwrite().save(model_path)
df_features.unpersist()
Python
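◦ As a sanity check on the choice of k (an addition to the original flow), the silhouette score of the fitted model can be computed with ClusteringEvaluator:
from pyspark.ml.evaluation import ClusteringEvaluator

# Silhouette over the training features: values near 1 indicate
# tight, well-separated clusters; near 0, overlapping clusters.
predictions = model.transform(df_features)
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction")
print("silhouette:", evaluator.evaluate(predictions))
Python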
• Selecting the target cluster
n = 2
df_some_tweets = spark \
    .sql("SELECT lang, text FROM df_tweet LIMIT 2000") \
    .rdd \
    .map(lambda row: Row(
        lang=row["lang"],
        text=row["text"],
        text_ngram=[row["text"][i:i+n] for i in range(len(row["text"]) - (n - 1))])) \
    .toDF()

hashingTF2 = HashingTF(inputCol="text_ngram", outputCol="features", numFeatures=1000)
df_some_features = hashingTF2.transform(df_some_tweets)

model = KMeansModel.load(model_path)
df_some_transformed = model.transform(df_some_features)
df_some_transformed.cache()

# Print a few sample tweets from each cluster for inspection
for num in range(num_cluster):
    df_some_transformed \
        .where(col('prediction') == num) \
        .select("lang", regexp_replace("text", "\n", " ").alias("text"), "prediction") \
        .show(3, truncate=False)

df_some_transformed.unpersist()
Python
◦ Identify the cluster number where the desired tweets are gathered; surveying cluster sizes first (see the sketch below) helps narrow the candidates
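A minimal sketch of that survey, counting tweets per cluster; this helper is an added aid, not in the original:
from pyspark.sql.functions import desc

# Cluster sizes, largest first: large clusters dominated by one language
# or topic are natural candidates to inspect with the sampling loop above.
df_some_transformed.groupBy("prediction").count().orderBy(desc("count")).show(20)
Python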
• Applying it as a service (Predict, Filter, Save, ...)
import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.functions.regexp_replace
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import spark.implicits._

val Array(modelPath, clusterNumber, intervalSecs) =
  Array("hdfs://master-01:9000/model/twitter/kmeans", "63", "5")  // intervalSecs as in the collection step

val ssc = new StreamingContext(sc, Seconds(intervalSecs.toLong))
val tweets = TwitterUtils.createStream(ssc, Utils.getAuth)
val tweetsText = tweets.map(status => (status.getLang, status.getText))

val model = KMeansModel.load(modelPath)
val hashingTF = new HashingTF()
  .setInputCol("text_ngram")
  .setOutputCol("features")
  .setNumFeatures(1000)

tweetsText.foreachRDD { (rdd, time) =>
  // Build the character n-gram DataFrame
  val df_tweets = rdd
    .map(lang_text => (lang_text._1, lang_text._2, lang_text._2.sliding(2).toSeq, time.milliseconds))
    .toDF("lang", "text", "text_ngram", "time")
  val df_features = hashingTF.transform(df_tweets)
  val df_transformed = model.transform(df_features)
  // Keep only tweets predicted into the target cluster
  val df_tweet_filtered = df_transformed
    .where($"prediction" === clusterNumber.toInt)
    .select($"lang", regexp_replace($"text", "\n", " ").as("text"), $"time")
  df_tweet_filtered.createOrReplaceTempView("df_tweet_filtered")
  df_tweet_filtered.write.mode("append").saveAsTable("df_tweet_filtered_append")
}
ssc.start()
Scala
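◦ While the job runs, the appended table can be inspected from another (e.g. %pyspark) paragraph; a sketch, with the query itself being illustrative:
# Latest tweets routed to the target cluster by the streaming job.
spark.sql("""
    SELECT lang, text, time
    FROM df_tweet_filtered_append
    ORDER BY time DESC
""").show(10, truncate=False)
Python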