开发者

linux环境kafka安装及配置方式

目录
  • 下载资源
  • 安装zookeeper
    • kafka安装及配置
      • kafka安装(单体)
      • kafka集群配置方式
      • kafka开启kerberos认证
      • kafka自带zookeeper使用
      • kafka常用命令
  • 总结

    下载资源

    1、linux环境安装kafka,需要预先准备相关资源,我使用的是kafka_2.12-2.5.1版本,下载路径为:http://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz,也可以通过命令wget http://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz进行资源获取;

    2、获取并安装zookeeper:(以apache-zookeeper-3.6.1-bin.tar.gz为例),官网:https://zookeeper.apache.org/。

    3、将下载好的kafka及zookeeper压缩包上传到虚拟机服务器,放置到/usr/local/目录中:

    linux环境kafka安装及配置方式

    安装zookeeper

    1、解压apache-zookeeper-3.6.1-bin.tar.gz压缩包,并重命名

    #进入到压缩包存放路径
    cd /usr/local/
    #解压
    tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
    #重命名
    mv apache-zookeeper-3.6.1-binphp zookeeper-3.6.1

    2、配置启动

    #进入配置目录
    cd zookeeper-3.6.1/conf/
    #复制配置文件,不直接修改源文件用于备份使用
    cp zoo_sample.cfg zoo.cfg
    #编辑zoo.cfg文件
    vi zoo.cfg

    修改zoo.cfg中的内容为,主要修改dataDir路径以及端口号:

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/usr/local/zookeeper-3.6.1/data/
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    
    ## Metrics Providers编程
    #
    # https://prometheus.io Metrics Exporter
    #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
    #metricsProvider.httpPort=7000
    #metricsProvider.exportJvmInfo=true
    

    3、启动zookeeper

    sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh start 

    linux环境kafka安装及配置方式

    4、查看zookeeper状态

    sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh status

    linux环境kafka安装及配置方式

    另:停止zookeeper:

    sh /usr/local/zookeeper-3.6.1/bin/zkServer.sh stop

    linux环境kafka安装及配置方式

    以上为单体zookeeper安装及启动过程,以下为zookeeper集群的搭建方式:

    与单体的搭建方式类似,只需要重复类似单体的部署模式,集群最少使用3台,以下大概介绍相应的部署方式:

    1、首先将解压好的文件夹复制并重命名

    #创建集群目录
    mkdir zookeeper-cluster
    #将解压好的zookeeper目录复制到集群目录中
    cp -r zookeeper-3.6.1 zookeeper-cluster/
    #进入集群目录中
    cd zookeeper-cluster/
    #重命名复制过来的目录为节点1目录,同理复制出节点2和节点3的目录
    mv zookeeper-3.6.1 zookeeper-1
    cp -r zookeeper-1 ./zookeeper-2
    cp -r zookeeper-1 ./zookeeper-3

    以下为复制完成后的目录结构:

    linux环境kafka安装及配置方式

    2、分别配置相关的配置文件,注意节点名称及dataDir路径不要冲突

    vi zookeeper-1/conf/zoo.cfg
    vi zookeeper-2/conf/zoo.cfg
    vi zookeeper-3/conf/zoo.cfg

    3、分别创建myid文件到data目录中

    echo "1" > zookeeper-1/data/myid
    echo "2" > zookeeper-2/data/myid
    echo "3" > zookeeper-3/data/myid

    修改每个节点中的dataDir,clientPort的值,并增加节点之间的关联属性,以下是节点1的示例,其他节点以此类推:

    linux环境kafka安装及配置方式

    配置完成后分别启动三个节点:

    /usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
    /usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
    /usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start

    集群启动成功

    linux环境kafka安装及配置方式

    连接测试,zkCli.sh脚本可以连接集群测试是否启动成功:

    linux环境kafka安装及配置方式

    如果(kerberos服务已经安装并配置完成)开启Kerberos认证需要进行以下操作(以单体zookeeper为例):

    1、生成keytab文件:

    #登录kerberos的命令行界面
    kadmin.local
    #生成随机密码
    addprinc -randkey zookeeper/hadoop.test.com@TEST.COM
    #生成keytab文件
     ktadd -k /etc/security/keytabs/zookeeper.keytab zookeeper/hadoop.test.com@TEST.COM
     #退出命令行
     exit
     #查看生成的keytab文件的用户
     klist -ket /etc/security/keytabs/zookeeper.keytab

    以下是执行过程示例:

    linux环境kafka安装及配置方式

    2、生成jaas文件:

    vi /usr/local/zookeeper-3.6.1/conf/jaas.conf

    jaas.conf文件内容,注意keyTab属性位置及principal用户名配置正确

    Server{
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/zookeeper.keytab"
    principal="zookeeper/hadoop.test.com@TEST.COM"
    useTicketCache=false;
    };
    
    Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/zookeeper.keytab"
    storeKey=true
    useTicketCache=false
    principal="zookeeper/hadoop.test.com@TEST.COM";
    };
    

    3、修改配置文件zoo.cfg添加配置,在结尾加入以下配置:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000

    4、添加Java.env文件,并写入相关的内容,注意-Djava.security.auth.login.config为生成的jaas.conf文件的路径:

    echo 'export JVMFLAGS=" -Dsun.security.krb5.debug=true -Djava.security.auth.login.config=/usr/local/zookeeper-3.6.1/conf/jaas.conf"' > /usr/local/zookeeper-3.6.1/conf/java.env

    5、启动验证,启动成功,并连接成功

    linux环境kafka安装及配置方式

    kafka安装及配置

    kafka安装(单体)

    1、解压安装包

    #进入压缩包存放路径
    cd /usr/local/
    #解压压缩包
    tar -zxvf kafka_2.12-2.5.1.tgz
    #进入解压后目录中
    cd kafka_2.12-2.5.1

    linux环境kafka安装及配置方式

    2、修改配置

    #修改服务配置文件
    vi /usr/local/kafka_2.12-2.5.1/config/server.properties

    配置中的内容需要配置:

    listeners=PLAINTEXT://192.168.4.130:9092
    zookeeper.connect=192.168.4.130:2181

    配置完成后(前提:zookeeper已正常启动),即可启动

    服务端启动命令(不加-daemon前台启动,关闭即停止服务,加了-daemon后台启动):

    #启动命令
    /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server.properties
    #停止命令
    /usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-stop.sh

    查看服务日志命令:

    tail -f /usr/local/kafka_2.12-2.5.1/logs/server.log

    linux环境kafka安装及配置方式

    以上是kafka单体的配置及启动方式。

    kafka集群配置方式

    本次主要记录同一台服务器上搭建集群,类似于单体的搭建方式,我们只需要配置每个节点的配置文件,然后分别启动即可,如果是分不同的服务器搭建,类似于每台服务器上搭建单体,然后在配置文件中增加相应集群相关的配置项即可使用了,以下默认已经将压缩包解压好后的操作。

    1、复制server.properties文件并重命名,复制出3(集群最好大于等于三个节点)份来

    #复制配置文件
    cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-1.properties
    cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-2.properties
    cp /usr/local/kafka_2.12-2.5.1/config/server.properties /usr/local/kafka_2.12-2.5.1/config/server-3.properties
    #编辑配置文件
    vi /usr/local/kafka_2.12-2.5.1/config/server-1.properties
    vi /usr/local/kafka_2.12-2.5.1/config/server-2.properties
    vi /usr/local/kafka_2.12-2.5.1/config/server-3.properties

    之后分别配置复制出来的server-1.properties,server-2.properties,server-3.properties三个配置文件,分别配python置以下配置

    #节点id,不同的节点用不同的数字表示
    broker.id=1
    #对外的ip及端口,端口号每个文件不要用同一个,我使用的分别是9091,9092,9093
    listeners=PLAINTEXT://192.168.4.130:9091
    #数据存放位置,每个节点一个如/kafka-logs-1,/kafka-logs-2,/kafka-logs-3等,不同节点使用文件不可重复,如果重复了容易启动失败
    log.dirs=/usr/local/kafka_2.12-2.5.1/data/cluster/kafka-logs-1
    #填zookeeper的地址,多个用,隔开
    zookeeper.connect=192.168.4.130:2181

    分别配置好之后可以使用启动命令分别启动相关节点

    #启动节点1
    /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-1.properties
    #启动节点2
    /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-2.properties
    #启动节点3
    /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/server-3.properties
    #查看服务日志
    tail -f /usr/locajsl/kafka_2.12-2.5.1/logs/server.log

    启动成功日志:

    linux环境kafka安装及配置方式

    注:有时存在kafka启动失败报zookeeper连接超时拒绝连接时,可能引起的原因是防火墙没有关闭,关闭防火墙的命令,也有可能是/etc/hosts文件配置不对引起的,遇到该问题可以多向考虑。

    #方式一
    #停止防火墙
    service firewalld stop
    #禁用防火墙
    systemctl disable firewalld
    #方式二:
    chkconfig iptables off

    另:如果之前启动过zookeeper或者kafka,但是数据目录没有清除过的话也会影响我们的启动,一定要仔细核对好。

    kafka开启kerberos认证

    如果我们搭建的kafka(单体或集群)需要开启kerberos认证,可以在安装的时候这样配置:

    1、生成keytab文件:

    #登录kerberos的命令行界面
    kadmin.local
    #生成随机密码
    addprinc -randkey kafka/hadoop.test.com@TEST.COM
    #生成keytab文件
     ktadd -k /etc/security/keytabs/kafka.keytab kafka/hadoop.test.com@TEST.COM
     #退出命令行
     exit
     #查看生成的keytab文件的用户
     klist -ket /etc/security/keytabs/kafka.keytab

    以下是生成过程示例:

    linux环境kafka安装及配置方式

    2、生成jaas文件:

    vi /usr/local/kafka_2.12-2.5.1/config/jaas.conf

    jaas.conf文件内容,注意keyTab属性位置及principal用户名配置正确

    KafkaServer{
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="kafka/hadoop.test.com@TEST.COM";
    };
    KafkaClient{
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="kafka/hadoop.test.com@TEST.COM"
    userTicketCache=true;
    };
    Client{
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    serviceName="kafka"
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="kafka/hadoop.test.com@TEST.COM"
    userTicketCache=true;
    };

    3、修改kafka的配置文件,如果是单体仅需要修改一个,如果是集群,则需要修改每个节点对应的配置文件:

    vi /usr/local/kafka_2.12-2.5.1/config/server.properties

    配置文件中添加(配置)以下的属性(非kerberos配置时需要的配置默认需要配置好)

    listeners=SASL_PLAINTEXT://172.168.4.130:9093
    advertised.listeners=SASL_PLAINTEXT://172.168.4.130:9093
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=GSSAPI
    sasl.enabled.mechanisms=GSSAPI
    sasl.kerberos.service.name=kafka
    kafka.security.protocol=SASL_PLAINTEXT
    super.users=User:kafka

    4、修改kafka服务启动脚本,配置相关的jaas文件路径

    vi /usr/local/kafka_2.12-2.5.1/bin/kafka-server-start.sh

    添加以下的内容:

    export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/kafka_2.12-2.5.1/config/jaas.conf"

    添加示例:

    linux环境kafka安装及配置方式

    注意:如果kafka连接时,生产者或消费者连接开启kerberos认证的kafka服务器时,需要在相应的脚本中也同样添加该配置

    #生产者连接脚本配置
    vi /usr/local/kafka_2.12-2.5.1/bin/kafka-console-producer.sh
    #消费者连接脚本配置
    vi /usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh
    #topic连接脚本配置
    vi /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh

    同时消费者或生产者的配置文件中需要增加以下配置:

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.kerberos.service.name=kafka

    添加示例(以生产者为例):

    linux环境kafka安装及配置方式

    启动kafka服务,连接并使用,参考普通kafka启动及连接相关命令。

    kafka自带zookeeper使用

    注:kafka安装包中也自带了zookeeper,如果不想安装zookeeper,可以使用kafka安装包中自带的zookeeper。

    如果使用kafka安装包中带的zookeeper,需要配置解压后目录中的zookeeper.properties

    vi /usr/local/kafka_2.12-2.5.1/config/zookeeper.properties

    如果不需要修改端口可以默认不修改,使用命令启动自带zookeeper:

    #启动zookeeper
    /usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-start.sh -daemon /usr/local/kafka_2.12-2.5.1/config/zookeeper.properties
    #停止zookeeper
    /usr/local/kafka_2.12-2.5.1/bin/zookeeper-server-stop.sh

    如果zookeeper需要开启kerberos认证需要给zookeeper.properties添加配置:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000

    在启动脚本zookeeper-server-start.sh中添加如下配置,注意jaas.conf文件的路径,jaas文件生成方式同上。

    export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/local/zookeeper-3.6.1/conf/jaas.conf"

    启动命令如上。

    kafka常用命令

    #创建主题
    /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --create --zookeeper 192.168.4.130:2181 --replication-factor 1 --partitions 3 --topic test01
    #查看主题列表
    /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --list --zookeeper 192.168.4.130:2181/kafka
    #生产者连接并生产数据
    /usr/local/kafka_2.12-www.devze.com2.5.1/bin/kafka-console-producer.sh --broker-list 192.168.4.130:9092  --topic test01 --producer.config /usr/local/kafka_2.12-2.5.1/config/producer.properties
    #消费者连接并消费数据
    /usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.130:9092 --topic test01 --from-beginning --consumer.config  /usr/local/kafka_2.12-2.5.1/config/consumer.properties
    #kafka添加消息写入partition时间戳的方法
    #Kafka消息的时间戳,在消息中增加了一个时间戳字段和时间戳类型。目前支持的时间戳类型有两种: CreateTime 和 LogAppendTime 前者表示producer创建这条消息的时间;后者表示broker接收到这条消息的时间(严格来说,是leader broker将这条消息写入到log的时间)
    /usr/local/kafka_2.12-2.5.1/bin/kafka-topics.sh --alter --topic test01 --zookeeper 192.168.4.130:2181 --config  message.timestamp.type=LogAppendTime
    /usr/local/kafka_2.12-2.5.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.4.130:9092 --topic test01 --from-beginning --consumer.config  /usr/local/kafka_2.12-2.5.1/config/consumer.properties --property print.timestamp=true
    #删除主题内的消息:kafka-delete-records --bootstrap-server <broker-host:port> --offset-json-file offsets.json
    #–bootstrap-server:需要连接的 brokers 地址;
    #–offset-json-file:包含删除配置的 Json 文件。
    /usr/local/kafka_2.12-2.5.1/bin/kafka-delete-records.sh --bootstrap-server 192.168.4.130:9092 --offset-json-file /usr/local/kafka_2.12-2.5.1/remove.json
    #删除附加:移除kerberos开启的server中的数据,同样需要在相关配置文件中配置kerberos相关的配置,以及脚本中增加相关的配置
    /usr/local/kafka_2.12-2.5.1/bin/kafka-delete-records.sh --bootstrap-server 192.168.4.130:9092 --command-config /usr/local/kafka_2.12-2.5.1/config/delete-kerb.properties  --offset-json-file /usr/local/kafka_2.12-2.5.1/remove.json

    关于移除数据的remove.json配置文件内容:

    {
       "partitions": [
                      {"topic": "test01", "partition": 0, "offset": -1}
                     ],
                     "version":1
     }
    • topic:待删除数据主题
    • partition:待删除的分区
    • offset:删除起始偏移量,设置为 -1,表示将删除主题中所有数据。

    linux环境kafka安装及配置方式

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新运维

    运维排行榜