본문 바로가기
Coding/Java Spring

Spring AWS SNS + SQS로 이벤트 처리하는 방법

by Hide­ 2022. 1. 18.
반응형

개요

요즘 MSA, CQRS 등 아키텍처에 관한 흥미로운 주제들이 많다. 이러한 아키텍처적인 관점에서 항상 등장하는 부분이 바로 Event driven을 통한 loose coupling이다. CQRS를 예를 들어서 설명해보자. 데이터를 저장하는 명령 요청이 들어오면 쓰기DB에 데이터를 적재하고 읽기DB와의 데이터 동기화를 위해 메시지를 발행한다. 그리고 읽기DB에서는 메시지를 수신하여 데이터를 동기화시킨다.

만약 쓰기DB쪽 서버에서 직접 읽기DB 서버에 데이터 저장을 요청하는 형태라고 생각해보자. 이러한 경우 읽기DB 서버에 문제가 생긴다면 해당 문제는 쓰기DB 서버까지 전파된다. 따라서 쓰기DB는 단순히 이벤트를 발행하고 필요한 곳에서 해당 이벤트를 가져와서 사용하는 형태로 구성하여 각 서버간 의존성을 분리하는 것이라고 보면 된다.

RabbitMQ, Kafka등 여러가지 선택지가 있겠지만 본 포스팅에서는 메시지의 개수가 적을 경우 비용적인 부담도 없고 아주 간단하게 구성해볼 수 있는 AWS의 SNS + SQS를 스프링으로 사용해보는 내용에 대해 기술한다.

AWS SNS & SQS 설정

먼저 AWS콘솔에서 SNS를 검색하면 Simple Notification Service라는 메뉴가 나온다. 클릭하면 위처럼 메인 화면으로 이동하며 주제를 생성할 수 있다. 원하는 이름을 적고 다음 단계로 넘어간다.

선입선출과 표준 방식 둘 중 하나를 고를 수 있다. 선입선출의 경우 먼저 들어온 순서대로 메시지가 발행되므로 순서를 보장할 수 있다는 장점이 있다. 하지만 표준 방식보다 처리량이 낮다. 또한 표준에서는 SQS뿐만 아니라 다양한 서비스도 지원하는 반면 선입선출 방식은 SQS만 지원한다.

Event driven architecture를 설계하다보면 메시지의 순서 보장을 어떠한 형태로 처리할 지에 대한 고민이 반드시 필요하다. 단순히 위에 나와있는 FIFO 기능을 사용하면 될까? 물론 매니지드 서비스에서 제공해주는 기능이기에 개발자가 순서에 대한 신경을 쓰지않아도 된다는 장점은 분명히 존재할 것이지만 그에 따른 처리량 저하는 대규모 트래픽을 받아내야하는 서비스에 치명적인 단점이 될 것이다. 그렇다면 메시지의 순서를 어떠한 방식으로 보장해줘야 할까?

여러가지 방법이 있겠지만 그 중 하나로 제로 페이로드(Zero payload) 방식을 사용할 수 있다. 제로 페이로드란 말그대로 메시지의 내용에 변경된 내역을 적는 등의 행위를 하지 않고 단순히 id값만 넘겨주는 방식이다. 예를 들어 RDB의 경우 PK값이 될 수도 있겠고, DDD라면 어그리거트 루트의 id가 될 수도 있겠다. 좀 더 쉽게 이야기하자면, 식별할 수 있는 특정한 값을 넣어주는 것이다.

예를 들어 RDB의 PK값을 넣어준다고 가정해보자. 유저가 글을 작성하면 article테이블의 PK값을 {"article_id": 1} 형태로 메시지에 담아 발행한다. 그러면 수신자측에서는 해당 값을 통해 article 테이블의 데이터를 가지고있는 서버에 REST 요청을 날려 최신 데이터를 받아와서 동기화한다. 이렇게 된다면 메시지의 순서가 뒤바껴도 항상 최신의 데이터를 동기화할 수 있는 장점이 존재한다. 물론 네트워크 콜이 발생하기 때문에 어느정도의 비용은 존재하겠지만 순서보장을 위해 신경쓸 여러가지 사항들을 생각해보면 그리 큰 비용은 아니라고 생각한다.

자세한 내용은 우아콘2020 김영한 개발자님이 발표하신 "마이크로서비스 여행기"를 참고한다. (https://www.youtube.com/watch?v=BnS6343GTkY)

암호화, 액세스 정책 등 기타 사항들은 본 포스팅의 주제와 어긋나므로 생략한다. 주제 생성 버튼을 통해 SNS 토픽을 생성해주자.

다음으로 AWS콘솔로 돌아와서 SQS를 검색한다. 그리고 Simple Queue Service라는 메뉴를 클릭하고 대기열 생성 버튼을 클릭한다. 그럼 위와 같은 화면을 볼 수 있는데, 마찬가지로 선입선출의 방식을 택할 지 표준을 택할 지 고를 수 있다. 아래쪽에 이름도 적어준다. 그리고 아래 세부 설정들은 마찬가지로 스킵한다. 오른쪽 아래에 있는 대기열 생성 버튼을 통해 SQS를 생성해준다.

Spring 의존성 설치

implementation 'org.springframework.cloud:spring-cloud-starter-aws-messaging:2.2.6.RELEASE'

build.gradle에 위 라인을 추가해준다. 버전같은 경우 기존에 사용하던 spring cloud와 같은 버전을 사용해줬다.

aws.yml 생성

cloud:
  aws:
    credentials:
      access-key: accesskey
      secret-key: secretkey
    region:
      static: ap-northeast-2
    stack:
      auto: false
    sns:
      arns:
        create-article: arn

aws 관련 설정값들을 저장해줄 aws.yml 파일을 하나 생성한다. 그리고 sns - arns - create-article 쪽에 우리가 생성한 SNS의 arn을 채워줘야 한다. 참고로 create-article은 원하는걸로 변경해서 사용하면 된다.

arn의 경우 생성한 SNS를 클릭하면 위처럼 확인할 수 있다.

AwsConfig 생성

@Configuration
@Slf4j
public class AwsConfig {
    @Value("${cloud.aws.credentials.access-key}")
    private String accessKey;

    @Value("${cloud.aws.credentials.secret-key}")
    private String secretKey;

    @Value("${cloud.aws.region.static}")
    private String region;

    private BasicAWSCredentials getCredentials() {
        return new BasicAWSCredentials(accessKey, secretKey);
    }

    @Bean
    public AmazonSNS amazonSNS() {
        log.info("Initializing AWS SNS");
        BasicAWSCredentials credentials = getCredentials();
        return AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withRegion(region)
                .build();
    }
}

AmazonSNSClientBuilder를 통해 인증정보와 리전을 설정해준다.

AwsSnsClient 생성

@Component
@RequiredArgsConstructor
public class AwsSnsClient {
    private final AmazonSNS amazonSNS;

    public void publish(String topicArn, String message) {
        amazonSNS.publish(topicArn, message);
    }
}

SNS에 메시지를 발행할 클라이언트를 하나 생성해준다. AwsConfig에서 빈으로 등록한 객체를 가져와서 단순히 해당 객체의 publish() 메소드를 통해 메시지를 발행하는 코드이다. 참고로 이렇게 클라이언트 클래스를 따로 만들어줄 필요는 없고 각자 유즈케이스에 맞게 바로 서비스에서 가져와서 사용해도 된다.

테스트

@PostMapping("/pub")
public void pub() {
    awsSnsClient.publish(createArticleArn, "test");
}

메시지를 SNS로 발행하기 위한 엔드포인트를 컨트롤러에 하나 생성한다.

@Component
@Slf4j
public class TestListener {
    @SqsListener(value = "ArticleCreate", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void listen(String value, Acknowledgment ack) {
        log.info(value);
        ack.acknowledge();
    }
}

다음으로 메시지를 수신할 리스너 클래스를 하나 생성한다. @SqsListener 어노테이션을 통해 손쉽게 메시지를 땡겨올 수 있다. value에는 SQS의 이름을 적어주고 deletionPolicy에는 메시지의 정책을 통해 수신할 메시지의 처리 정책을 정해줄 수 있다.

- ALWAYS: 메시지를 수신하는 경우 무조건 삭제한다.

- NEVER: acknowledge()를 통해 삭제 요청을 보내고 해당 요청이 없다면 절대 삭제하지 않는다.

- NO_REDRIVE: Redrive 정책이 설정되어있지 않다면 삭제한다.

- ON_SUCCESS: @SqsListener 어노테이션이 붙은 메소드의 수행이 성공한다면 삭제한다.

위와 같이 설정하고 생성한 엔드포인트로 API콜을 요청한다면 정상적으로 리스너를 통해 메시지를 출력하는 모습을 확인할 수 있다.