zookeeper + kafka + storm 集群搭建

本贴最后更新于 3046 天前,其中的信息可能已经事过境迁

.首先:需要三台测试机器(由于zookeeper 的选举机制,官方推荐是3台,并且是奇数台机器,{1台机器多个端口也可以})

192.168.12.28
192.168.12.151
192.168.12.152
 
环境及版本
jdk : java version "1.7.0_79"
os : fedora --x86_64-22-3
zookeeper :3.4.6
kafka:2.11-0.9.0.0
storm:0.10.0
使用:连续加号(+++++)分隔配置文件内容和正文
 
 
1.搭建zookeeper集群
先到apache 的zookeeper 项目中下载包
文档地址:http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html
包地址:http://www.apache.org/dyn/closer.cgi/zookeeper/
3.4.6 url:http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
 
下载包到测试机,解压 tar -zxvf zookeeper-3.4.6.tar.gz 
先进入conf 目录 配置 zoo.cfg,如下
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/zookeeper-3.4.6/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#这连接客户端包括(比如kafka。strom等连接,所以请注意这个连接数不要太小,导致部署失败,或者客户端连接失败)
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature192
#autopurge.purgeInterval=1
##这是zookeeper 机集群地址。第一个端口是集群之间通信的端口(监听端口,和通信端口和选举端口不能重复,否则报错地址已用),第二个是选举leader时使用的
server.1=192.168.12.28:2888:3888
server.2=192.168.12.151:2888:3888
server.3=192.168.12.152:2888:3888
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
按这个配置,配置3台测试机器
 
到bin 目录启动zookeeper 集群:
 ./zkServer.sh  start     
查看集群状态
 ./zkServer.sh  status
 
mode:leader  说明他是leader 否则是follower
leader  挂掉后,集群会自动选举新的leader
 
在3台机器重复此操作
 
使用client 连接zookeeper集群(集群中启动的任意一台机器都可以)
./zkCli.sh --server192.168.12.28:2181
 
ls /                         查看根目录
create /test   this is test dir     创建目录
 
到此,zookeeper    集群搭建完毕
这是一写zookeeper 的配置信息

broker.id

整数,建议根据ip区分

 

log.dirs

kafka存放消息文件的路径,

默认/tmp/kafka-logs

port

broker用于接收producer消息的端口

 

zookeeper.connnect

zookeeper连接

格式为  ip1:port,ip2:port,ip3:port

message.max.bytes

单条消息的最大长度

 

num.network.threads

broker用于处理网络请求的线程数

如不配置默认为3,server.properties默认是2

num.io.threads

broker用于执行网络请求的IO线程数

如不配置默认为8,server.properties默认是2可适当增大,

queued.max.requests

排队等候IO线程执行的requests

默认为500

host.name

broker的hostname

默认null,建议写主机的ip,不然消费端不配置hosts会有麻烦

num.partitions

topic的默认分区数

默认1

log.retention.hours

消息被删除前保存多少小时

默认1周168小时

auto.create.topics.enable

是否可以程序自动创建Topic

默认true,建议false

default.replication.factor

消息备份数目

默认1不做复制,建议修改

num.replica.fetchers

用于复制leader消息到follower的IO线程数

默认1

 
 
 
 
 
 
2.搭建 kafka 集群
文档地址:http://kafka.apache.org/documentation.html#quickstart
包地址:
tar -xzf kafka_2.11-0.9.0.0.tgz
修改 config/server.properties
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The id of the broker. This must be set to a unique integer for each broker.
##必须唯一
broker.id=0
 
############################# Socket Server Settings #############################
#客户端连接的时候请按照此地址连接, 同一个地址,不同表示方式会导致生产和消费 的使用异常
listeners=PLAINTEXT://192.168.12.28:9092
 
# The port the socket server listens on
##客户端连接kafka的端口
#port=9092
 
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
 
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
 
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
 
# The number of threads handling network requests
num.network.threads=3
 
# The number of threads doing disk I/O
num.io.threads=8
 
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
 
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
 
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
 
 
############################# Log Basics #############################
 
# A comma seperated list of directories under which to store log files
//这个不要设置到机器的临时目录,否则启动可能会报错
log.dirs=/usr/local/kafka_2.11-0.9.0.0/data
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
 
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
 
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
 
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
 
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
 
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
 
############################# Zookeeper #############################
 
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
##kafka 是基于 zookeeper 的,保存kafka的数据信息、配置,读取偏移等
zookeeper.connect=192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181
 
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
 
把此配置应用到3台测试机,注意:broker.id不能唯一
 
进入 bin 目录
启动 kafka 后面的参数是kafka 的配置文件目录,启动失败会立即报错
./kafka-server-start.sh  ../config/server.properties
 
启动3台kafka集群
 
测试kafka集群:
先创建一个test主题,
./kafka-topics.sh --create --zookeeper 192.168.12.28:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的主题信息 
./kafka-topics.sh --zookeeper 192.168.12.28:2181  --describe  --topic testtopic
====================================================================
Topic:testtopic          PartitionCount:1           ReplicationFactor:1                   Configs:
 Topic: testtopic          Partition: 0                Leader: 4                                    Replicas: 4                       Isr: 4
====================================================================
Partition : 分区
L eader :负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
 
通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为testtopic的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list 192.168.12.28:9092,192.168.12.151:9092,192.168.12.152:9092 --topic testtopic
 
在另一个终端,启动Consumer,并订阅我们上面创建的名称为testtopic5的Topic中生产的消息,执行如下脚本
bin/kafka-console-consumer.sh --zookeeper 192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181 --from-beginning --topic testtopic
 
可以在Producer终端上输入字符串消息行,然后回车(一行一条数据),就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
 
到此,kafka集群搭建完毕(具体详细的参数配置请查看文档)
 
 
 
 
 
 
3.搭建storm 集群
文档地址:http://storm.apache.org/documentation.html
包地址 : http://storm.apache.org/downloads.html
0.10.0 : http://124.202.164.11/files/4168000007207070/mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
 
tar -zxvf apache-storm-0.10.0.tar.gz
cd apache-storm-0.10.0/conf
 
修改配置 storm.yaml
1)storm 依赖 zookeeper

如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项。

2) storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如:

storm.local.dir: "/home/admin/storm/workdir"   

3) java.library.path: Storm使用的本地库(ZMQ和JZMQ)加载路径,默认为”/usr/local/lib:/opt/local/lib:/usr/lib”,一般来说ZMQ和JZMQ默认安装在/usr/local/lib 下,因此不需要配置即可。

4) nimbus.host: Storm集群Nimbus机器地址(存在单点问题),各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件

5) supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口,如:supervisor.slots.ports:- 6700- 6701- 6702- 6703


+++++++++++++++++++++++++++++++++++++++++++++++
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "192.168.12.28"
     - "192.168.12.151"
     - "192.168.12.152"
 
nimbus.host: "192.168.12.28"
 
storm.local.dir: "/usr/local/apache-storm-0.10.0/workdata"
 
supervisor.slots.ports:
    - 6700
    - 6701
 
#
#
# ##### These may optionally be filled in:
#    
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"
 
## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 
三台测试机的配置相同即可,现在启动storm 
cd ../bin/
./storm nimbus   启动主节点//指定的主节点机器启动
./storm supervisor  启动工作子节点
./storm  ui     启动storm 自带的监控UI, 使用host:8080访问
 
 
自此,storm 集群搭建完毕
 
4.kafka + storm 继承
       刚发现这竟然没写,周末补
 
  • Storm
    4 引用 • 8 回帖 • 1 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 5 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 477 关注
  • 面试

    面试造航母,上班拧螺丝。多面试,少加班。

    325 引用 • 1395 回帖
  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    86 引用 • 122 回帖 • 625 关注
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    25 引用 • 191 回帖 • 16 关注
  • 心情

    心是产生任何想法的源泉,心本体会陷入到对自己本体不能理解的状态中,因为心能产生任何想法,不能分出对错,不能分出自己。

    59 引用 • 369 回帖
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 680 关注
  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    70 引用 • 193 回帖 • 431 关注
  • Ubuntu

    Ubuntu(友帮拓、优般图、乌班图)是一个以桌面应用为主的 Linux 操作系统,其名称来自非洲南部祖鲁语或豪萨语的“ubuntu”一词,意思是“人性”、“我的存在是因为大家的存在”,是非洲传统的一种价值观,类似华人社会的“仁爱”思想。Ubuntu 的目标在于为一般用户提供一个最新的、同时又相当稳定的主要由自由软件构建而成的操作系统。

    125 引用 • 169 回帖 • 1 关注
  • Postman

    Postman 是一款简单好用的 HTTP API 调试工具。

    4 引用 • 3 回帖 • 2 关注
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    176 引用 • 815 回帖
  • 禅道

    禅道是一款国产的开源项目管理软件,她的核心管理思想基于敏捷方法 scrum,内置了产品管理和项目管理,同时又根据国内研发现状补充了测试管理、计划管理、发布管理、文档管理、事务管理等功能,在一个软件中就可以将软件研发中的需求、任务、bug、用例、计划、发布等要素有序的跟踪管理起来,完整地覆盖了项目管理的核心流程。

    6 引用 • 15 回帖 • 113 关注
  • Ant-Design

    Ant Design 是服务于企业级产品的设计体系,基于确定和自然的设计价值观上的模块化解决方案,让设计者和开发者专注于更好的用户体验。

    17 引用 • 23 回帖
  • Maven

    Maven 是基于项目对象模型(POM)、通过一小段描述信息来管理项目的构建、报告和文档的软件项目管理工具。

    186 引用 • 318 回帖 • 303 关注
  • 百度

    百度(Nasdaq:BIDU)是全球最大的中文搜索引擎、最大的中文网站。2000 年 1 月由李彦宏创立于北京中关村,致力于向人们提供“简单,可依赖”的信息获取方式。“百度”二字源于中国宋朝词人辛弃疾的《青玉案·元夕》词句“众里寻他千百度”,象征着百度对中文信息检索技术的执著追求。

    63 引用 • 785 回帖 • 175 关注
  • 创业

    你比 99% 的人都优秀么?

    84 引用 • 1399 回帖
  • Vditor

    Vditor 是一款浏览器端的 Markdown 编辑器,支持所见即所得、即时渲染(类似 Typora)和分屏预览模式。它使用 TypeScript 实现,支持原生 JavaScript、Vue、React 和 Angular。

    351 引用 • 1814 回帖
  • 程序员

    程序员是从事程序开发、程序维护的专业人员。

    567 引用 • 3532 回帖
  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    55 引用 • 85 回帖 • 2 关注
  • Node.js

    Node.js 是一个基于 Chrome JavaScript 运行时建立的平台, 用于方便地搭建响应速度快、易于扩展的网络应用。Node.js 使用事件驱动, 非阻塞 I/O 模型而得以轻量和高效。

    139 引用 • 269 回帖 • 43 关注
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖
  • Firefox

    Mozilla Firefox 中文俗称“火狐”(正式缩写为 Fx 或 fx,非正式缩写为 FF),是一个开源的网页浏览器,使用 Gecko 排版引擎,支持多种操作系统,如 Windows、OSX 及 Linux 等。

    8 引用 • 30 回帖 • 407 关注
  • Wide

    Wide 是一款基于 Web 的 Go 语言 IDE。通过浏览器就可以进行 Go 开发,并有代码自动完成、查看表达式、编译反馈、Lint、实时结果输出等功能。

    欢迎访问我们运维的实例: https://wide.b3log.org

    30 引用 • 218 回帖 • 628 关注
  • 负能量

    上帝为你关上了一扇门,然后就去睡觉了....努力不一定能成功,但不努力一定很轻松 (° ー °〃)

    88 引用 • 1235 回帖 • 411 关注
  • Rust

    Rust 是一门赋予每个人构建可靠且高效软件能力的语言。Rust 由 Mozilla 开发,最早发布于 2014 年 9 月。

    58 引用 • 22 回帖
  • 深度学习

    深度学习(Deep Learning)是机器学习的分支,是一种试图使用包含复杂结构或由多重非线性变换构成的多个处理层对数据进行高层抽象的算法。

    53 引用 • 40 回帖 • 2 关注
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    169 引用 • 506 回帖
  • WebClipper

    Web Clipper 是一款浏览器剪藏扩展,它可以帮助你把网页内容剪藏到本地。

    3 引用 • 9 回帖