zookeeper + kafka + storm 集群搭建

本贴最后更新于 3262 天前,其中的信息可能已经事过境迁

.首先:需要三台测试机器(由于zookeeper 的选举机制,官方推荐是3台,并且是奇数台机器,{1台机器多个端口也可以})

192.168.12.28
192.168.12.151
192.168.12.152
 
环境及版本
jdk : java version "1.7.0_79"
os : fedora --x86_64-22-3
zookeeper :3.4.6
kafka:2.11-0.9.0.0
storm:0.10.0
使用:连续加号(+++++)分隔配置文件内容和正文
 
 
1.搭建zookeeper集群
先到apache 的zookeeper 项目中下载包
文档地址:http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html
包地址:http://www.apache.org/dyn/closer.cgi/zookeeper/
3.4.6 url:http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
 
下载包到测试机,解压 tar -zxvf zookeeper-3.4.6.tar.gz 
先进入conf 目录 配置 zoo.cfg,如下
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/zookeeper-3.4.6/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#这连接客户端包括(比如kafka。strom等连接,所以请注意这个连接数不要太小,导致部署失败,或者客户端连接失败)
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature192
#autopurge.purgeInterval=1
##这是zookeeper 机集群地址。第一个端口是集群之间通信的端口(监听端口,和通信端口和选举端口不能重复,否则报错地址已用),第二个是选举leader时使用的
server.1=192.168.12.28:2888:3888
server.2=192.168.12.151:2888:3888
server.3=192.168.12.152:2888:3888
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
按这个配置,配置3台测试机器
 
到bin 目录启动zookeeper 集群:
 ./zkServer.sh  start     
查看集群状态
 ./zkServer.sh  status
 
mode:leader  说明他是leader 否则是follower
leader  挂掉后,集群会自动选举新的leader
 
在3台机器重复此操作
 
使用client 连接zookeeper集群(集群中启动的任意一台机器都可以)
./zkCli.sh --server192.168.12.28:2181
 
ls /                         查看根目录
create /test   this is test dir     创建目录
 
到此,zookeeper    集群搭建完毕
这是一写zookeeper 的配置信息

broker.id

整数,建议根据ip区分

 

log.dirs

kafka存放消息文件的路径,

默认/tmp/kafka-logs

port

broker用于接收producer消息的端口

 

zookeeper.connnect

zookeeper连接

格式为  ip1:port,ip2:port,ip3:port

message.max.bytes

单条消息的最大长度

 

num.network.threads

broker用于处理网络请求的线程数

如不配置默认为3,server.properties默认是2

num.io.threads

broker用于执行网络请求的IO线程数

如不配置默认为8,server.properties默认是2可适当增大,

queued.max.requests

排队等候IO线程执行的requests

默认为500

host.name

broker的hostname

默认null,建议写主机的ip,不然消费端不配置hosts会有麻烦

num.partitions

topic的默认分区数

默认1

log.retention.hours

消息被删除前保存多少小时

默认1周168小时

auto.create.topics.enable

是否可以程序自动创建Topic

默认true,建议false

default.replication.factor

消息备份数目

默认1不做复制,建议修改

num.replica.fetchers

用于复制leader消息到follower的IO线程数

默认1

 
 
 
 
 
 
2.搭建 kafka 集群
文档地址:http://kafka.apache.org/documentation.html#quickstart
包地址:
tar -xzf kafka_2.11-0.9.0.0.tgz
修改 config/server.properties
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The id of the broker. This must be set to a unique integer for each broker.
##必须唯一
broker.id=0
 
############################# Socket Server Settings #############################
#客户端连接的时候请按照此地址连接, 同一个地址,不同表示方式会导致生产和消费 的使用异常
listeners=PLAINTEXT://192.168.12.28:9092
 
# The port the socket server listens on
##客户端连接kafka的端口
#port=9092
 
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
 
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
 
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
 
# The number of threads handling network requests
num.network.threads=3
 
# The number of threads doing disk I/O
num.io.threads=8
 
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
 
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
 
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
 
 
############################# Log Basics #############################
 
# A comma seperated list of directories under which to store log files
//这个不要设置到机器的临时目录,否则启动可能会报错
log.dirs=/usr/local/kafka_2.11-0.9.0.0/data
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
 
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
 
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
 
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
 
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
 
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
 
############################# Zookeeper #############################
 
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
##kafka 是基于 zookeeper 的,保存kafka的数据信息、配置,读取偏移等
zookeeper.connect=192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181
 
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
 
把此配置应用到3台测试机,注意:broker.id不能唯一
 
进入 bin 目录
启动 kafka 后面的参数是kafka 的配置文件目录,启动失败会立即报错
./kafka-server-start.sh  ../config/server.properties
 
启动3台kafka集群
 
测试kafka集群:
先创建一个test主题,
./kafka-topics.sh --create --zookeeper 192.168.12.28:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的主题信息 
./kafka-topics.sh --zookeeper 192.168.12.28:2181  --describe  --topic testtopic
====================================================================
Topic:testtopic          PartitionCount:1           ReplicationFactor:1                   Configs:
 Topic: testtopic          Partition: 0                Leader: 4                                    Replicas: 4                       Isr: 4
====================================================================
Partition : 分区
L eader :负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
 
通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为testtopic的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list 192.168.12.28:9092,192.168.12.151:9092,192.168.12.152:9092 --topic testtopic
 
在另一个终端,启动Consumer,并订阅我们上面创建的名称为testtopic5的Topic中生产的消息,执行如下脚本
bin/kafka-console-consumer.sh --zookeeper 192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181 --from-beginning --topic testtopic
 
可以在Producer终端上输入字符串消息行,然后回车(一行一条数据),就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
 
到此,kafka集群搭建完毕(具体详细的参数配置请查看文档)
 
 
 
 
 
 
3.搭建storm 集群
文档地址:http://storm.apache.org/documentation.html
包地址 : http://storm.apache.org/downloads.html
0.10.0 : http://124.202.164.11/files/4168000007207070/mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
 
tar -zxvf apache-storm-0.10.0.tar.gz
cd apache-storm-0.10.0/conf
 
修改配置 storm.yaml
1)storm 依赖 zookeeper

如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项。

2) storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如:

storm.local.dir: "/home/admin/storm/workdir"   

3) java.library.path: Storm使用的本地库(ZMQ和JZMQ)加载路径,默认为”/usr/local/lib:/opt/local/lib:/usr/lib”,一般来说ZMQ和JZMQ默认安装在/usr/local/lib 下,因此不需要配置即可。

4) nimbus.host: Storm集群Nimbus机器地址(存在单点问题),各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件

5) supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口,如:supervisor.slots.ports:- 6700- 6701- 6702- 6703


+++++++++++++++++++++++++++++++++++++++++++++++
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "192.168.12.28"
     - "192.168.12.151"
     - "192.168.12.152"
 
nimbus.host: "192.168.12.28"
 
storm.local.dir: "/usr/local/apache-storm-0.10.0/workdata"
 
supervisor.slots.ports:
    - 6700
    - 6701
 
#
#
# ##### These may optionally be filled in:
#    
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"
 
## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 
三台测试机的配置相同即可,现在启动storm 
cd ../bin/
./storm nimbus   启动主节点//指定的主节点机器启动
./storm supervisor  启动工作子节点
./storm  ui     启动storm 自带的监控UI, 使用host:8080访问
 
 
自此,storm 集群搭建完毕
 
4.kafka + storm 继承
       刚发现这竟然没写,周末补
 
  • Storm
    4 引用 • 8 回帖 • 1 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 11 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Thymeleaf

    Thymeleaf 是一款用于渲染 XML/XHTML/HTML5 内容的模板引擎。类似 Velocity、 FreeMarker 等,它也可以轻易的与 Spring 等 Web 框架进行集成作为 Web 应用的模板引擎。与其它模板引擎相比,Thymeleaf 最大的特点是能够直接在浏览器中打开并正确显示模板页面,而不需要启动整个 Web 应用。

    11 引用 • 19 回帖 • 394 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖 • 1 关注
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖
  • 创造

    你创造的作品可能会帮助到很多人,如果是开源项目的话就更赞了!

    186 引用 • 1021 回帖
  • 大疆创新

    深圳市大疆创新科技有限公司(DJI-Innovations,简称 DJI),成立于 2006 年,是全球领先的无人飞行器控制系统及无人机解决方案的研发和生产商,客户遍布全球 100 多个国家。通过持续的创新,大疆致力于为无人机工业、行业用户以及专业航拍应用提供性能最强、体验最佳的革命性智能飞控产品和解决方案。

    2 引用 • 14 回帖
  • JavaScript

    JavaScript 一种动态类型、弱类型、基于原型的直译式脚本语言,内置支持类型。它的解释器被称为 JavaScript 引擎,为浏览器的一部分,广泛用于客户端的脚本语言,最早是在 HTML 网页上使用,用来给 HTML 网页增加动态功能。

    730 引用 • 1282 回帖 • 5 关注
  • webpack

    webpack 是一个用于前端开发的模块加载器和打包工具,它能把各种资源,例如 JS、CSS(less/sass)、图片等都作为模块来使用和处理。

    42 引用 • 130 回帖 • 252 关注
  • frp

    frp 是一个可用于内网穿透的高性能的反向代理应用,支持 TCP、UDP、 HTTP 和 HTTPS 协议。

    17 引用 • 7 回帖
  • 强迫症

    强迫症(OCD)属于焦虑障碍的一种类型,是一组以强迫思维和强迫行为为主要临床表现的神经精神疾病,其特点为有意识的强迫和反强迫并存,一些毫无意义、甚至违背自己意愿的想法或冲动反反复复侵入患者的日常生活。

    15 引用 • 161 回帖 • 1 关注
  • Follow
    4 引用 • 12 回帖 • 1 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    89 引用 • 113 回帖 • 1 关注
  • RESTful

    一种软件架构设计风格而不是标准,提供了一组设计原则和约束条件,主要用于客户端和服务器交互类的软件。基于这个风格设计的软件可以更简洁,更有层次,更易于实现缓存等机制。

    30 引用 • 114 回帖
  • NetBeans

    NetBeans 是一个始于 1997 年的 Xelfi 计划,本身是捷克布拉格查理大学的数学及物理学院的学生计划。此计划延伸而成立了一家公司进而发展这个商用版本的 NetBeans IDE,直到 1999 年 Sun 买下此公司。Sun 于次年(2000 年)六月将 NetBeans IDE 开源,直到现在 NetBeans 的社群依然持续增长。

    78 引用 • 102 回帖 • 707 关注
  • Ubuntu

    Ubuntu(友帮拓、优般图、乌班图)是一个以桌面应用为主的 Linux 操作系统,其名称来自非洲南部祖鲁语或豪萨语的“ubuntu”一词,意思是“人性”、“我的存在是因为大家的存在”,是非洲传统的一种价值观,类似华人社会的“仁爱”思想。Ubuntu 的目标在于为一般用户提供一个最新的、同时又相当稳定的主要由自由软件构建而成的操作系统。

    127 引用 • 169 回帖 • 1 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    17 引用 • 53 回帖 • 144 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    142 引用 • 442 回帖
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • Markdown

    Markdown 是一种轻量级标记语言,用户可使用纯文本编辑器来排版文档,最终通过 Markdown 引擎将文档转换为所需格式(比如 HTML、PDF 等)。

    171 引用 • 1537 回帖
  • golang

    Go 语言是 Google 推出的一种全新的编程语言,可以在不损失应用程序性能的情况下降低代码的复杂性。谷歌首席软件工程师罗布派克(Rob Pike)说:我们之所以开发 Go,是因为过去 10 多年间软件开发的难度令人沮丧。Go 是谷歌 2009 发布的第二款编程语言。

    500 引用 • 1395 回帖 • 243 关注
  • JSON

    JSON (JavaScript Object Notation)是一种轻量级的数据交换格式。易于人类阅读和编写。同时也易于机器解析和生成。

    53 引用 • 190 回帖
  • V2Ray
    1 引用 • 15 回帖 • 2 关注
  • Hprose

    Hprose 是一款先进的轻量级、跨语言、跨平台、无侵入式、高性能动态远程对象调用引擎库。它不仅简单易用,而且功能强大。你无需专门学习,只需看上几眼,就能用它轻松构建分布式应用系统。

    9 引用 • 17 回帖 • 644 关注
  • OnlyOffice
    4 引用 • 15 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖 • 7 关注
  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 491 关注
  • Pipe

    Pipe 是一款小而美的开源博客平台。Pipe 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    134 引用 • 1127 回帖 • 110 关注
  • 又拍云

    又拍云是国内领先的 CDN 服务提供商,国家工信部认证通过的“可信云”,乌云众测平台认证的“安全云”,为移动时代的创业者提供新一代的 CDN 加速服务。

    20 引用 • 37 回帖 • 580 关注