Spark Streaming Kafka CreateDirectDStreaming 遇见的问题

本贴最后更新于 3105 天前,其中的信息可能已经沧海桑田

问题 1:
spark-submit 提交任务报错如下:
分析:起初我的 spark 集群是部署在 yarn 上,所以在 spark-env 和 spark-default 下配置了 hadoop 相关参数。最后我想使用 spark standalone 模式跑程序,就把 spark-env 和 spark-default 下的 hadoop 相关参数
注释掉了。之后提交程序提示:
Exception in thread "main" java.net.ConnectException: Call From node1/192.168.88.130 to node1:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
异常:就是说连接 hadoop hdfs 拒绝,当时排错心想还是 spark conf 下有 hadoop 配置没注释完整。一遍遍检查最终也没找到,最后怀疑是 linux 环境变量有 HADOOP_CONF_DIR 的配置,结果使用 echo $HADOOP_CONF_DIR
果然存在,在/etc/profile 中配置的,注释掉解决问题。
总结:尽量不要把局部应用环境变量配置在/etc/profile 中,而是配置大数据框架的环境变量中。

问题 2:
博主是 Linux 单节点的伪分布式,一个 Master 和一个 Worker,并且 Master 和 Worker 在同一节点上。此时忽略了系统是跑在分布式的环境下的,所以当时糊涂设置为本地
文件系统路径,提交应用之后发现提示,并且 CheckPoint 没有生效。
WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on
the local filesystem. Directory 'file:/home/daxineckpoint' appears to be on the local filesystem.

spark 应用跑在集群模式下,checkpoint directory 是不可以设置在本地文件系统的,因为程序在分布式环境下运行。

package com.sparkstreaming.direct

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

  • Created by Dax1n on 2016/12/1.
    */
    object DirectCreateDstream1 {
    val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "node1:9092,node1:9093,node1:9094",
    "group.id" -> "onlyOneCk1")

def main(args: Array[String]): Unit = {

val conf = new SparkConf() conf.setAppName("LocalDirect").setMaster("local[2]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") def createStreamingContext():StreamingContext={ val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("C:\\streamingcheckpoint1") val dStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,Set("orderNumOnlyOne1")) val dStream1 = dStream.map{ x=> x._1+" - "+x._2 } dStream1.print() ssc }

// 重重注意:对于 Spark 的 Transform 和 Action 都要写在 getOrCreate 的 createStreamingContext 函数中,否则报错!!!,此处更多技巧看官方文档
//官网地址:http://spark.apache.org/docs/latest/streaming-programming-guide.html 的 Checkpointing 章节
//
val ssc = StreamingContext.getOrCreate("C:\streamingcheckpoint1",createStreamingContext _)
//错误信息:
//16/12/01 09:04:38 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
//org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4c2a67cc has not been initialized

ssc.start() ssc.awaitTermination()

}

}

如果把 Spark 的 Transform 和 Action 写在创建 CreateStreamContext 函数外面会报如下错误:

16/12/01 09:04:38 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@4c2a67cc has not been initialized

错误代码 :

package com.sparkstreaming.direct

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

  • Created by Dax1n on 2016/12/1.
    */
    object DirectCreateDstream1 {
    val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "node1:9092,node1:9093,node1:9094",
    "group.id" -> "onlyOneCk1")

def main(args: Array[String]): Unit = {

val conf = new SparkConf() conf.setAppName("LocalDirect").setMaster("local[2]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") def createStreamingContext():StreamingContext={ val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("C:\\streamingcheckpoint1") ssc }

//错误写法:Transform 和 Action 写在创建 CreateStreamContext 函数外面
val ssc = StreamingContext.getOrCreate("C:\streamingcheckpoint1",createStreamingContext _)

val dStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,Set("orderNumOnlyOne1")) val dStream1 = dStream.map{ x=> x._1+" - "+x._2 } dStream1.print() ssc.start() ssc.awaitTermination()

}

}

我的 CSDN:http://blog.csdn.net/dax1n/article/details/53425668

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3455 回帖 • 150 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 单点登录

    单点登录(Single Sign On)是目前比较流行的企业业务整合的解决方案之一。SSO 的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系统。

    9 引用 • 25 回帖 • 2 关注
  • 印象笔记
    3 引用 • 16 回帖 • 1 关注
  • Git

    Git 是 Linux Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版本控制软件。

    211 引用 • 358 回帖
  • 数据库

    据说 99% 的性能瓶颈都在数据库。

    345 引用 • 754 回帖
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    187 引用 • 831 回帖
  • 安装

    你若安好,便是晴天。

    132 引用 • 1184 回帖 • 1 关注
  • Mobi.css

    Mobi.css is a lightweight, flexible CSS framework that focus on mobile.

    1 引用 • 6 回帖 • 766 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 37 关注
  • DNSPod

    DNSPod 建立于 2006 年 3 月份,是一款免费智能 DNS 产品。 DNSPod 可以为同时有电信、网通、教育网服务器的网站提供智能的解析,让电信用户访问电信的服务器,网通的用户访问网通的服务器,教育网的用户访问教育网的服务器,达到互联互通的效果。

    6 引用 • 26 回帖 • 537 关注
  • Quicker

    Quicker 您的指尖工具箱!操作更少,收获更多!

    37 引用 • 157 回帖
  • 开源

    Open Source, Open Mind, Open Sight, Open Future!

    413 引用 • 3590 回帖
  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    56 引用 • 85 回帖 • 1 关注
  • 外包

    有空闲时间是接外包好呢还是学习好呢?

    26 引用 • 233 回帖
  • 职场

    找到自己的位置,萌新烦恼少。

    127 引用 • 1708 回帖 • 1 关注
  • Shell

    Shell 脚本与 Windows/Dos 下的批处理相似,也就是用各类命令预先放入到一个文件中,方便一次性执行的一个程序文件,主要是方便管理员进行设置或者管理用的。但是它比 Windows 下的批处理更强大,比用其他编程程序编辑的程序效率更高,因为它使用了 Linux/Unix 下的命令。

    125 引用 • 74 回帖
  • 智能合约

    智能合约(Smart contract)是一种旨在以信息化方式传播、验证或执行合同的计算机协议。智能合约允许在没有第三方的情况下进行可信交易,这些交易可追踪且不可逆转。智能合约概念于 1994 年由 Nick Szabo 首次提出。

    1 引用 • 11 回帖 • 2 关注
  • WebClipper

    Web Clipper 是一款浏览器剪藏扩展,它可以帮助你把网页内容剪藏到本地。

    3 引用 • 9 回帖
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    79 引用 • 431 回帖 • 1 关注
  • 30Seconds

    📙 前端知识精选集,包含 HTML、CSS、JavaScript、React、Node、安全等方面,每天仅需 30 秒。

    • 精选常见面试题,帮助您准备下一次面试
    • 精选常见交互,帮助您拥有简洁酷炫的站点
    • 精选有用的 React 片段,帮助你获取最佳实践
    • 精选常见代码集,帮助您提高打码效率
    • 整理前端界的最新资讯,邀您一同探索新世界
    488 引用 • 384 回帖 • 6 关注
  • Ngui

    Ngui 是一个 GUI 的排版显示引擎和跨平台的 GUI 应用程序开发框架,基于
    Node.js / OpenGL。目标是在此基础上开发 GUI 应用程序可拥有开发 WEB 应用般简单与速度同时兼顾 Native 应用程序的性能与体验。

    7 引用 • 9 回帖 • 403 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    89 引用 • 113 回帖 • 2 关注
  • Facebook

    Facebook 是一个联系朋友的社交工具。大家可以通过它和朋友、同事、同学以及周围的人保持互动交流,分享无限上传的图片,发布链接和视频,更可以增进对朋友的了解。

    4 引用 • 15 回帖 • 446 关注
  • 音乐

    你听到信仰的声音了么?

    62 引用 • 512 回帖
  • 强迫症

    强迫症(OCD)属于焦虑障碍的一种类型,是一组以强迫思维和强迫行为为主要临床表现的神经精神疾病,其特点为有意识的强迫和反强迫并存,一些毫无意义、甚至违背自己意愿的想法或冲动反反复复侵入患者的日常生活。

    15 引用 • 161 回帖 • 1 关注
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    52 引用 • 228 回帖
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 661 关注
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    92 引用 • 752 回帖