Problem 1:
A spark-submit job fails with the error shown below.
Analysis: my Spark cluster was originally deployed on YARN, so I had configured Hadoop-related parameters in spark-env and spark-default. Later I wanted to run a program in Spark standalone mode, so I commented out the Hadoop-related parameters in spark-env and spark-default. After that, submitting the program reported:
Exception in thread "main" java.net.ConnectException: Call From node1/192.168.88.130 to node1:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Exception: the connection to Hadoop HDFS was refused. While troubleshooting I assumed some Hadoop configuration under the Spark conf directory was still uncommented. I checked again and again but found nothing, and finally suspected that HADOOP_CONF_DIR was set as a Linux environment variable. Sure enough, echo $HADOOP_CONF_DIR showed it was set, configured in /etc/profile; commenting it out solved the problem.
Takeaway: avoid putting application-specific environment variables in /etc/profile; configure them in the environment files of the big data framework itself (e.g. spark-env.sh).
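To verify whether a stray HADOOP_CONF_DIR is leaking into an application, you can inspect both the JVM environment and the default filesystem Spark actually resolved. A minimal Scala sketch (the CheckHadoopEnv object is hypothetical; fs.defaultFS resolving to hdfs://node1:9000 would explain the Connection refused above):
import org.apache.spark.{SparkConf, SparkContext}
object CheckHadoopEnv {
  def main(args: Array[String]): Unit = {
    // Is HADOOP_CONF_DIR visible to this JVM?
    println(sys.env.getOrElse("HADOOP_CONF_DIR", "HADOOP_CONF_DIR is not set"))
    val sc = new SparkContext(new SparkConf().setAppName("CheckHadoopEnv").setMaster("local[2]"))
    // Which default filesystem did Spark pick up? "file:///" means local;
    // an hdfs:// address means Hadoop configuration is still being loaded.
    println(sc.hadoopConfiguration.get("fs.defaultFS"))
    sc.stop()
  }
}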
Problem 2:
My setup is a pseudo-distributed single Linux node with one Master and one Worker on the same machine. I overlooked the fact that the application still runs as a distributed system, so I carelessly set the checkpoint directory to a local filesystem path. After submitting the application the following warning appeared, and the checkpoint did not take effect:
WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory 'file:/home/daxineckpoint' appears to be on the local filesystem.
When a Spark application runs in cluster mode, the checkpoint directory cannot be on the local filesystem, because the program executes across distributed nodes that do not share local paths; it must be on a shared filesystem such as HDFS.
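The fix is to point the checkpoint at a shared filesystem instead. A minimal sketch, assuming the HDFS NameNode address node1:9000 seen in Problem 1 and a hypothetical /streamingcheckpoint directory:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object CheckpointOnHdfs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CheckpointOnHdfs")
    val ssc = new StreamingContext(conf, Seconds(2))
    // In cluster mode the checkpoint directory must live on a shared filesystem;
    // hdfs://node1:9000/streamingcheckpoint is an assumed NameNode address and path.
    ssc.checkpoint("hdfs://node1:9000/streamingcheckpoint")
  }
}
The complete example below runs in local mode, so a local checkpoint path is acceptable there: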
package com.sparkstreaming.direct
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by Dax1n on 2016/12/1.
 */
object DirectCreateDstream1 {

  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "node1:9092,node1:9093,node1:9094",
    "group.id" -> "onlyOneCk1")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("LocalDirect").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    def createStreamingContext(): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(2))
      ssc.checkpoint("C:\\streamingcheckpoint1")
      val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, Set("orderNumOnlyOne1"))
      val dStream1 = dStream.map { x =>
        x._1 + " - " + x._2
      }
      dStream1.print()
      ssc
    }

    // Important: all Spark transformations and actions must be written inside the
    // createStreamingContext function passed to getOrCreate, otherwise an error is thrown!
    // For more details see the Checkpointing section of the official docs:
    // http://spark.apache.org/docs/latest/streaming-programming-guide.html
    val ssc = StreamingContext.getOrCreate("C:\\streamingcheckpoint1", createStreamingContext _)
    // Error seen otherwise:
    // 16/12/01 09:04:38 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
    // org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4c2a67cc has not been initialized
    ssc.start()
    ssc.awaitTermination()
  }
}
If the Spark transformations and actions are written outside the createStreamingContext function, the following error is thrown, because on recovery the DStream graph is rebuilt from checkpoint data and DStreams created outside that function are never initialized:
16/12/01 09:04:38 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4c2a67cc has not been initialized
Faulty code:
package com.sparkstreaming.direct
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Created by Dax1n on 2016/12/1.
 */
object DirectCreateDstream1 {

  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "node1:9092,node1:9093,node1:9094",
    "group.id" -> "onlyOneCk1")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("LocalDirect").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    def createStreamingContext(): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(2))
      ssc.checkpoint("C:\\streamingcheckpoint1")
      ssc
    }

    // Wrong: the transformations and actions are written outside createStreamingContext
    val ssc = StreamingContext.getOrCreate("C:\\streamingcheckpoint1", createStreamingContext _)
    val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("orderNumOnlyOne1"))
    val dStream1 = dStream.map { x =>
      x._1 + " - " + x._2
    }
    dStream1.print()
    ssc.start()
    ssc.awaitTermination()
  }
}