zookeeper + kafka + storm 集群搭建

本贴最后更新于 3309 天前,其中的信息可能已经事过境迁

.首先:需要三台测试机器(由于zookeeper 的选举机制,官方推荐是3台,并且是奇数台机器,{1台机器多个端口也可以})

192.168.12.28
192.168.12.151
192.168.12.152
 
环境及版本
jdk : java version "1.7.0_79"
os : fedora --x86_64-22-3
zookeeper :3.4.6
kafka:2.11-0.9.0.0
storm:0.10.0
使用:连续加号(+++++)分隔配置文件内容和正文
 
 
1.搭建zookeeper集群
先到apache 的zookeeper 项目中下载包
文档地址:http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html
包地址:http://www.apache.org/dyn/closer.cgi/zookeeper/
3.4.6 url:http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
 
下载包到测试机,解压 tar -zxvf zookeeper-3.4.6.tar.gz 
先进入conf 目录 配置 zoo.cfg,如下
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/zookeeper-3.4.6/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#这连接客户端包括(比如kafka。strom等连接,所以请注意这个连接数不要太小,导致部署失败,或者客户端连接失败)
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature192
#autopurge.purgeInterval=1
##这是zookeeper 机集群地址。第一个端口是集群之间通信的端口(监听端口,和通信端口和选举端口不能重复,否则报错地址已用),第二个是选举leader时使用的
server.1=192.168.12.28:2888:3888
server.2=192.168.12.151:2888:3888
server.3=192.168.12.152:2888:3888
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
按这个配置,配置3台测试机器
 
到bin 目录启动zookeeper 集群:
 ./zkServer.sh  start     
查看集群状态
 ./zkServer.sh  status
 
mode:leader  说明他是leader 否则是follower
leader  挂掉后,集群会自动选举新的leader
 
在3台机器重复此操作
 
使用client 连接zookeeper集群(集群中启动的任意一台机器都可以)
./zkCli.sh --server192.168.12.28:2181
 
ls /                         查看根目录
create /test   this is test dir     创建目录
 
到此,zookeeper    集群搭建完毕
这是一写zookeeper 的配置信息

broker.id

整数,建议根据ip区分

 

log.dirs

kafka存放消息文件的路径,

默认/tmp/kafka-logs

port

broker用于接收producer消息的端口

 

zookeeper.connnect

zookeeper连接

格式为  ip1:port,ip2:port,ip3:port

message.max.bytes

单条消息的最大长度

 

num.network.threads

broker用于处理网络请求的线程数

如不配置默认为3,server.properties默认是2

num.io.threads

broker用于执行网络请求的IO线程数

如不配置默认为8,server.properties默认是2可适当增大,

queued.max.requests

排队等候IO线程执行的requests

默认为500

host.name

broker的hostname

默认null,建议写主机的ip,不然消费端不配置hosts会有麻烦

num.partitions

topic的默认分区数

默认1

log.retention.hours

消息被删除前保存多少小时

默认1周168小时

auto.create.topics.enable

是否可以程序自动创建Topic

默认true,建议false

default.replication.factor

消息备份数目

默认1不做复制,建议修改

num.replica.fetchers

用于复制leader消息到follower的IO线程数

默认1

 
 
 
 
 
 
2.搭建 kafka 集群
文档地址:http://kafka.apache.org/documentation.html#quickstart
包地址:
tar -xzf kafka_2.11-0.9.0.0.tgz
修改 config/server.properties
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The id of the broker. This must be set to a unique integer for each broker.
##必须唯一
broker.id=0
 
############################# Socket Server Settings #############################
#客户端连接的时候请按照此地址连接, 同一个地址,不同表示方式会导致生产和消费 的使用异常
listeners=PLAINTEXT://192.168.12.28:9092
 
# The port the socket server listens on
##客户端连接kafka的端口
#port=9092
 
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
 
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
 
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
 
# The number of threads handling network requests
num.network.threads=3
 
# The number of threads doing disk I/O
num.io.threads=8
 
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
 
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
 
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
 
 
############################# Log Basics #############################
 
# A comma seperated list of directories under which to store log files
//这个不要设置到机器的临时目录,否则启动可能会报错
log.dirs=/usr/local/kafka_2.11-0.9.0.0/data
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
 
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
 
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
 
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
 
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
 
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
 
############################# Zookeeper #############################
 
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
##kafka 是基于 zookeeper 的,保存kafka的数据信息、配置,读取偏移等
zookeeper.connect=192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181
 
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
 
把此配置应用到3台测试机,注意:broker.id不能唯一
 
进入 bin 目录
启动 kafka 后面的参数是kafka 的配置文件目录,启动失败会立即报错
./kafka-server-start.sh  ../config/server.properties
 
启动3台kafka集群
 
测试kafka集群:
先创建一个test主题,
./kafka-topics.sh --create --zookeeper 192.168.12.28:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的主题信息 
./kafka-topics.sh --zookeeper 192.168.12.28:2181  --describe  --topic testtopic
====================================================================
Topic:testtopic          PartitionCount:1           ReplicationFactor:1                   Configs:
 Topic: testtopic          Partition: 0                Leader: 4                                    Replicas: 4                       Isr: 4
====================================================================
Partition : 分区
L eader :负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
 
通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为testtopic的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list 192.168.12.28:9092,192.168.12.151:9092,192.168.12.152:9092 --topic testtopic
 
在另一个终端,启动Consumer,并订阅我们上面创建的名称为testtopic5的Topic中生产的消息,执行如下脚本
bin/kafka-console-consumer.sh --zookeeper 192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181 --from-beginning --topic testtopic
 
可以在Producer终端上输入字符串消息行,然后回车(一行一条数据),就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
 
到此,kafka集群搭建完毕(具体详细的参数配置请查看文档)
 
 
 
 
 
 
3.搭建storm 集群
文档地址:http://storm.apache.org/documentation.html
包地址 : http://storm.apache.org/downloads.html
0.10.0 : http://124.202.164.11/files/4168000007207070/mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
 
tar -zxvf apache-storm-0.10.0.tar.gz
cd apache-storm-0.10.0/conf
 
修改配置 storm.yaml
1)storm 依赖 zookeeper

如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项。

2) storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如:

storm.local.dir: "/home/admin/storm/workdir"   

3) java.library.path: Storm使用的本地库(ZMQ和JZMQ)加载路径,默认为”/usr/local/lib:/opt/local/lib:/usr/lib”,一般来说ZMQ和JZMQ默认安装在/usr/local/lib 下,因此不需要配置即可。

4) nimbus.host: Storm集群Nimbus机器地址(存在单点问题),各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件

5) supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口,如:supervisor.slots.ports:- 6700- 6701- 6702- 6703


+++++++++++++++++++++++++++++++++++++++++++++++
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "192.168.12.28"
     - "192.168.12.151"
     - "192.168.12.152"
 
nimbus.host: "192.168.12.28"
 
storm.local.dir: "/usr/local/apache-storm-0.10.0/workdata"
 
supervisor.slots.ports:
    - 6700
    - 6701
 
#
#
# ##### These may optionally be filled in:
#    
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"
 
## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 
三台测试机的配置相同即可,现在启动storm 
cd ../bin/
./storm nimbus   启动主节点//指定的主节点机器启动
./storm supervisor  启动工作子节点
./storm  ui     启动storm 自带的监控UI, 使用host:8080访问
 
 
自此,storm 集群搭建完毕
 
4.kafka + storm 继承
       刚发现这竟然没写,周末补
 
  • Storm
    4 引用 • 8 回帖 • 1 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖 • 1 关注
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    61 引用 • 29 回帖 • 8 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Hexo

    Hexo 是一款快速、简洁且高效的博客框架,使用 Node.js 编写。

    22 引用 • 148 回帖 • 16 关注
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    35 引用 • 468 回帖 • 758 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖 • 12 关注
  • WebSocket

    WebSocket 是 HTML5 中定义的一种新协议,它实现了浏览器与服务器之间的全双工通信(full-duplex)。

    48 引用 • 206 回帖 • 280 关注
  • GitLab

    GitLab 是利用 Ruby 一个开源的版本管理系统,实现一个自托管的 Git 项目仓库,可通过 Web 界面操作公开或私有项目。

    46 引用 • 72 回帖
  • Notion

    Notion - The all-in-one workspace for your notes, tasks, wikis, and databases.

    10 引用 • 77 回帖
  • Sym

    Sym 是一款用 Java 实现的现代化社区(论坛/BBS/社交网络/博客)系统平台。

    下一代的社区系统,为未来而构建

    524 引用 • 4601 回帖 • 709 关注
  • Jenkins

    Jenkins 是一套开源的持续集成工具。它提供了非常丰富的插件,让构建、部署、自动化集成项目变得简单易用。

    54 引用 • 37 回帖 • 1 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    554 引用 • 675 回帖
  • 自由行
    1 关注
  • SSL

    SSL(Secure Sockets Layer 安全套接层),及其继任者传输层安全(Transport Layer Security,TLS)是为网络通信提供安全及数据完整性的一种安全协议。TLS 与 SSL 在传输层对网络连接进行加密。

    70 引用 • 193 回帖 • 414 关注
  • Android

    Android 是一种以 Linux 为基础的开放源码操作系统,主要使用于便携设备。2005 年由 Google 收购注资,并拉拢多家制造商组成开放手机联盟开发改良,逐渐扩展到到平板电脑及其他领域上。

    337 引用 • 324 回帖 • 2 关注
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 651 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    4 引用 • 7 回帖 • 1 关注
  • Netty

    Netty 是一个基于 NIO 的客户端-服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    49 引用 • 33 回帖 • 44 关注
  • Oracle

    Oracle(甲骨文)公司,全称甲骨文股份有限公司(甲骨文软件系统有限公司),是全球最大的企业级软件公司,总部位于美国加利福尼亚州的红木滩。1989 年正式进入中国市场。2013 年,甲骨文已超越 IBM,成为继 Microsoft 后全球第二大软件公司。

    107 引用 • 127 回帖 • 341 关注
  • BAE

    百度应用引擎(Baidu App Engine)提供了 PHP、Java、Python 的执行环境,以及云存储、消息服务、云数据库等全面的云服务。它可以让开发者实现自动地部署和管理应用,并且提供动态扩容和负载均衡的运行环境,让开发者不用考虑高成本的运维工作,只需专注于业务逻辑,大大降低了开发者学习和迁移的成本。

    19 引用 • 75 回帖 • 685 关注
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 1 关注
  • DNSPod

    DNSPod 建立于 2006 年 3 月份,是一款免费智能 DNS 产品。 DNSPod 可以为同时有电信、网通、教育网服务器的网站提供智能的解析,让电信用户访问电信的服务器,网通的用户访问网通的服务器,教育网的用户访问教育网的服务器,达到互联互通的效果。

    6 引用 • 26 回帖 • 530 关注
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    36 引用 • 200 回帖 • 40 关注
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    25 引用 • 7 回帖 • 117 关注
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 73 关注
  • 服务器

    服务器,也称伺服器,是提供计算服务的设备。由于服务器需要响应服务请求,并进行处理,因此一般来说服务器应具备承担服务并且保障服务的能力。

    125 引用 • 585 回帖
  • jsDelivr

    jsDelivr 是一个开源的 CDN 服务,可为 npm 包、GitHub 仓库提供免费、快速并且可靠的全球 CDN 加速服务。

    5 引用 • 31 回帖 • 109 关注
  • JavaScript

    JavaScript 一种动态类型、弱类型、基于原型的直译式脚本语言,内置支持类型。它的解释器被称为 JavaScript 引擎,为浏览器的一部分,广泛用于客户端的脚本语言,最早是在 HTML 网页上使用,用来给 HTML 网页增加动态功能。

    730 引用 • 1284 回帖 • 1 关注
  • Ubuntu

    Ubuntu(友帮拓、优般图、乌班图)是一个以桌面应用为主的 Linux 操作系统,其名称来自非洲南部祖鲁语或豪萨语的“ubuntu”一词,意思是“人性”、“我的存在是因为大家的存在”,是非洲传统的一种价值观,类似华人社会的“仁爱”思想。Ubuntu 的目标在于为一般用户提供一个最新的、同时又相当稳定的主要由自由软件构建而成的操作系统。

    127 引用 • 169 回帖
  • 数据库

    据说 99% 的性能瓶颈都在数据库。

    346 引用 • 760 回帖