Http2kafka

1. Function Description

Receives HTTP/1.1 POST messages and, through a connection to the Kafka server, forwards the data to Kafka.
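
In short, an HTTP POST body received on an njet location configured with this module is produced as a message to the configured Kafka topic; see section 4 for a full configuration and section 5 for an end-to-end curl example.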

2. Module Dependency

load_module modules/njt_http_kafka_module.so;

3. Directives

kafka_broker_list

Syntax kafka_broker_list {broker1,broker2}
Default -
Context NJT_HTTP_MAIN_CONF|NJT_CONF_TAKE1

Configures the list of Kafka broker addresses, separated by commas.
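
For example, matching the broker list used in the configuration sample in section 4 (addresses are illustrative):

    kafka_broker_list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094;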

kafka_sasl_plaintext

Syntax kafka_sasl_plaintext user_name={admin} password={123456}
Default -
Context NJT_HTTP_MAIN_CONF|NJT_CONF_TAKE2

If the brokers use sasl_plaintext username/password authentication, configure this directive to set the username and password.
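
For example, using the illustrative credentials from the samples below:

    kafka_sasl_plaintext user_name=admin password=123456;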

kafka_topic

Syntax kafka_topic {topic_name}
Default -
Context NJT_HTTP_LOC_CONF

Configures the name of the topic to send to.
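
For example, using the test-topic name from the samples below:

    kafka_topic test-topic;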

kafka_partition

Syntax kafka_partition [auto|partition_number]
Default kafka_partition auto
Context NJT_HTTP_LOC_CONF

Configures the partition to send to. The default is auto, which does not pin a specific partition, e.g.:

    kafka_partition auto; # default value
  # kafka_partition 0;
  # kafka_partition 1;

4. Configuration Example

njet.conf:

...
load_module modules/njt_http_kafka_module.so;    # dynamically load the kafka proxy module
...

http {
    # set the broker list
    kafka_broker_list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094;
    
    # sasl_plaintext security mode; a username and password must be set
    kafka_sasl_plaintext user_name=admin password=123456;    

    server {
        listen 8080;
        server_name localhost;

        location /test1 {
            kafka_topic test-topic;    # set the topic name
        }

        location /test2 {
            kafka_topic test-topic;    # set the topic name
        }
    }
}

5. Usage Example

The following walkthrough uses the local path /root/tool/ssl_kafka as an example.
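
Assuming the path does not exist yet, create it and run the remaining steps from there:

mkdir -p /root/tool/ssl_kafka
cd /root/tool/ssl_kafka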

Create the file docker-compose.yaml:

version: "3"

networks:
  default:
    name: kafka-cluster-network

services:

  zookeeper:
    image: wurstmeister/zookeeper
    hostname: zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - 2182:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      SERVER_JVMFLAGS: -Djava.security.auth.login.config=/opt/zookeeper-3.4.13/secrets/server_jaas.conf
    volumes:
      - /root/tool/ssl_kafka/volumn/conf:/opt/zookeeper-3.4.13/conf
      - /root/tool/ssl_kafka/volumn/secrets:/opt/zookeeper-3.4.13/secrets/


  kafka1:
    #image: wurstmeister/kafka:2.11-0.11.0.3
    image: wurstmeister/kafka:2.12-2.4.1
    restart: always
    hostname: kafka1
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.40.136:9092
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_CREATE_TOPICS: "test-topic:2:2"
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" # when true the ACL acts as a blacklist (only blacklisted users are denied); the default false makes it a whitelist (only whitelisted users are allowed)
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf
    volumes:
      - /root/tool/ssl_kafka/volumn/secrets/:/opt/kafka/secrets/

  kafka2:
    image: wurstmeister/kafka:2.12-2.4.1
    restart: always
    hostname: kafka2
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - 9093:9092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.40.136:9093
      KAFKA_ADVERTISED_PORT: 9093
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_CREATE_TOPICS: "test-topic:2:2"
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" # when true the ACL acts as a blacklist (only blacklisted users are denied); the default false makes it a whitelist (only whitelisted users are allowed)
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf
    volumes:
      - /root/tool/ssl_kafka/volumn/secrets/:/opt/kafka/secrets/
  kafka3:
    image: wurstmeister/kafka:2.12-2.4.1
    restart: always
    hostname: kafka3
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - 9094:9092
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: SASL_PLAINTEXT://192.168.40.136:9094
      KAFKA_ADVERTISED_PORT: 9094
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_CREATE_TOPICS: "test-topic:2:2"
      KAFKA_LISTENERS: SASL_PLAINTEXT://0.0.0.0:9092
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_PLAINTEXT
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
      KAFKA_SUPER_USERS: User:admin
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" # when true the ACL acts as a blacklist (only blacklisted users are denied); the default false makes it a whitelist (only whitelisted users are allowed)
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_OPTS: -Djava.security.auth.login.config=/opt/kafka/secrets/server_jaas.conf
    volumes:
      - /root/tool/ssl_kafka/volumn/secrets/:/opt/kafka/secrets/

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    restart: always
    ports:
        - 10010:8080
    environment:
        - DYNAMIC_CONFIG_ENABLED=true
        - SERVER_SERVLET_CONTEXT_PATH=/kafka-ui
        - KAFKA_CLUSTERS_0_NAME=local
        - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:9092,kafka2:9092,kafka3:9092
        - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_PLAINTEXT
        - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
        - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";
    depends_on:
      - zookeeper
      - kafka1
      - kafka2

Create the mount and configuration directory volumn:

mkdir volumn

Under volumn, create two folders, conf and secrets: conf holds the configuration files and secrets holds the username/password files (see the commands below).
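
A minimal sketch of the corresponding commands, run from /root/tool/ssl_kafka:

mkdir -p volumn/conf volumn/secrets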

In the conf directory, create configuration.xsl, log4j.properties, and zoo.cfg.

configuration.xsl:

<?xml version="1.0"?>
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0">
<xsl:output method="html"/>
<xsl:template match="configuration">
<html>
<body>
<table border="1">
<tr>
 <td>name</td>
 <td>value</td>
 <td>description</td>
</tr>
<xsl:for-each select="property">
<tr>
  <td><a name="{name}"><xsl:value-of select="name"/></a></td>
  <td><xsl:value-of select="value"/></td>
  <td><xsl:value-of select="description"/></td>
</tr>
</xsl:for-each>
</table>
</body>
</html>
</xsl:template>
</xsl:stylesheet>

log4j.properties:

# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE
zookeeper.console.threshold=INFO
zookeeper.log.dir=.
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log

#
# ZooKeeper Logging Configuration
#

# Format is "<default threshold> (, <appender>)+

# DEFAULT: console appender only
log4j.rootLogger=${zookeeper.root.logger}

# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE

#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n

#
# Add ROLLINGFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}

# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
log4j.appender.ROLLINGFILE.MaxBackupIndex=10

log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n


#
# Add TRACEFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}

log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j's NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n

zoo.cfg:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zookeeper-3.4.13/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
autopurge.purgeInterval=1

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

requireClientAuthScheme=sasl

jaasLoginRenew=3600000

zookeeper.sasl.client=true

In the secrets directory, create three files: consumer.properties, producer.properties, and server_jaas.conf.

consumer.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

producer.properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

server_jaas.conf:

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="123456";
};


Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="123456"
    user_super="123456"
    user_admin="123456";
};

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456"
    user_admin="123456";
};

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="123456";
};
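
With the compose file and the conf and secrets files in place, start the cluster (assuming Docker and docker-compose are installed) from /root/tool/ssl_kafka:

docker-compose up -d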

Producer test command:

docker-compose exec kafka1 kafka-console-producer.sh --topic test-topic --broker-list kafka1:9092,kafka2:9092 --producer.config /opt/kafka/secrets/producer.properties

Consumer test command:

docker-compose exec kafka1 kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server kafka1:9092,kafka2:9092 --consumer.config /opt/kafka/secrets/consumer.properties

Send a message with curl:

curl http://localhost:8080/test1 -d "==========hello1"

The Kafka consumer receives the data:

docker-compose exec kafka1 kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server kafka1:9092,kafka2:9092 --consumer.config /opt/kafka/secrets/consumer.properties

==========hello1
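
The messages can also be inspected through the kafka-ui container defined in the compose file above, which is exposed on host port 10010 under the /kafka-ui context path.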