pyspark structured streaming를 활용하여 kafka데이터를 HA 구성 및 실시간으로 가져올 때의 중요사항들입니다.

 

1. 특정 kafka topic(test-topic, partition 2)을 같은 작업을 진행하는 spark application 2개가 바라봅니다.(HA를 위함)

2. group.id는 같고, checkpoint location은 달라야합니다.

3. 1번 spark app은 test-topic의 0번 파티션, 1번 spark app은 test-topic의 1번 파티션을 바라봅니다.

4. 평소에는 하나의 app이 하나의 파티션을 바라보다가, 다른 하나의 app이 다운되면 다운된 app이 바라봐야할 파티션도 실행중인 app이 함께 바라봅니다.

5. 추후 다운된 app이 재기동되면서 kafka 리밸런싱이 일어나 다시 각 app당 하나의 파티션을 맡아 읽습니다.

 

위 과정을 검증하다가 나타난 에러입니다.

pyspark.sql.utils.StreamingQueryException: Set(test-topic) are gone. Kafka option 'kafka.group.id' has been set on this query, it is
 not recommended to set this option. This option is unsafe to use since multiple concurrent
 queries or sources using the same group id will interfere with each other as they are part
 of the same consumer group. Restarted queries may also suffer interference from the
 previous run having the same group id. The user should have only one query per group id,
 and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
 consumers from the previous query are marked dead by the Kafka group coordinator before the
 restarted query starts running.
    
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you don't want your streaming query to fail on such cases, set the
 source option "failOnDataLoss" to "false".

 

StreamingQueryException:
Cannot find earliest offsets of Set(topic-name-0)

 

과정은 이러합니다.

1. 두 개의 spark app이 test-topic의 0,1번 파티션을 각각 바라봅니다.

2. 1번 app을 다운시킵니다.

3. 2번 app이 두 개의 파티션을 모두 읽는 것을 확인하였습니다.

4. 다시 1번 app을 구동하였습니다.

5. 리밸런싱이 일어나 다시 각 app당 하나의 파티션을 읽는 것이 목적이었으나, 기존에 실행되고있던 2번 app이 다운되었습니다.

 

해결방안

1. 위 에러들은 offset이 chekpoint location내 내용과 다르거나, 다운되었다가 재기동된 app이 가져가야할 kafka topic의 정보를 이미 다른 app이 가지고있어 가져오지못할 때, 나타나는 에러입니다.

2. spark streaming 소스상에 readStream option중 startingOffset이 earliest였습니다. 이를 재기동할때 이전에 읽은 데이터까지는 기억하기 위해 lastest로 변경 후 기동하였습니다.

3. readStream option에 readstream .option("failOnDataLoss", "false")를 추가하였습니다. 해당 옵션은 토픽이 삭제되거나, offset의 범위를 벗어날 때 쿼리를 실패할지의 여부이며, 공식문서에 잘못된 경보일 수 있으니, 예상 동작이 되지않으면 비활성화를 할 수 있음을 나타냅니다. 리밸런싱 과정이 의도된 상황이기에 해당 값을 False로 두었습니다. 

 

정리

1. 2개의 park structured streaming app(.py)으로 하나의 kafka 토픽을 파티션별로 읽을 수 있고, 하나의 app이 다운되어도 다른 기동중인 app이 다른 파티션까지 읽을 수 있습니다.(kafka 리밸런싱)

2. 이 때, 소스상에서 두 app은 같은 group.id를 가져야하며, 서로 다른 checkpoint location을 가져야합니다.

3. 다운되었다가 재기동 할 시의 app의 소스는 startingOffset이 earliest가 아닌 lastest임을 권장합니다.

4. 의도된 리밸런싱이기에 failOnDataLoss를 false로 지정합니다.

 

예제소스

 
spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "kafka broker_list") \
            .option("subscribe", "kafka topic") \
            .option("startingOffsets", "earliest") \ -> 재기동시에는 lastest
            .option("kafka.group.id", "kafka group_id") \
            .option("failOnDataLoss", "false") \ -> 리밸런싱 기간 대기
            .option("maxOffsetsPerTrigger", "20 ") \
            .load()

query = spark \
            .writeStream \
            .outputMode("append") \
            .foreachBatch(lambda df, batch_id : microBatchProcessor(df, batch_id, preproces_session)) \
            .option("checkpointLocation", "/user/test/checkpoint/app_01/") \ -> 각 app마다 다르게설정(hdfs)
            .start()

 

 

 

 

 

 

반응형

python에서 PIL모듈을 import할 시, 모듈 설치 명령어입니다.

 

-> pip install image, pip install pillow

반응형

'Language > Python' 카테고리의 다른 글

ERROR: No matching distribution found for cv2  (0) 2023.12.18
OSError: cannot write mode RGBA as JPEG  (0) 2023.12.18

spark structred streaming이 .py형식으로 ha구성을 위해 2개 띄워져있고,

같은 kafka topic을 바라보는 상황에서 checkpoint location이 모두 같은 hdfs경로를 바라볼 때의 에러였습니다.

 

pyspark.sql.utils.StreamingQueryException: assertion failed: Concurrent update to the commit log. Multiple streaming jobs detected for 0

 

-> checkpoint location을 다르게 두어 해결하였습니다.

ex) /usr/test/app_01, /usr/test/app_02

반응형

python 에서 import cv시 에러입니다.

 

pip install cv

가 아닌

pip install opencv-python

해당 명령어입니다.

반응형

python 으로 이미지 저장 시, 발생한 에러입니다.

 

# 변경 전

imgDecodingData.save("C:/Users/Desktop/test.jpg")

# 변경 후

imgDecodingData.convert("RGB").save("C:/Users/Desktop/test.jpg")

 

반응형

kafka spark structured streaming 개발 중 아래 에러가 발생하였습니다.

java.lang.IllegalStateException: Set() are gone. Kafka option 'kafka.group.id' has been set on this query, it is
 not recommended to set this option. This option is unsafe to use since multiple concurrent
 queries or sources using the same group id will interfere with each other as they are part
 of the same consumer group. Restarted queries may also suffer interference from the
 previous run having the same group id. The user should have only one query per group id,
 and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
 consumers from the previous query are marked dead by the Kafka group coordinator before the
 restarted query starts running.

 

환경은 특정 kafka topic에서 spark structured streaming(jar application)으로 데이터를 실시간으로 가지고오고있었고,

checkpoint location을 hdfs 경로에 저장하고 있었습니다.

 

위 에러는 저장하고있는 checkpoint location 내

offset과 commit폴더 내의 개수가 일치하지 않아 생긴 현상이었습니다.

 

checkpoint location을 지운 후 spark streaming을 재실행하여 해결하였습니다.

 

 

반응형

gradle 빌드환경에서 개발 시, 개발에 사용된 외부 라이브러리를 jar에 포함하여 패키지 하기위해,

추가 설정 값 기입이 필요합니다.

 

build.gradle 파일 내 아래 내용 추가

jar 내용 안에 추가 수정해주었습니다.

 

jar {
    manifest {
        attributes 'Main-Class': 'com.kafka.producer.Main'
    }
    from{
        configurations.compileClasspath.collect {it.isDirectory() ? it : zipTree(it)} --> 해당 내용을 추가합니다.
    }
    duplicatesStrategy = DuplicatesStrategy.EXCLUDE  --> 해당 내용을 추가합니다.
}
반응형

gradle 빌드 개발환경에서 빌드 시에 [패키지jar명]에 기본 Manifest 속성이 없습니다.에러 시, 해결법입니다.

 

build.gradle 파일 내 내용을 수정하였습니다.

 attributes 'Main-Class'  --> 해당 클래스명이 메인 클래스명과 달라 발생한 에러였습니다.

 

jar {
    manifest {
        attributes 'Main-Class': 'com.kafka.producer.ExampleProducer' --> 해당 부분입니다.
    }
}
반응형

'Build > Gradle' 카테고리의 다른 글

gradle 외부 라이브러리 포함 빌드  (0) 2023.12.18
gradle 라이브러리 추가  (0) 2023.12.18
gradle 한글깨짐 현상해결  (0) 2023.12.18

gradle 빌드환경에서 개발 시, 필요 라이브러리를 추가하는 방법입니다.

 

build.gradle 파일 내 아래 내용 추가

> dependencies 내 목록을 추가합니다.

 

dependencies {
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0'
    implementation 'org.apache.kafka:kafka-clients:2.8.2'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
    implementation 'org.slf4j:slf4j-api:1.7.30'
    implementation 'org.apache.commons:commons-vfs2:2.8.0'
}
반응형

gradle 빌드 환경에서 한글 깨짐현상이 일어났 때 해결방법입니다.

 

build.gradle 파일 내 아래 내용 추가

tasks.withType(JavaCompile){
    options.encoding = "UTF-8"
}



반응형

+ Recent posts