+-
重新启动PySpark作业,无法获得在Pyspark消费者停机时插入Kafka Topic的记录。

我正在运行一个pyspark作业,数据流来自Kafka。我试图在我的windows系统中复制一个场景,以找出当消费者在数据不断被输入Kafka时发生的情况。

这是我所期望的。

生产者启动并产生消息1、2和3。 消费者在线并消费消息1、2和3。 现在消费者由于某种原因下线了,而生产者产生了消息4、5、6,以此类推......。 当消费者上线时,我的期望是它应该读到它离开的地方。所以消费者必须能够从消息4、5、6等处读取......。

我的pyspark应用程序无法实现我的期望。这里是我如何创建一个Spark Session。

session.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "clickapijson")
  .option("startingoffsets" , "latest") \
  .load()

我上网查了一下,收集了不少信息。看来groupID在这里是相关的。Kafka维护着特定groupID中每个消费者读取的偏移量的轨迹。如果一个消费者订阅了一个groupId的主题,比如说G1,kafka就会注册这个group和consumerID,并保持这个groupID和ConsumerID的跟踪。如果在任何情况下,消费者因为某些原因不得不下线,并以相同的groupID重新启动,那么kafka将拥有已经读取的偏移量信息,所以消费者将从它离开的地方读取数据。

当我在CLI中使用下面的命令来调用消费者的工作时,这种情况就会发生。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test 

现在,当我的生产者产生1、2和3的信息时,消费者能够消费。在读完第3条信息后,我杀死了正在运行的消费者工作(CLI .bat文件)。我的生产者产生了消息4、5和6,以此类推......现在我把我的消费者工作(CLI .bat文件)带回来,它能够从它离开的地方(从消息4)读取数据。这是我所期望的行为。

我无法在pyspark中做同样的事情。

当我在Pyspark中加入 option("group.id" , "test")它抛出了一个错误,说Kafka选项 group.id 不支持,因为用户指定的消费群组不用于跟踪偏移。

观察控制台的输出,每次我的pyspark消费者作业被踢掉时,它都会创建一个新的groupID。如果我的 pyspark 作业之前运行了一个 groupID 并失败了,那么当它重新启动时,它就不会拾取相同的旧 groupID。它是随机得到一个新的groupID。Kafka有之前groupID的偏移信息,但没有当前新生成的groupID。因此,我的pyspark应用程序无法在Kafka宕机时读取输入的数据。

如果是这样的话,那么当消费者的工作因为某些故障而停止时,我的数据会不会丢失?

我怎样才能给我自己的Pyspark应用提供groupid,或者我怎样才能用同样的groupid重新启动我的Pyspark应用?

2
投票

在当前的Spark版本(2.4.5)中,不可能提供你自己的 group.id 因为它是由Spark自动创建的(正如你已经观察到的)。关于Spark从Kafka读取数据时的偏移管理的完整细节,请参考以下内容 此处 并总结如下。

注意,以下Kafka参数不能设置,Kafka源或sink将抛出异常。

group.id: Kafka源将为每个查询自动创建一个唯一的组id。

auto.offset.reset : Kafka源会自动为每个查询创建唯一的组id。: 设置源选项startOffsets来指定从哪里开始。结构化流管理内部消耗的偏移量。,而不是依靠kafka Consumer来完成。这将确保在动态订阅新的topicspartitions时不会遗漏数据。请注意,startOffsets只适用于新的流式查询开始时,而且恢复查询将总是从查询停止的地方开始。

enable.auto.commit: Kafka源没有提交任何偏移。

为了让Spark能够记住它从Kafka读取的位置,你需要启用检查点,并提供一个路径位置来存储检查点文件。在Python中,这将是这样的。

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()

关于检查点的更多细节在Spark的文档中给出了 使用检查点从故障中恢复.