zookeeper + kafka + storm 集群搭建

本贴最后更新于 3076 天前,其中的信息可能已经事过境迁

.首先:需要三台测试机器(由于zookeeper 的选举机制,官方推荐是3台,并且是奇数台机器,{1台机器多个端口也可以})

192.168.12.28
192.168.12.151
192.168.12.152
 
环境及版本
jdk : java version "1.7.0_79"
os : fedora --x86_64-22-3
zookeeper :3.4.6
kafka:2.11-0.9.0.0
storm:0.10.0
使用:连续加号(+++++)分隔配置文件内容和正文
 
 
1.搭建zookeeper集群
先到apache 的zookeeper 项目中下载包
文档地址:http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html
包地址:http://www.apache.org/dyn/closer.cgi/zookeeper/
3.4.6 url:http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
 
下载包到测试机,解压 tar -zxvf zookeeper-3.4.6.tar.gz 
先进入conf 目录 配置 zoo.cfg,如下
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/usr/local/zookeeper-3.4.6/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#这连接客户端包括(比如kafka。strom等连接,所以请注意这个连接数不要太小,导致部署失败,或者客户端连接失败)
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature192
#autopurge.purgeInterval=1
##这是zookeeper 机集群地址。第一个端口是集群之间通信的端口(监听端口,和通信端口和选举端口不能重复,否则报错地址已用),第二个是选举leader时使用的
server.1=192.168.12.28:2888:3888
server.2=192.168.12.151:2888:3888
server.3=192.168.12.152:2888:3888
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
按这个配置,配置3台测试机器
 
到bin 目录启动zookeeper 集群:
 ./zkServer.sh  start     
查看集群状态
 ./zkServer.sh  status
 
mode:leader  说明他是leader 否则是follower
leader  挂掉后,集群会自动选举新的leader
 
在3台机器重复此操作
 
使用client 连接zookeeper集群(集群中启动的任意一台机器都可以)
./zkCli.sh --server192.168.12.28:2181
 
ls /                         查看根目录
create /test   this is test dir     创建目录
 
到此,zookeeper    集群搭建完毕
这是一写zookeeper 的配置信息

broker.id

整数,建议根据ip区分

 

log.dirs

kafka存放消息文件的路径,

默认/tmp/kafka-logs

port

broker用于接收producer消息的端口

 

zookeeper.connnect

zookeeper连接

格式为  ip1:port,ip2:port,ip3:port

message.max.bytes

单条消息的最大长度

 

num.network.threads

broker用于处理网络请求的线程数

如不配置默认为3,server.properties默认是2

num.io.threads

broker用于执行网络请求的IO线程数

如不配置默认为8,server.properties默认是2可适当增大,

queued.max.requests

排队等候IO线程执行的requests

默认为500

host.name

broker的hostname

默认null,建议写主机的ip,不然消费端不配置hosts会有麻烦

num.partitions

topic的默认分区数

默认1

log.retention.hours

消息被删除前保存多少小时

默认1周168小时

auto.create.topics.enable

是否可以程序自动创建Topic

默认true,建议false

default.replication.factor

消息备份数目

默认1不做复制,建议修改

num.replica.fetchers

用于复制leader消息到follower的IO线程数

默认1

 
 
 
 
 
 
2.搭建 kafka 集群
文档地址:http://kafka.apache.org/documentation.html#quickstart
包地址:
tar -xzf kafka_2.11-0.9.0.0.tgz
修改 config/server.properties
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# The id of the broker. This must be set to a unique integer for each broker.
##必须唯一
broker.id=0
 
############################# Socket Server Settings #############################
#客户端连接的时候请按照此地址连接, 同一个地址,不同表示方式会导致生产和消费 的使用异常
listeners=PLAINTEXT://192.168.12.28:9092
 
# The port the socket server listens on
##客户端连接kafka的端口
#port=9092
 
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
 
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>
 
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>
 
# The number of threads handling network requests
num.network.threads=3
 
# The number of threads doing disk I/O
num.io.threads=8
 
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
 
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
 
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
 
 
############################# Log Basics #############################
 
# A comma seperated list of directories under which to store log files
//这个不要设置到机器的临时目录,否则启动可能会报错
log.dirs=/usr/local/kafka_2.11-0.9.0.0/data
 
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
 
############################# Log Flush Policy #############################
 
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
 
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
 
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
 
############################# Log Retention Policy #############################
 
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
 
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
 
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
 
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
 
############################# Zookeeper #############################
 
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
##kafka 是基于 zookeeper 的,保存kafka的数据信息、配置,读取偏移等
zookeeper.connect=192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181
 
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
 
把此配置应用到3台测试机,注意:broker.id不能唯一
 
进入 bin 目录
启动 kafka 后面的参数是kafka 的配置文件目录,启动失败会立即报错
./kafka-server-start.sh  ../config/server.properties
 
启动3台kafka集群
 
测试kafka集群:
先创建一个test主题,
./kafka-topics.sh --create --zookeeper 192.168.12.28:2181 --replication-factor 1 --partitions 1 --topic test
查看创建的主题信息 
./kafka-topics.sh --zookeeper 192.168.12.28:2181  --describe  --topic testtopic
====================================================================
Topic:testtopic          PartitionCount:1           ReplicationFactor:1                   Configs:
 Topic: testtopic          Partition: 0                Leader: 4                                    Replicas: 4                       Isr: 4
====================================================================
Partition : 分区
L eader :负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
 
通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为testtopic的Topic中生产消息,执行如下脚本:
bin/kafka-console-producer.sh --broker-list 192.168.12.28:9092,192.168.12.151:9092,192.168.12.152:9092 --topic testtopic
 
在另一个终端,启动Consumer,并订阅我们上面创建的名称为testtopic5的Topic中生产的消息,执行如下脚本
bin/kafka-console-consumer.sh --zookeeper 192.168.12.28:2181,192.168.12.151:2181,192.168.12.152:2181 --from-beginning --topic testtopic
 
可以在Producer终端上输入字符串消息行,然后回车(一行一条数据),就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
 
到此,kafka集群搭建完毕(具体详细的参数配置请查看文档)
 
 
 
 
 
 
3.搭建storm 集群
文档地址:http://storm.apache.org/documentation.html
包地址 : http://storm.apache.org/downloads.html
0.10.0 : http://124.202.164.11/files/4168000007207070/mirrors.cnnic.cn/apache/storm/apache-storm-0.10.0/apache-storm-0.10.0.tar.gz
 
tar -zxvf apache-storm-0.10.0.tar.gz
cd apache-storm-0.10.0/conf
 
修改配置 storm.yaml
1)storm 依赖 zookeeper

如果Zookeeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项。

2) storm.local.dir: Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问权限。然后在storm.yaml中配置该目录,如:

storm.local.dir: "/home/admin/storm/workdir"   

3) java.library.path: Storm使用的本地库(ZMQ和JZMQ)加载路径,默认为”/usr/local/lib:/opt/local/lib:/usr/lib”,一般来说ZMQ和JZMQ默认安装在/usr/local/lib 下,因此不需要配置即可。

4) nimbus.host: Storm集群Nimbus机器地址(存在单点问题),各个Supervisor工作节点需要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件

5) supervisor.slots.ports: 对于每个Supervisor工作节点,需要配置该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worker使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口,如:supervisor.slots.ports:- 6700- 6701- 6702- 6703


+++++++++++++++++++++++++++++++++++++++++++++++
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "192.168.12.28"
     - "192.168.12.151"
     - "192.168.12.152"
 
nimbus.host: "192.168.12.28"
 
storm.local.dir: "/usr/local/apache-storm-0.10.0/workdata"
 
supervisor.slots.ports:
    - 6700
    - 6701
 
#
#
# ##### These may optionally be filled in:
#    
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"
 
## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 
三台测试机的配置相同即可,现在启动storm 
cd ../bin/
./storm nimbus   启动主节点//指定的主节点机器启动
./storm supervisor  启动工作子节点
./storm  ui     启动storm 自带的监控UI, 使用host:8080访问
 
 
自此,storm 集群搭建完毕
 
4.kafka + storm 继承
       刚发现这竟然没写,周末补
 
  • Storm
    4 引用 • 8 回帖 • 1 关注
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 14 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 工具

    子曰:“工欲善其事,必先利其器。”

    288 引用 • 734 回帖
  • PostgreSQL

    PostgreSQL 是一款功能强大的企业级数据库系统,在 BSD 开源许可证下发布。

    22 引用 • 22 回帖
  • 分享

    有什么新发现就分享给大家吧!

    248 引用 • 1795 回帖
  • Solo

    Solo 是一款小而美的开源博客系统,专为程序员设计。Solo 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    1435 引用 • 10056 回帖 • 489 关注
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 105 关注
  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 484 关注
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖 • 1 关注
  • Gzip

    gzip (GNU zip)是 GNU 自由软件的文件压缩程序。我们在 Linux 中经常会用到后缀为 .gz 的文件,它们就是 Gzip 格式的。现今已经成为互联网上使用非常普遍的一种数据压缩格式,或者说一种文件格式。

    9 引用 • 12 回帖 • 147 关注
  • Typecho

    Typecho 是一款博客程序,它在 GPLv2 许可证下发行,基于 PHP 构建,可以运行在各种平台上,支持多种数据库(MySQL、PostgreSQL、SQLite)。

    12 引用 • 65 回帖 • 445 关注
  • 七牛云

    七牛云是国内领先的企业级公有云服务商,致力于打造以数据为核心的场景化 PaaS 服务。围绕富媒体场景,七牛先后推出了对象存储,融合 CDN 加速,数据通用处理,内容反垃圾服务,以及直播云服务等。

    27 引用 • 225 回帖 • 163 关注
  • 996
    13 引用 • 200 回帖 • 10 关注
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 637 关注
  • V2Ray
    1 引用 • 15 回帖 • 1 关注
  • TGIF

    Thank God It's Friday! 感谢老天,总算到星期五啦!

    288 引用 • 4485 回帖 • 663 关注
  • 智能合约

    智能合约(Smart contract)是一种旨在以信息化方式传播、验证或执行合同的计算机协议。智能合约允许在没有第三方的情况下进行可信交易,这些交易可追踪且不可逆转。智能合约概念于 1994 年由 Nick Szabo 首次提出。

    1 引用 • 11 回帖 • 2 关注
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    12 引用 • 54 回帖 • 159 关注
  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    492 引用 • 926 回帖
  • 反馈

    Communication channel for makers and users.

    123 引用 • 913 回帖 • 250 关注
  • NetBeans

    NetBeans 是一个始于 1997 年的 Xelfi 计划,本身是捷克布拉格查理大学的数学及物理学院的学生计划。此计划延伸而成立了一家公司进而发展这个商用版本的 NetBeans IDE,直到 1999 年 Sun 买下此公司。Sun 于次年(2000 年)六月将 NetBeans IDE 开源,直到现在 NetBeans 的社群依然持续增长。

    78 引用 • 102 回帖 • 683 关注
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    110 引用 • 54 回帖 • 1 关注
  • 阿里云

    阿里云是阿里巴巴集团旗下公司,是全球领先的云计算及人工智能科技公司。提供云服务器、云数据库、云安全等云计算服务,以及大数据、人工智能服务、精准定制基于场景的行业解决方案。

    89 引用 • 345 回帖
  • CSS

    CSS(Cascading Style Sheet)“层叠样式表”是用于控制网页样式并允许将样式信息与网页内容分离的一种标记性语言。

    196 引用 • 540 回帖 • 1 关注
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    91 引用 • 751 回帖 • 1 关注
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 694 关注
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    35 引用 • 200 回帖 • 27 关注
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    15 引用 • 136 回帖
  • Hibernate

    Hibernate 是一个开放源代码的对象关系映射框架,它对 JDBC 进行了非常轻量级的对象封装,使得 Java 程序员可以随心所欲的使用对象编程思维来操纵数据库。

    39 引用 • 103 回帖 • 715 关注