随着项目的运营,收集了很多的用户数据。最近业务上想做些社交图谱相关的产品,但因为数据很多、很杂,传统的数据库查询已经满足不了业务的需求。
试着用Spark
来做,权当练练手了。
安装Spark
因为有Scala
的开发经验,所以就不用官方提供的二进制包了,自编译scala 2.11
版本。
下载Spark:http://ftp.cuhk.edu.hk/pub/packages/apache.org/spark/spark-1.5.0/spark-1.5.0.tgz
1 2 3 4
| tar zxf spark-1.5.0.tgz cd spark-1.5.0 ./dev/change-scala-version.sh 2.11 mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package
|
以上命令完成Spark
基于scala 2.11
版本的编译。可以运行自带的一个示例程序来验证安装是否成功。
1
| ./bin/run-example SparkPi
|
编写Standalone application
使用sbt
来构建一个可提交的简单Spark
程序,功能是计算每个用户加入的群组,并把结果保存下来。project/Build.scala
配置文件如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import _root_.sbt.Keys._ import _root_.sbt._ import sbtassembly.AssemblyKeys._
object Build extends Build {
override lazy val settings = super.settings :+ { shellPrompt := (s => Project.extract(s).currentProject.id + " > ") }
lazy val root = Project("spark-mongodb", file(".")) .settings( scalaVersion := "2.11.7", assemblyJarName in assembly := "spark-mongodb.jar", assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false), libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % verSpark % "scopeProvidedTest, "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" excludeAll( ExclusionRule(organization = "javax.servlet"), ExclusionRule(organization = "commons-beanutils"), ExclusionRule(organization = "org.apache.hadoop"))) ) private val scopeProvidedTest = "provided,test" private val verSpark = "1.5.0" }
|
数据存储在MongoDB
数据库中,所以我们还需要使用mongo-hadoop
连接器来访问MongoDB
数据库。
示例程序
示例程序非常的简单,把数据从数据库里全部读出,使用map
来把每条记录里用户ID对应加入的群组ID转换成一个Set
,再使用
reduceByKey
来把相同用户ID的set
合并到一起,存入数据库即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import com.mongodb.BasicDBObject import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat} import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} import org.bson.BSONObject
import scala.collection.JavaConverters._
object QQGroup {
def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("QQGroup") val sc = new SparkContext(sparkConf)
val inputConfig = new Configuration() inputConfig.set("mongo.input.uri", "mongodb://192.168.31.121:27017/db.userGroup") inputConfig.set("mongo.input.fields", """{"userId":1, "groupId":1, "_id":0}""") inputConfig.set("mongo.input.noTimeout", "true")
val documentRDD = sc.newAPIHadoopRDD( inputConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])
val userRDD = documentRDD.map { case (_, doc) => (getValue(doc, "userId"), getValue(doc, "groupId")) }.reduceByKey(_ ++ _)
val resultRDD = userRDD.map { case (userId, groupIds) => val o = new BasicDBObject() o.put("groupIds", groupIds.asJava) userId -> o }
val outputConfig = new Configuration() outputConfig.set("mongo.output.uri", "mongodb://192.168.31.121:27017/db_result.userGroup")
resultRDD.saveAsNewAPIHadoopFile( "file://this-is-completely-unused", classOf[Object], classOf[BSONObject], classOf[MongoOutputFormat[Object, BSONObject]], outputConfig) }
def getValue(dbo: BSONObject, key: String) = { val value = dbo.get(key) if (value eq null) "" else value.asInstanceOf[String] } }
|
MongoDB
官方提供了Hadoop
连接器,Spark
可以使用mongo-hadoop
连接器来读、写MongoDB
数据库。
主要的输入配置荐有:
- mongo.input.uri: MongoDB的连接URI
- mongo.input.fields: 指定返回哪些数据,与
db.query
里的第2个参数功能一样
- mongo.input.query: MongoDB的查询参数
相应的MongoDB
也提供了一系列的输出参数,如:
- mongo.output.uri: MongoDB的连接URI
sc.newAPIHadoopRDD()
方法有4个参数,分别为:配置、输入格式化类、待映射数据主键类型、待映射数据类型。
主要的操作代码:
1 2 3 4 5 6 7 8 9
| val userRDD = documentRDD.map { case (_, doc) => (getValue(doc, "userId"), Set(getValue(doc, "groupId"))) }.reduceByKey(_ ++ _)
val resultRDD = userRDD.map { case (userId, groupIds) => val o = new BasicDBObject() o.put("groupIds", groupIds.asJava) userId -> o }
|
先使用map
方法获取userId
和groupId
,并把groupId
转换为一个Set
。
在把数据转换成Tuple2
,就是一个KV的形式以后,我们就可以调用一系列的转换方法来对RDD
进行操作,这里使用reduceByKey
方法来将同一个userId
的所以value
都合并在一起。这样我们就有了所有用户对应加入的群组
的一个RDD集了。
(RDD上有两种类型的操作。一种是“变换”,它只是描述了待进行的操作指令,并不会触发实际的计算;另一种是“动作”,
它将触发实际的计算动作,这时候系统才会实际的从数据源读入数据,操作内存,保存数据等)
最后使用resultRDD.saveAsNewAPIHadoopFile()
方法来把计算结果存入MongoDB
,这里的一个参数:用于指定
HDFS的存储位置并不会使用到,因为mongo-hadoop
将会使用mongo.output.uri
指定的存储URI连接地址来保存数据。