프로젝트/CStar

[프로젝트] Redis 를 Queue로 활용했을때 성능 & 동시성 체크해보자

Don't stop 훈 2024. 9. 4. 07:18

🎯 문제점

현재 사용자의 선착순 정답 데이터를 Queue에 적재하고, 게임 엔진 서비스에서 사용자 정답 데이터를 꺼내서 정답인지 판단하고 응답해주고 있다.

 

정답 데이터 양식은 아래와 같다.

class AnswerResult(
    val answer: String,
    val quizId: Long,
    val roomId: Long,
    val playerId: Long,
    val nickname: String,
)

 

 

컬렉션 프레임워크의 Queue 자료구조를 사용했다면 동시성 패키지에서 제공해주는 스레드 안전한 자료구조를 사용했을 것이다.

하지만 Redis의 List 자료구조를 사용하기 때문에 동시성 문제가 발생하는지, 시간복잡도 등을 체크해보면 좋을 것 같다.

 

https://redis.io/docs/latest/develop/data-types/lists/

 

Redis lists

Introduction to Redis lists

redis.io

 

🎯 시간복잡도

공식 문서를 확인해보면 시간복잡도가 O(1)로 적절하다고 생각될 수 있다.

만약 데이터 N개를 동시에 삽입하는 경우라면 O(N)이지만 우리 로직은 1개씩 삽입하므로 효율적이라고 생각된다.

Redis 공식문서(List 자료구조)

 

 

 

🎯 동시성 문제

그렇다면 동시성 문제에서는 안전한가??

현재 Thread-1(StompServie)에서 사용자 정답데이터를 실시간에서 전송받고 Redis 자료구조에 RPUSH 하고있다.

그리고 Thread-2(GameEngineService)에서 Redis  Queue 자료구조에서 LPOP 하고있는데, 이러한 과정에서 데이터 누락은 없는지 동시성 발생 가능성을 체크해보려고 한다.

 

일반적으로 Redis는 단일 스레드로 동작하여 원자성을 보장하지만, 애플리케이션 로직에서 어떻게 활용하는가에 따라서 동시성 문제가 발생할 수 있기 때문에, 테스트를 해보면서 체크해보려고 한다.


<테스트 코드 확인하기>

1. 총 2개의 스레드중, 1개는 100개를 연속적으로 RPUSH, 1개는 100개를 연속적으로 LPOP 하여 100개중 누락된 데이터는 없는지 확인

2. 100개의 스레드를 통해 동시에 List 자료구조에 RPUSH 연산을 수행 -> 100개의 데이터가 List 자료구조에 존재함을 확인

더보기
@Test
fun `레디스 사용자 퀴즈 정답 큐 동시성 테스트`() {
    // given
    val numberOfThreads = 2
    val startLatch = CountDownLatch(1)
    val doneLatch = CountDownLatch(numberOfThreads)
    val executor = Executors.newFixedThreadPool(numberOfThreads)
    val receive = Collections.synchronizedList(mutableListOf<AnswerResult>())

    val answerResult = AnswerResult(
        answer = "정답",
        quizId = QUIZ_ID,
        roomId = ROOM_ID,
        playerId = 1,
        nickname = "nickname"
    )

    // when
    executor.submit {
        try {
            for (idx in 1..100) {
                gameAnswerQueueService.add(answerResult)
            }
        } catch (e: Exception) {
            println("Error: ${e.message}")
        } finally {
            doneLatch.countDown()
        }
    }

    executor.submit {
        try {
            for (idx in 1..100) {
                receive.add(gameAnswerQueueService.poll(ROOM_ID, QUIZ_ID));
            }
        } catch (e: Exception) {
            println("Error: ${e.message}")
        } finally {
            doneLatch.countDown()
        }
    }

    startLatch.countDown() // 모든 스레드 동시에 시작

    doneLatch.await() // 모든 스레드 종료 대기

    executor.shutdown()

    // then
    val pollSize = receive.filter { it != null }
        .size
    val remainPushSize = redisQueueRepository.getSize(KEY) ?: 0

    assertEquals(100, pollSize + remainPushSize)
}

 

@Test
fun `레디스 List 삽입 동시성 테스트`() {
    // given
    val numberOfThreads = 100
    val startLatch = CountDownLatch(1)
    val doneLatch = CountDownLatch(numberOfThreads)
    val executor = Executors.newFixedThreadPool(numberOfThreads)

    val answerResult = AnswerResult(
        answer = "정답",
        quizId = QUIZ_ID,
        roomId = ROOM_ID,
        playerId = 1,
        nickname = "nickname"
    )

    // when
    // 데이터 추가
    for (idx in 1..numberOfThreads) {
        executor.submit {
            try {
                gameAnswerQueueService.add(answerResult)
            } catch (e: Exception) {
                println("Error: ${e.message}")
            } finally {
                doneLatch.countDown()
            }
        }
    }

    startLatch.countDown() // 모든 스레드 동시에 시작

    doneLatch.await() // 모든 스레드 종료 대기

    executor.shutdown()

    // then
    val remainPushSize = redisQueueRepository.getSize(KEY) ?: 0

    assertEquals(100, remainPushSize)
}