pyspark structured streaming를 활용하여 kafka데이터를 HA 구성 및 실시간으로 가져올 때의 중요사항들입니다.
1. 특정 kafka topic(test-topic, partition 2)을 같은 작업을 진행하는 spark application 2개가 바라봅니다.(HA를 위함)
2. group.id는 같고, checkpoint location은 달라야합니다.
3. 1번 spark app은 test-topic의 0번 파티션, 1번 spark app은 test-topic의 1번 파티션을 바라봅니다.
4. 평소에는 하나의 app이 하나의 파티션을 바라보다가, 다른 하나의 app이 다운되면 다운된 app이 바라봐야할 파티션도 실행중인 app이 함께 바라봅니다.
5. 추후 다운된 app이 재기동되면서 kafka 리밸런싱이 일어나 다시 각 app당 하나의 파티션을 맡아 읽습니다.
위 과정을 검증하다가 나타난 에러입니다.
pyspark.sql.utils.StreamingQueryException: Set(test-topic) are gone. Kafka option 'kafka.group.id' has been set on this query, it is
not recommended to set this option. This option is unsafe to use since multiple concurrent
queries or sources using the same group id will interfere with each other as they are part
of the same consumer group. Restarted queries may also suffer interference from the
previous run having the same group id. The user should have only one query per group id,
and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
consumers from the previous query are marked dead by the Kafka group coordinator before the
restarted query starts running.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
StreamingQueryException:
Cannot find earliest offsets of Set(topic-name-0)
과정은 이러합니다.
1. 두 개의 spark app이 test-topic의 0,1번 파티션을 각각 바라봅니다.
2. 1번 app을 다운시킵니다.
3. 2번 app이 두 개의 파티션을 모두 읽는 것을 확인하였습니다.
4. 다시 1번 app을 구동하였습니다.
5. 리밸런싱이 일어나 다시 각 app당 하나의 파티션을 읽는 것이 목적이었으나, 기존에 실행되고있던 2번 app이 다운되었습니다.
해결방안
1. 위 에러들은 offset이 chekpoint location내 내용과 다르거나, 다운되었다가 재기동된 app이 가져가야할 kafka topic의 정보를 이미 다른 app이 가지고있어 가져오지못할 때, 나타나는 에러입니다.
2. spark streaming 소스상에 readStream option중 startingOffset이 earliest였습니다. 이를 재기동할때 이전에 읽은 데이터까지는 기억하기 위해 lastest로 변경 후 기동하였습니다.
3. readStream option에 readstream .option("failOnDataLoss", "false")를 추가하였습니다. 해당 옵션은 토픽이 삭제되거나, offset의 범위를 벗어날 때 쿼리를 실패할지의 여부이며, 공식문서에 잘못된 경보일 수 있으니, 예상 동작이 되지않으면 비활성화를 할 수 있음을 나타냅니다. 리밸런싱 과정이 의도된 상황이기에 해당 값을 False로 두었습니다.
정리
1. 2개의 park structured streaming app(.py)으로 하나의 kafka 토픽을 파티션별로 읽을 수 있고, 하나의 app이 다운되어도 다른 기동중인 app이 다른 파티션까지 읽을 수 있습니다.(kafka 리밸런싱)
2. 이 때, 소스상에서 두 app은 같은 group.id를 가져야하며, 서로 다른 checkpoint location을 가져야합니다.
3. 다운되었다가 재기동 할 시의 app의 소스는 startingOffset이 earliest가 아닌 lastest임을 권장합니다.
4. 의도된 리밸런싱이기에 failOnDataLoss를 false로 지정합니다.
예제소스
spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka broker_list") \
.option("subscribe", "kafka topic") \
.option("startingOffsets", "earliest") \ -> 재기동시에는 lastest
.option("kafka.group.id", "kafka group_id") \
.option("failOnDataLoss", "false") \ -> 리밸런싱 기간 대기
.option("maxOffsetsPerTrigger", "20 ") \
.load()
query = spark \
.writeStream \
.outputMode("append") \
.foreachBatch(lambda df, batch_id : microBatchProcessor(df, batch_id, preproces_session)) \
.option("checkpointLocation", "/user/test/checkpoint/app_01/") \ -> 각 app마다 다르게설정(hdfs)
.start()