环境
一共三台测试机
– 操作系统: centos7 ; hostname: c1 ; ip: 192.168.33.21
– 操作系统: centos7 ; hostname: c2 ; ip: 192.168.33.22
– 操作系统: centos7 ; hostname: c3 ; ip: 192.168.33.23
zookeeper
安装
cd ~
mkdir -p download && cd download
wget -c http://www.eu.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar xvzf zookeeper-3.4.6.tar.gz
mkdir -p /usr/local/server
cp -r zookeeper-3.4.6 /usr/local/server/zookeeper
cd /usr/local/server/zookeeper/conf
touch zoo.cfg
打开 zoo.cfg
添加
tickTime=2000
dataDir=/data/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=c1:2888:3888
server.2=c2:2888:3888
server.3=c3:2888:3888
打开 /etc/hosts
添加
192.168.33.20 c1
192.168.33.21 c2
192.168.33.22 c3
创建数据目录
mkdir -p /data/zookeeper
创建myid文件, id 与 zoo.cfg 中的序号对应
echo 1 > /data/zookeeper/my.id
常用命令
启动
/usr/local/server/zookeeper/bin/zkServer.sh start
重启
/usr/local/server/zookeeper/bin/zkServer.sh restart
关闭
/usr/local/server/zookeeper/bin/zkServer.sh stop
在其中一台用客户端连接
/usr/local/server/zookeeper/bin/zkCli.sh -server c1:2181
查看状态
/usr/local/server/zookeeper/bin/zkServer.sh status
测试
在 c1 上连接 c1 上的 zookeeper
/usr/local/server/zookeeper/bin/zkCli.sh -server c1:2181
添加一个根节点目录
create /project xxxx
在 c2 上连接 c2 上的 zookeeper
/usr/local/server/zookeeper/bin/zkCli.sh -server c2:2181
执行
get /project
看是否能得到数据,如果有 zookeeper 集群就完成了
备注
- 注意要关闭防火墙
kafka
Producer在发布消息到某个Partition时,先通过Zookeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。
Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)
安装
cd ~/download
wget -c http://ftp.jaist.ac.jp/pub/apache/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
cp -r kafka_2.10-0.8.2.2 /usr/local/server/kafka
cd /usr/local/server/kafka/conf
mkdir -p /data/storm
在 c1 上
打开 server.properties
,
修改以下项
borker=1
host.name=c1
zookeeper.connect=c1:2181,c2:2181,c3:2181
c2 上
borker=2
host.name=c2
zookeeper.connect=c1:2181,c2:2181,c3:2181
c3 上
borker=3
host.name=c3
zookeeper.connect=c1:2181,c2:2181,c3:2181
保存
常用命令
启动
/usr/local/server/kafka/bin/kafka-server-start.sh -daemon /usr/local/server/kafka/config/server.properties
关闭
/usr/local/server/kafka/bin/kafka-server-stop.sh
创建 topic
/usr/local/server/kafka/bin/kafka-topics.sh --create --topic log --partitions 3 --zookeeper c1:2181/kafka --replication-factor 1
列出 topic
/usr/local/server/kafka/bin/kafka-topics.sh --list --zookeeper c1:2181/kafka
列出 topic 详细信息
/usr/local/server/kafka/bin/kafka-topics.sh --describe --zookeeper c1:2181/kafka
生产数据
/usr/local/server/kafka/bin/kafka-console-producer.sh --broker-list c1:9092 --topic topic1
消费数据
/usr/local/server/kafka/bin/kafka-console-consumer.sh --zookeeper c2:2181 --topic topic1
测试
/usr/local/server/kafka/bin/kafka-topics.sh --create --topic test1 --partitions 1 --zookeeper c1:2181 --replication-factor 1
/usr/local/server/kafka/bin/kafka-topics.sh --create --topic test2 --partitions 3 --zookeeper c1:2181 --replication-factor 1
/usr/local/server/kafka/bin/kafka-topics.sh --create --topic test3 --partitions 1 --zookeeper c1:2181 --replication-factor 3
/usr/local/server/kafka/bin/kafka-topics.sh --create --topic test4 --partitions 3 --zookeeper c1:2181 --replication-factor 3
删除topic
登录 zookeeper,删除 /kafka/brokers/topics/topic1
/kafka/config/topics/test1
删除 kafka 日志目录 /tmp/kafka-logs
下对应的 topic 分区文件
storm
安装
cd ~/download
wget -c http://ftp.jaist.ac.jp/pub/apache/storm/apache-storm-0.9.6/apache-storm-0.9.6.tar.gz
tar xvzf apache-storm-0.9.6.tar.gz
cp -r apache-storm-0.9.6 /usr/local/server/storm
cd /usr/local/server/storm/conf
mkdir -p /data/storm
我们把 c1 做为 nimbus,c2 c3 做为 supervisord,所以配置如下
打开 storm.yaml
修改 storm.zookeeper.servers
为
storm.zookeeper.servers:
- "c1"
- "c2"
- "c3"
修改 nimbus.host
为
nimbus.host: "c1"
添加 storm.local.dir=/data/storm
保存
常用命令
启动 nimbus (c1 上)
nohup /usr/local/server/storm/bin/storm nimbus >/dev/null 2>&1 &
启动 Supervisor (c2,c3 上)
nohup /usr/local/server/storm/bin/storm supervisor >/dev/null 2>&1 &
启动 storm ui (c1 上)
nohup /usr/local/server/storm/bin/storm ui >/dev/null 2>&1 &
访问 storm ui 可以通过 http://c1:8080/
关闭 storm
kill -9 `ps aux | grep storm | grep -v grep | awk '{print $2}'` > /dev/null 2>&1 &
提交 topology
./storm jar ~/Workdir/workspace/storm-demo/target/demo-1.0-SNAPSHOT.jar storm_demo.demo.App pro -c nimbus.host=c1
转载请注明:大后端 » zookeeper kafka storm 集群安装与使用