Spark消费Apache-kafka代码示例

技术文档网 2021-04-29
Spark消费开源Kafka

当前从kafka 0.10.0版本以后对安全访问验证做了限制,支持SSL和SASL认证两种模式。所以为了测试对开源kafka的支持,我们需要重新部署kafka和zookeeper。

注意事项

kafka的server和源码中访问的kafka client的包必须一致,否则会出现不可预知的数据消费问题。

代码讲解

package demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * spark 与 kafka 集成demo
  */
object SparkAndKafkaDemo {

  /**
    * Main 程序的主方法
    * @param args
    */
  def main(args: Array[String]): Unit = {
    /**定义程序运行的名称**/
    val appName = "sparkKafkaDemo";
    /**消费组**/
    val groupId = "test_group"
    /**kafka的地址**/
    val brokers = "tbds-xxxx:9092";
    /**初始化spark conf的信息,采用本地运行模式**/
    val sparkConf = new SparkConf().setAppName(appName);
    val sc = new StreamingContext(sparkConf,Seconds(5));
    //定义topic
    var topics = Array("test01").toSet;
    //定义kafka的参数
    val kafkaParams = collection.Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.serializer" -> classOf[org.apache.kafka.common.serialization.StringSerializer],
      "value.serializer" -> classOf[org.apache.kafka.common.serialization.StringSerializer],
      "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer],
      "group.id" -> groupId,
        //消费的位置,从最初,还是最新开始消费
      "auto.offset.reset" -> "latest",
        //是否自动提交,如果不需要就手动提交,当发送或者消费成功以后。
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    //定义消费者,忽略offset的设置了哈
    var consumerStrategy = ConsumerStrategies.Subscribe[String,String](topics,kafkaParams);

    //通过kafaUtils来进行数据的读取
    val lines = KafkaUtils.createDirectStream(sc,LocationStrategies.PreferConsistent,consumerStrategy)

    //读取的数据进行转换成单词,根据空格进行分割,然后在统计技术
    lines.foreachRDD(r => {
     r.foreach(x =>{
       print("=================================================="+x.value())
     })
    })
    //通过sparkcontenxt进行程序的启动
    sc.start()
    //程序如果结束以后关闭,程序结束
    sc.awaitTermination()
  }
}

编译和打包

1571397526564

发布程序

登录服务器,然后切换到spark的bin目录,然后提交jar

1571397618304

发送测试数据

1571397643649

检查是否已经完成消费

1571397690506

查看yarn的日志,数据是否打印

1571397752192

代码

https://github.com/TBDSUDC/TBDSDemo/tree/master/%E4%BD%BF%E7%94%A8TBDS%20maven%E5%BC%80%E5%8F%91%E6%A1%88%E4%BE%8B/kafkademo

相关文章

  1. 基于-SLF4J-MDC-机制的日志链路追踪配置属性

    ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC

  2. ajax-跨域访问

    ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>

  3. 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache

    spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port

  4. Java动态代理

    Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public

  5. Java读取classpath中的文件

    public void init() { try { //URL url = Thread.currentThread().getContextClassLo

随机推荐

  1. 基于-SLF4J-MDC-机制的日志链路追踪配置属性

    ums: # ================ 基于 SLF4J MDC 机制的日志链路追踪配置属性 ================ mdc: # 是否支持基于 SLF4J MDC

  2. ajax-跨域访问

    ajax 跨域访问 <!DOCTYPE html> <html xmlns:th="http://www.w3.org/1999/xhtml"> <head>

  3. 给第三方登录时用的数据库表-user_connection-与-auth_token-添加-redis-cache

    spring: # 设置缓存为 Redis cache: type: redis # redis redis: host: 192.168.88.88 port

  4. Java动态代理

    Jdk动态代理 通过InvocationHandler和Proxy针对实现了接口的类进行动态代理,即必须有相应的接口 应用 public class TestProxy { public

  5. Java读取classpath中的文件

    public void init() { try { //URL url = Thread.currentThread().getContextClassLo