使用MLlib库中的机器学习算法对垃圾邮件进行分类
待分类的邮件数据如图所示分成4个文件夹,其中两个文件夹是训练集,另外两个文件夹是测试集
build.sbt文件
// sbt build definition for the spam-classification example.
// Each setting is on its own line, separated by blank lines, so the file
// parses regardless of sbt version.
name := "spark-first"

version := "1.0"

scalaVersion := "2.11.8"

// Single place to bump the Spark version shared by all Spark modules.
val sparkVersion = "2.1.0"

// `%%` appends the Scala binary version (here _2.11) to the artifact name,
// so the Spark artifacts always match `scalaVersion` instead of relying on
// a hard-coded "_2.11" suffix.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.hadoop" % "hadoop-common" % "2.7.2",
  "mysql" % "mysql-connector-java" % "5.1.31",
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)
代码
import org.apache.hadoop.io.{IntWritable, LongWritable, MapWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.SQLContext
import java.util.Properties
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

/**
 * Created by common on 17-4-6.
 *
 * Trains a logistic-regression spam classifier with Spark MLlib and prints
 * the predictions for one spam-like and one normal sample sentence.
 *
 * Training text is read from `input/email/spam` (label 1) and
 * `input/email/ham` (label 0), relative to the working directory.
 */
object SparkRDD {

  /**
   * Program entry point: builds the training set, fits the model and prints
   * two sample predictions to stdout.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    // Always release the SparkContext, even if training or prediction fails.
    try {
      // NOTE(review): textFile yields one record per line, so every line of
      // the input is treated as one email - confirm the data is laid out so.
      val spam = sc.textFile("input/email/spam")
      val normal = sc.textFile("input/email/ham")

      // HashingTF maps the words of an email to a vector of 10000 features.
      val tf = new HashingTF(numFeatures = 10000)

      // Each email is split into words; each word is hashed to one feature.
      val spamFeatures = spam.map(email => tf.transform(email.split(" ")))
      val normalFeatures = normal.map(email => tf.transform(email.split(" ")))

      // Label spam as positive (1.0) and normal mail as negative (0.0).
      val positiveExamples = spamFeatures.map(features => LabeledPoint(1.0, features))
      val negativeExamples = normalFeatures.map(features => LabeledPoint(0.0, features))

      val trainingData = positiveExamples.union(negativeExamples)
      // Logistic regression is iterative, so cache the training RDD.
      trainingData.cache()

      // Fit logistic regression using SGD.
      // NOTE(review): LogisticRegressionWithSGD is marked deprecated in
      // Spark 2.x - consider LogisticRegressionWithLBFGS when upgrading.
      val model = new LogisticRegressionWithSGD().run(trainingData)

      // Score one positive (spam) and one negative (normal) sample.
      val posTest = tf.transform(
        "Experience with BiggerPenis Today! Grow 3-inches more ...".split(" "))
      val negTest = tf.transform(
        "That is cold. Is there going to be a retirement party? ...".split(" "))

      println(s"Prediction for positive test example: ${model.predict(posTest)}")
      println(s"Prediction for negative test example: ${model.predict(negTest)}")
    } finally {
      sc.stop()
    }
  }
}
结果