##以往在 spark 中读写 hbase,往往需要进行相当多的操作,这里提供一个快捷的操作方式。
测试坏境为本地模式,依赖为 spark1.6.x,hbase0.98,以及 spark-hbase-connector。这里需要说明的是,这种读写的方式没有提供原生 api 中的各种过滤接口,因此,适合进行归档的应用场景。如果需要进行复杂过滤条件下的读操作,最好使用原生 api。
项目的 git 地址 https://github.com/nerdammer/spark-hbase-connector?spm=5176.doc28131.2.6.cycZVO
##maven 依赖如下:
it.nerdammer.bigdata
spark-hbase-connector_2.10
1.0.3
##废话不多说,直接上代码:
import org.apache.spark.{SparkConf, SparkContext}
import it.nerdammer.spark.hbase._
object FastSparkWithHbase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("hbase").set("spark.hbase.host", "192.168.93.111").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val rdd = sc.parallelize(0 to 20)
.map(i => (i.toString, i + 1, "see" + util.Random.nextInt(100)))
//写数据,需要保证表结构已存在
rdd.toHBaseTable("spark_t").toColumns("col1", "col2").inColumnFamily("colfamily1").save()
//读数据,不支持复杂过滤器,可以考虑转换成rdd(不建议)
val hBaseRDD: HBaseReaderBuilder[(String, Int, String)] = sc.hbaseTable[(String, Int, String)]("spark_t")
.select("id+1", "stringValue").inColumnFamily("mycf")
hBaseRDD.foreach(println(_))
sc.stop()
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于