Spark
Word2vec
•
단어를 벡터로 바꿔주는 알고리즘
•
벡터의 차원이 많아질수록 메모리를 많이 사용하게 되기 때문에, driver 와 executor 의 메모리를 충분히 확보해줘야한다.
•
Spark 는 Skip-gram 만 지원한다.
◦
CBOW 보다는 Skip-gram 이 좀 더 좋다.
◦
일반적으로 Skip-gram 을 사용하는것을 권장한다
•
예시 사이트: https://word2vec.kr/search/
Prerequisite
•
Spark 메모리 확보
◦
spark.driver.memory: 6g
◦
spark.executor.memory: 6g
•
선형대수(linear algebra) 라이브러리 다운로드
$ wget -O $HOME/spark3/jblas-1.2.4.jar https://repo1.maven.org/maven2/org/jblas/jblas/1.2.4/jblas-1.2.4.jar
Plain Text
복사
•
Spark jars 설정
◦
spark.jars: ...$HOME/spark3/jblas-1.2.4.jar
•
위키피디아 데이터
◦
다운로드 사이트: https://dumps.wikimedia.org/kowiki/
$ mkdir -p $HOME/data/wiki
$ wget -O $HOME/data/wiki/kowiki-latest-pages-articles.xml.bz2 https://dumps.wikimedia.org/kowiki/latest/kowiki-latest-pages-articles.xml.bz2
$ sudo pip install wikiextractor
$ python -m wikiextractor.WikiExtractor $HOME/data/wiki/kowiki-latest-pages-articles.xml.bz2
$ hdfs dfs -mkdir -p /data/wiki
$ hdfs dfs -put -f /$HOME/data/wiki/text/ /data/wiki/
Plain Text
복사
모델 준비 실습
위키피디아 데이터 불러오기
val ko_wiki_df = spark
.read
.option('lineSep', '</doc>')
.textFile('hdfs://master-01:9000/data/wiki/text/*/wiki*')
Scala
복사
제목과 내용 분리
val ko_wiki_df2 = ko_wiki_df
.select(split($"value", " title=\"", 2)(1).as("value"))
.select(split($"value", "\">", 2).as("value"))
.select($"value"(0).as("title"), trim($"value"(1)).as("content"))
ko_wiki_df2.createOrReplaceTempView("ko_wiki_parsed")
Scala
복사
NLTK, KoNLPy 설치
$ pip install -U nltk konlpy
$ PYTHON_COMMAND_TEMP="""
import nltk, konlpy
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
""
$ python -c "$PYTHON_COMMAND_TEMP"
Plain Text
복사
NLTK, KoNLPy 테스트
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag
from konlpy.tag import Okt, Kkma
text = "Hello World. My name is blabla"
print(word_tokenize(text))
# ['Hello', 'World', '.', 'My', 'name', 'is', 'blabla']
print(pos_tag(word_tokenize(text)))
# [('Hello', 'NNP'), ('World', 'NNP'), ('.', '.'), ('My', 'PRP$'), ('name', 'NN'), ('is', 'VBZ'), ('blabla', 'JJ')]
text_ko = '안녕하세요. 블라블라블라입니다. 여행을 가봐요'
okt = Okt()
print(okt.morphs(text_ko))
# ['안녕하세요', '.', '블라블라', '블', '라', '입니다', '.', '여행', '을', '가봐요']
print(okt.pos(text_ko))
# [('안녕하세요', 'Adjective'), ('.', 'Punctuation'), ('블라블라', 'Noun'), ('블', 'Noun'), ('라', 'Josa'), ('입니다', 'Adjective'), ('.', 'Punctuation'), ('여행', 'Noun'), ('을', 'Josa'), ('가봐요', 'Verb')]
kkma = Kkma()
print(kkma.morphs(text_ko))
# ['안녕', '하', '세요', '.', '블라', '블라', '블라', '이', 'ㅂ니다', '.', '여행', '을', '가보', '아요']
print(kkma.pos(text_ko))
# [('안녕', 'NNG'), ('하', 'XSV'), ('세요', 'EFN'), ('.', 'SF'), ('블라', 'NNG'), ('블라', 'NNG'), ('블라', 'NNG'), ('이', 'VCP'), ('ㅂ니다', 'EFN'), ('.', 'SF'), ('여행', 'NNG'), ('을', 'JKO'), ('가보', 'VV'), ('아요', 'EFN')]
Python
복사
학습데이터 형태소 분석
from pyspark.sql.functions import *
from pyspark.sql.types import Row
from konlpy.tag import Okt
okt = Okt()
ko_wiki_train_data_df = spark \
.table("ko_wiki_parsed") \
.where(col("title").isNotNull()) \
.where(col("content").isNotNull()) \
.rdd \
.map(lambda row: Row(title=row.title, content=row.content, text=Okt().nouns(row.content))) \
.toDF()
ko_wiki_train_data_df = ko_wiki_train_data_df.rdd.coalesce(10).toDF()
ko_wiki_train_data_df.createOrReplaceTempView("ko_wiki_train_data")
ko_wiki_train_data_df \
.write \
.format("parquet") \
.mode("overwrite") \
.saveAsTable("ko_wiki_train_data_table")
Python
복사
학습 데이터 캐시
val ko_wiki_train_data_hdfs_df = spark.read.load("hdfs://master-01:9000/data/warehouse/ko_wiki_train_data_table")
ko_wiki_train_data_hdfs_df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
Scala
복사
Word2vec 트레이닝
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}
val word2vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setStepSize(0.025)
.setMaxSentenceLength(1000)
.setMinCount(5)
.setSeed(1L)
.setVectorSize(100)
.setWindowSize(5)
val model = word2vec.fit(ko_wiki_train_data_hdfs_df)
val ko_wiki_predicted_df = model.transform(ko_wiki_train_data_hdfs_df)
model
.write
.overwrite()
.save("hdfs://master-01:9000/model/wirki/word2vc/ml_v100")
Scala
복사
모델 사용 실습
Word2vec 모델 불러오기
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}
val sameModel = Word2VecModel.load("hdfs://master-01:9000/model/wiki/word2vec/ml_v100")
val df_vectors = sameModel.getVectors.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
Scala
복사
벡터 연산을 위해 DoubleMatrix 생성 함수
•
spark 에서 제공하는 vector 에는 연산이 없기 때문에 jblas 에 있는 DoubleMatrix 으로 변환
•
DenseVector 가 조금 더 메모리 효율적임
import org.jblas.DoubleMatrix
import org.jblas.{DoubleMatrix => DM}
import breeze.linalg.DenseVector
import breeze.linalg.{DenseVector => DV}
import org.apache.spark.ml.linalg.Vector
def word2DM(word: String) = {
new DM(df_vectors.where($"word" === word).head.getAs[Vector]("vector").toArray)
}
def word2DV(word: String) = {
new DV(df_vectors.where('word === word).head.getAs[Vector]("vector").toArray)
}
Scala
복사
연산 함수 정의
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.DataFrame
def searchByRelation(positiveWord: String, positiveWord2: String, num: Int): DataFrame = {
val target = word2DM(positiveWord).add(word2DM(positiveWord2))
val result = sameModel.findSynonyms(Vectors.dense(target.toArray), num)
result
}
def searchByRelation(positiveWord: String, negativeWord: String, positiveWord2: String, num: Int): DataFrame = {
val target = word2DM(positiveWord).sub(word2DM(negativeWord)).add(word2DM(positiveWord2))
val result = sameModel.findSynonyms(Vectors.dense(target.toArray), num)
result
}
Scala
복사
검색
val positive_word_01 = "서울"
val negative_word_02 = "대한민국"
val positive_word_03 = "일본"
val result = searchByRelation(positive_word_01, negative_word_02, positive_word_03, 1)
result.show()
+----+-----------------+
|word| similarity|
+----+-----------------+
|도쿄|0.658542811870575|
+----+-----------------+
Scala
복사
차원 축소 ( PCA )
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.DenseMatrix
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Vector
val pca = new PCA()
.setInputCol("vector")
.setOutputCol("pca_features")
.setK(2)
val pca_model = pca.fit(df_vectors)
val sameModelVectorsPCA = pca_model.transform(df_vectors)
val seq2: Seq[(String, Vector)] = sameModelVectorsPCA
.map(row => (row.getString(0), row.getAs[Vector](2)))
.collect()
.toSeq
Scala
복사
추천 시스템
•
Collaborative Filtering ( 협업 필터링 )
◦
유사한 행위를 한 사람들을 대상으로 세그먼트로 분류하여 추천한다
◦
matrix completion 라고도 불린다
•
순서
◦
데이터 수집 → 데이터 탐색 및 정렬 → 모델 생성 → 추천 → 모델 평가 → 하이퍼 파라미터 튜닝