티스토리 뷰

이 글은 로그 분석을 위한 아키텍처 구현과 발생한 이슈에 대한 내용을 정리하였습니다

 

들어가기에 앞서 아키텍처 구성(AWS 서비스)은 데모 수준의 사양으로 구성되었습니다

운영을 위한 아키텍처가 필요한 경우 같은 구성에서 서버의 사양은 달리 설정해야 할 것 같습니다

 

 

진행 목록
1. Create MSK
2. Create Opensearch
3. Create EC2 for Web Application(API Server) and connect MSK
4. Create EC2 for Logstash and connect both MSK and Opensearch
5. Use Opensearch Dashboard

 

3. Create EC2 for Web Application(API Server) and connect MSK

 

3_1. API Application 배포를 위한 EC2 생성

AWS EC2 콘솔에서 인스턴스 실행 버튼 클릭

이름, OS(Amazon linux 2023), 인스턴스 유형(t2.medium), 키 페어(SSH 접근용) 생성 및 적용(생성시 ppk 파일로)

값 입력 및 설정 후 인스턴스 시작 버튼을 클릭하여 인스턴스를 생성합니다

키 페어 생성시 pem 파일로 받았어도 해당 파일로 ppk 파일 생성할 수 있습니다(구글 검색: pem to ppk)

 

생성한 인스턴스 클릭, 보안 탭 클릭, 보안 그룹 아래 sg- 로 시작하는 보안 그룹 ID 클릭(보안 그룹 ID 메모)

인바운드 규칙에서 우측 편집 버튼을 클릭하여 클라이언트(사용자)의 접근을 허용하기 위한 설정을 진행합니다

해당 보안 그룹은 테스트용이며 운영시 0.0.0.0/0 으로 모든 공용 IP에 대해 허용하시면 안됩니다

 

3_2. API EC2 에서 AWS MSK 에 접근하기 위한 트래픽 허용 설정

MSK 콘솔로 돌아가서 클러스터 정보의 속성 탭 - 네트워킹 설정 - 보안 그룹을 클릭합니다

인바운드 규칙에서 인바운드 규칙 편집 버튼을 클릭합니다

유형은 모든 트래픽, 소스는 사용자 지정, 옆에 돋보기 클릭하여 3_1 에서 메모한 보안 그룹 ID를 설정하고 저장합니다

이 설정으로 API EC2 에서 MSK 에 트래픽 접근이 허용됩니다

 

3_3. Kafka 접근 테스트 및 토픽 생성

API 애플리케이션 EC2 에 SSH 연결합니다

브라우저에서 EC2 인스턴스 클릭 후 상단 우측에 연결 버튼을 클릭하여 연결하거나

SSH 연결프로그램을 이용하여 퍼블릭 IPv4 주소 + ppk 파일을 사용하여 연결합니다

JDK 17, Kafka 클라이언트 설치

# JDK 설치
sudo dnf update
sudo dnf install java-17-amazon-corretto

# Kafka 클라이언트 설치, kafka 버전은 AWS MSK 클러스터 요약에서 확인 가능합니다
wget https://archive.apache.org/dist/kafka/{YOUR MSK VERSION}/kafka_2.13-{YOUR MSK VERSION}.tgz
tar -xzf kafka_2.13-{YOUR MSK VERSION}.tgz

AWS MSK 에 접근하기 위한 config 파일 생성

Kafka 압축 해제한 폴더의 bin 폴더 안에 client.properties(파일명은 마음대로) 파일을 생성하고 아래와 같이 설정합니다

sudo vi client.properties

# 설정 내용
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="AWS SM 에서 설정한 username 값" password="AWS SM 에서 설정한 password 값";

토픽 생성 명령어로 정상적으로 연결되었는지 확인하고 토픽도 생성합니다(토픽명 메모)

(압축해제한 카프카 폴더)/bin/kafka-topics.sh --create --bootstrap-server (AWS MSK 클라이언트 프라이빗 엔드포인트 중 1개) --command-config client.properties(위에서 생성한 구성 파일) --replication-factor 3 --partitions 1 --topic (원하는 토픽명)

조금 기다리시면 정상적으로 연결된 경우 Created topic 설정한 토픽명 이 터미널에 출력됩니다

이로써 해당 EC2 인스턴스에서 MSK 접근 허용이 확인되었습니다

 

3_4. API 애플리케이션에서 Kafka 접근을 위한 설정들(Producer)

해당 EC2 인스턴스에서 MSK 접근은 확인했고 실제 사용은 API 애플리케이션에서 호출할 테니 앱에서 Kafka 를

호출하기 위한 설정을 합니다

 

테스트시 진행한 API 애플리케이션 개발 스펙은 JDK 17, Springboot 3.1.4 버전을 사용했습니다

Kafka 의존성 추가

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>kafka</artifactId>
    <version>2.20.162</version>
</dependency>

Kafka Producer 구성 클래스(코드 설명은 안에 주석으로)

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String url;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

	// 아래 설정값으로 AWS MSK 에 대한 접근 권한을 얻고 요청이 가능해집니다
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // url은 AWS MSK 클라이언트 정보의 프라이빗 엔드포인트 입니다
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, url);
        // 데이터 직렬화
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // AWS MSK 클러스터 생성 후 변경했던 인증 방식을 적용하는 구간
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        // AWS SM 에서 생성한 정보를 username, password 에 적용하면 됩니다
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"AWS SM 에서 설정한 username 값\" password=\"AWS SM 에서 설정한 password 값\";");
        return props;
    }
}

Kafka Producer 요청 클래스

topicName 필드 값을 위 Kafka 클라이언트 설치 후 생성했던 토픽명으로 설정하시면 됩니다

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LogController {
    private static final Logger logger = LoggerFactory.getLogger(LogController.class);
    private final KafkaTemplate<String, String> kafkaTemplate;
    public LogController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Value("${spring.kafka.topic-name}")
    private String topicName;

    @GetMapping("/log/{message}")
    public String createLog(@PathVariable String message) {
        logger.info(message);
        kafkaTemplate.send(topicName, message);
        return "created";
    }
}

 

3_5. 배포

API 애플리케이션 배포는 maven 으로 패키징한 jar 파일을 배포하였고

주의할점은 port 를 3_1 에서 허용한 port 로 동일하게 설정하여 배포하였습니다

 

3_6. 포스트맨으로 토픽에 메시지 전달 및 확인

반드시 포스트맨이 아니여도 되며 API 애플리케이션에 요청할 수 있으면 됩니다

EC2 인스턴스의 퍼블릭 IPv4 + 배포한 port + /log/{message} 로 요청을 전달한 후 정상 응답(200)을 확인했다면

EC2 에 설치한 Kafka 클라이언트 폴더로 가서 메시지가 있는지 명령어를 호출하여 확인해 봅니다

(압축해제한 카프카 폴더)/bin/kafka-console-consumer.sh --bootstrap-server (AWS MSK 클라이언트 프라이빗 엔드포인트 중 1개) --consumer.config client.properties(위에서 생성한 구성 파일) --topic (위에서 생성한 토픽명) --from-beginning

정상적으로 처리되었다면 message 값이 그대로 출력됩니다

 

여기까지 정상적으로 진행되었다면 특정 사용자가 웹 애플리케이션에 어떠한 요청을 하였고 해당 요청을 진행 중

생성된 로그를 카프카에 전달하는 단계까지 완료되었습니다

다음 글에서는 Kafka 에서 가지고 있는 로그 데이터를 Logstash(AWS EC2) 에서 가지고 오는 작업을 진행하겠습니다