Spark Streaming Kafka CreateDirectDStreaming 遇见的问题

12 篇文章 0 订阅
问题1:
spark-submit 提交任务报错如下:
分析:起初我的spark 集群是部署在yarn上,所以在spark-env和spark-default下配置了hadoop相关参数。最后我想使用spark standalone模式跑程序,就把spark-env和spark-default下的hadoop相关参数
注释掉了。之后提交程序提示:
Exception in thread "main" java.net.ConnectException: Call From node1/192.168.88.130 to node1:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
异常:就是说连接hadoop hdfs 拒绝,当时排错心想还是spark conf下有hadoop配置没注释完整。一遍遍检查最终也没找到,最后怀疑是linux环境变量有HADOOP_CONF_DIR的配置,结果使用echo $HADOOP_CONF_DIR
果然存在,在/etc/profile中配置的,注释掉解决问题。
总结:尽量不要把局部应用环境变量配置在/etc/profile中,而是配置大数据框架的环境变量中。


问题2:

博主是Linux单节点的伪分布式,一个Master和一个Worker,并且Master和Worker在同一节点上。此时忽略了系统是跑在分布式的环境下的,所以当时糊涂设置为本地

文件系统路径,提交应用之后发现提示,并且CheckPoint没有生效。
 WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on 
 the local filesystem. Directory 'file:/home/daxineckpoint' appears to be on the local filesystem.
 

 spark 应用跑在集群模式下,checkpoint directory是不可以设置在本地文件系统的


问题3:


package com.sparkstreaming.direct

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Dax1n on 2016/12/1.
  */
object DirectCreateDstream1 {
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "node1:9092,node1:9093,node1:9094",
    "group.id" -> "onlyOneCk1")

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setAppName("LocalDirect").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")


	
    def createStreamingContext():StreamingContext={
      val ssc = new StreamingContext(sc, Seconds(2))
      ssc.checkpoint("C:\\streamingcheckpoint1")
      val dStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,Set("orderNumOnlyOne1"))

      val dStream1 = dStream.map{
        x=>
          x._1+" - "+x._2
      }

      dStream1.print()
      ssc
    }

// 重重注意:对于Spark的Transform和Action都要写在getOrCreate的createStreamingContext函数中,否则报错!!!,此处更多技巧看官方文档
//官网地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html的 Checkpointing 章节
//
    val ssc = StreamingContext.getOrCreate("C:\\streamingcheckpoint1",createStreamingContext _)
//错误信息:
//16/12/01 09:04:38 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
//org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4c2a67cc has not been initialized
	
    ssc.start()
    ssc.awaitTermination()


  }

}






如果把Spark 的 Transform和Action写在创建CreateStreamContext函数外面会报如下错误:

16/12/01 09:04:38 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped

org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4c2a67cc has not been initialized


错误代码 : 

package com.sparkstreaming.direct

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by Dax1n on 2016/12/1.
  */
object DirectCreateDstream1 {
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "node1:9092,node1:9093,node1:9094",
    "group.id" -> "onlyOneCk1")

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setAppName("LocalDirect").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")


	
    def createStreamingContext():StreamingContext={
      val ssc = new StreamingContext(sc, Seconds(2))
      ssc.checkpoint("C:\\streamingcheckpoint1")
  
      ssc
    }

 //错误写法:Transform和Action写在创建CreateStreamContext函数外面
    val ssc = StreamingContext.getOrCreate("C:\\streamingcheckpoint1",createStreamingContext _)
	
	    val dStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,Set("orderNumOnlyOne1"))

      val dStream1 = dStream.map{
        x=>
          x._1+" - "+x._2
      }

      dStream1.print()
 
    ssc.start()
    ssc.awaitTermination()

  }

}




  • 0
    点赞
  • 0
    收藏
    觉得还不错? 一键收藏
  • 1
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值