kafka 2.8.2버전입니다.

배경은 이러합니다.

 

1. kafka cluster는 내부망에 위치합니다.

2. kafka 서버에 데이터를 전송하는 client는 외부망에 위치합니다.

3. 정책 상 외부에서 내부로 데이터를 전송할 시, ssl 인증이 필수였습니다.

4. 이에 kafka cluster 내부망끼리의 통신은 디폴틀값인 PLAINTEXT, 외부 client에서 데이터를 받을 때는 SSL을 사용하기로 하였습니다.

5. kafka 서버단에서 포트를 2개 열어 각 인증체계를 부여하였습니다.

6. PLAINTEXT -> 9092포트, SSL -> 9093포트입니다.

 

vi ${KAFKA_HOME}/config/server.properties

listeners=PLAINTEXT://:9092, SSL://:9093

# 디폴트값
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

advertised.listeners=PLAINTEXT://[각 KAFKA 브로커 호스트명]:9092, SSL://[각 KAFKA 브로커 호스트명]:9093

# 추가
ssl.keystore.location=[keystore.jks 위치]
ssl.keystore.password=[keystore 비밀번호]
ssl.key.password=[key 비밀번호]
ssl.truststore.location=[truststore.jks 위치]
ssl.truststore.password=[truststore 비밀번호]
# 클라이언트 단 인증 방식
ssl.client.auth=required
# Kafka 서버 브로커들끼리 통신할 방법
security.inter.broker.protocol=PLAINTEXT
# 빈 값으로 추가
ssl.endpoint.identification.algorithm=

# 수정 후 kafka 서버 재실행필요

 

이후 클라이언트 설정은 아래 링크를 참조해주시기 바랍니다.

https://developer-woong.tistory.com/79

 

kafka) kafka SSL설정

kafka 2.8.2버전 기준이며, keystore, truststore jks파일이 존재함을 가정합니다. 서버단 설정: vi ${KAFKA_HOME}/config/server.properties listeners=SSL://:9092 # 디폴트값 listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL

developer-woong.tistory.com

 

 

 

 

 

 

반응형

kafka 2.8.2버전 기준이며, keystore, truststore jks파일이 존재함을 가정합니다.

 

서버단 설정:

vi ${KAFKA_HOME}/config/server.properties

listeners=SSL://:9092

# 디폴트값
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

advertised.listeners=SSL://[각 KAFKA 호스트명]:9093

# 추가
ssl.keystore.location=[keystore.jks 위치]
ssl.keystore.password=[keystore 비밀번호]
ssl.key.password=[key 비밀번호]
ssl.truststore.location=[truststore.jks 위치]
ssl.truststore.password=[truststore 비밀번호]
ssl.client.auth=required
# Kafka 서버 브로커들끼리 통신할 방법
security.inter.broker.protocol=SSL
# 빈 값으로 추가
ssl.endpoint.identification.algorithm=

# 수정 후 kafka 서버 재실행필요

 

 

클라이언트단 설정:

# 파일 생성
vi ${KAKFA_HOME}/config/client-ssl-auth.properties

security.protocol=SSL
ssl.truststore.location=[truststore.jks 위치]
ssl.truststore.password=[truststore 비밀번호]
ssl.keystore.location=[keystore.jks 위치]
ssl.keystore.password=[keystore 비밀번호]
ssl.key.password=[key 비밀번호]

# 토픽조회 확인
${KAKFA_HOME}/bin/kafka-topics.sh --list --bootstrap-server\
	# 브로커리스트
	mast01:9093,mast02:9093,mast03:9093\
    --command-config config/client-ssl-auth.properties
    
# 프로듀서실행
${KAKFA_HOME}/bin/kafka-console-producer.sh\
	# 브로커리스트
	--bootstrap-server mast01:9093,mast02:9093,mast03:9093\
    --topic [TOPIC_NAME]\
    --producer.config config/client-ssl-auth.properties

 

kafka 커스텀 프로듀서단:

# 제 상황에서는 개발한 kafka producer의 설정들을 따로 밖의 파일(config.properties)에 지정하고,
# 실행 시 인자로 넘기는 형식입니다. 파일을 넘기는 형식이 아니라면, produver 소스 내 설정해주어야합니다.
vi config.properties
security.protocol=SSL
ssl.truststore.location=[truststore.jks 위치]
ssl.truststore.password=[truststore 비밀번호]
ssl.keystore.location=[keystore.jks 위치]
ssl.keystore.password=[keystore 비밀번호]
ssl.key.password=[key 비밀번호]

 

kafka connetor단:

# 커넥터 실행 시 connect-standalone.properties파일을 바라봅니다.
vi config/connect-standalone.properties
bootstrap.servers=[브로커리스트]
security.protocol=SSL
ssl.truststore.location=[truststore.jks 위치]
ssl.truststore.password=[truststore 비밀번호]
ssl.keystore.location=[keystore.jks 위치]
ssl.keystore.password=[keystore 비밀번호]
ssl.key.password=[key 비밀번호]

producer.bootstrap.servers=[브로커리스트]
producer.security.protocol=SSL
producer.ssl.truststore.location=[truststore.jks 위치]
producer.ssl.truststore.password=[truststore 비밀번호]
producer.ssl.keystore.location=[keystore.jks 위치]
producer.ssl.keystore.password=[keystore 비밀번호]
producer.ssl.key.password=[key 비밀번호]

 

반응형

Hive에서 테이블 컬럼명을 변경하는 쿼리입니다.

 

# 컬럼명변경
alter table [TABLE_NAME] change [OLD_COLUMN] [NEW_COLUMN] [NEW_COLUMN_TYPE]

# 컬럼명 변경과 동시에 특정 컬럼의 뒤에 위치
alter table [TABLE_NAME] change [OLD_COLUMN] [NEW_COLUMN] [NEW_COLUMN_TYPE] after [기존 특정컬럼]
반응형

spark shell을 통해 테스트 코드 작성 중,

hive 테이블을 조회한 데이터프레임이 df.show()를 할 때마다 값이 바뀌는 현상이었습니다.

한 번 로드한 데이터를 메모리 상에 상주시키기 위해

df.cache()함수를 사용하였습니다.

 

예제코드:

from pyspark.sql import SparkSession

spark = SparkSession.builder\
	.config("hive.exec.dynamic.partition.mode", "nonstrict")\
    .config("partitionoverwritemode", "dynamic")\
    .appName("test")\
    .enableHiveSupport().getOrCreate()
    
df = spark.sql("select * from test.sample limit 10")

df.cache()
df.show()

 

 

반응형

scala를 통해 이미지의 저장경로가 담긴 배열의 값을 읽어,

바이트스트림에 저장할때, 나타난 에러입니다.

 

배경은 파일이 저장된 경로들을 담은 배열을 반복하여

파일을 가져와 zip파일에 하나씩 추가하고 압축하는 일이었습니다.

 

반복문 내 소스를 추가하여 해결하였습니다.

예제소스:

// zip파일에 담길 데이터 스트림 선언
val outputStream = new ByteArrayOutputStream()
val zipOutputStream = new ZipArchiveOutputStream(outputStream)
 
// ftp서버의 파일을 읽어 outputStream에 저장
file_list.foreach { row =>
	val file_full_path = row(0).toString
    // 각 경로의 끝은 파일의 이름이었습니다.
	val file_name = file_full_path.split('/').last
 
	// 압축 파일 내 각 파일들의 이름입니다.
	val entry = new ZipArchiveEntry(file_name)
	zipOutputStream.putArchiveEntry(entry)
 
	val fileStream = ftpClient.retrieveFileStream(file_full_path)
	val buffer = new Array[Byte](1024)
	var bytesRead = fileStream.read(buffer)
 
	// 파일의 모든 내용을 읽을 때 까지 반복
	while (bytesRead != -1) {
		zipOutputStream.write(buffer, 0, bytesRead)
		bytesRead = fileStream.read(buffer)
        }
        // 이 구문 넣어줘야 연속해서 파일들 읽을 때 널포인터 에러가 나지 않았습니다.
	if (fileStream!=null) {
		fileStream.close()
		}
	ftpClient.completePendingCommand()
	zipOutputStream.closeArchiveEntry()
      }
 
zipOutputStream.finish()
zipOutputStream.close()

ftpClient.logout()
ftpClient.disconnect()
반응형

IntelliJ 통해 개발 중 변수명에 IntelliJ에서 약속된 단어가 있을 시 나타난 오류였습니다. 아래 방법으로 해결하였습니다.

 

File -> Settings -> Editor -> Inspections -> Proofreading -> Typo 체크 해제 -> OK

반응형

scala로 spark 개발 중에 나타난 에러입니다.

Caused by: java.io.NotSerializableException [클래스명]

 

소스상에 클래스명 뒤에 extends를 추가하여 해결하였습니다.

ex) class sparkSubmit extends Serializable

반응형

scala 에서 json 파싱 시 

{"sizeMapDefined":false,"traversableAgain":true,"empty":false} 값이 나오는 현상이었습니다.

 

다른 json 모듈을 사용하여 해결하였습니다.

아래는 예제 소스입니다.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val fileMap = new mutable.HashMap[String, String]
fileMap.put("file_name", "testfile")
fileMap.put("time", "2023-05-12")
fileMap.put("path", "/USER/TEST/)
fileMap.put("id", "1")
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val value= mapper.writeValueAsString(fileMap)
반응형

kafka 2.8.2버전 기준입니다.

 

# 보존 기간을 하루로 설정합니다.
${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server ${BOOTSTRAP-SERVERS} --alter --entity-type topics --entity-name ${TOPIC} --add-config retention.ms=86400000

 

토픽별로 위 명령어로 삭제주기를 설정해두어도,  KAFKA 서버 내 server.properties의 내용을 한 번 더 확인하여야합니다.

 

vi server.properties

log.retention.check.interval.ms = 1000

 

위 명령어처럼 토픽에 대해 삭제주기를 1일로 설정해두어도 log.retention.check.interval.ms의 값이 1일보다 큰값이면

예상대로 동작하지않습니다. 해당 컨피그는 메시지가 삭제 대상인지를 확인하는 주기입니다.

 

 

반응형

kafka topic 내 메시지들의 삭제 주기를 설정 중 나타난 에러입니다.

Exception in thread "main" joptsimple.UnrecognizedOptionException: after is not a recognized option

 

실행명령어를 수정하였습니다.

기존: 

${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper mast01:2181,mast02:2181,mast03:2181 --alter --topic test-topic --config retention.ms=86400000

수정: 

${KAFKA_HOME}/bin/kafka-configs.sh --bootstrap-server mast01:9092,mast02: 9092 ,mast03: 9092 --alter --entity-type topics --entity-name test-topic --add-config retention.ms=86400000

 

반응형

'BigData > Kafka' 카테고리의 다른 글

kafka) kafka PLAINTEXT, SSL 둘 다 사용하기  (0) 2023.12.28
kafka) kafka SSL설정  (0) 2023.12.28
kafka) kafka topic 메시지 보존 기간 설정  (0) 2023.12.19
Kafka topic producer consumer 테스트  (0) 2023.07.25
Apache Kafka 설치  (0) 2023.07.25

+ Recent posts