📌 서버를 확장한다면 synchronized를 사용하면 안 될까?
이 전에 단일 서버 환경에서 synchronized를 활용하여 동시성 문제해결에 접근해 보았다.
만약 현재 상태에서 서버를 확장한다면 어떻게 될까?
아래와 같이 하나의 프로세스 내에서는 synchronized로 인해서 1개의 스레드만 redis에 접근할 수 있다.
그러나 여러 프로세스 상에서는 동시에 redis에 접근할 수 있기 때문에 redis와 관련된 연산을 원자적으로 처리할 수 있도록 해줘야 한다.
만약 아래와 같이 redis가 아니라 MySQL과 같은 Database로 바로 접근하는 경우라면 synchronized와 MySQL의 테이블락을 활용하여 시스템을 구성해 볼 수 있을 것 같다.
이런 경우에는 각 서버에서 synchronized를 활용하여 DB에 접근하는 스레드 수를 제한하여 DB 부하를 줄일 수 있을 것이라고 생각한다.
현재 프로젝트는 redis를 적용한 상태이고 redis환경에서 동시성 문제를 처리하는 방법들을 학습해보고자 한다.
redis에서 동시성 문제가 발생하지 않도록 하기 위해서 크게 아래의 2가지 방법을 고려하였다.
1. 원자적 연산 INCR, DECR 활용하기
2. redis의 락기법 활용하기
2가지 방법의 사용법, 그리고 작동방식, 동시성 문제가 실제로 해결되는가 에 중점을 두고 진행해 볼 예정이다.
📌 INCR, DECR을 활용하여 원자적 연산하기
아래와 같이 increase( = incr), decrease( = decr)를 활용하여 재고 증가/감소 로직을 작성하였다.
public void addCase0(final Long productId, final Stock productStock) {
try {
Long incrementResult = redisStockRepository.increase(productId, productStock.getStockCount());
log.info("재고 증가 후 : " + incrementResult);
} catch (Exception e) {
throw new IllegalArgumentException("redis 장애 발생");
}
}
public void subtractCase0(final Long productId, final Stock productStock) {
try {
if (redisStockRepository.decrease(productId, productStock.getStockCount()) < 0) {
Long restoreResult = redisStockRepository.increase(productId, productStock.getStockCount());
log.info("복구 후 : " + restoreResult);
throw new GlobalException(HttpStatus.CONFLICT, "[ERROR] 재고 수량이 부족 합니다.");
}
} catch (GlobalException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException("redis 장애 발생");
}
}
실제로 동시성 문제가 해결되는지 테스트해 보기 위하여 아래와 같이 테스트 코드를 작성하였고, 통과되는 것을 확인하였다.
@DisplayName("INCR, DECR를 통한 재고 동시성 테스트 : 재고 10000개에 대하여 동시에 5000개 감소, 5000개 증가 요청시 남은 수량이 10000개이다.")
@Test
public void Case0_원자적연산_동시성_문제_해결_테스트_1() throws InterruptedException {
// given
int threadCount = 10000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
if (i % 2 == 0) {
executorService.submit(() -> {
try {
stockService.subtractCase0(1L, new Stock(1L, 1));
} finally {
latch.countDown();
}
});
} else {
executorService.submit(() -> {
try {
stockService.addCase0(
1L, new Stock(1L, 1));
} finally {
latch.countDown();
}
});
}
}
latch.await();
Integer stockCount = redisStockRepository.getValue(1L);
// then
assertThat(stockCount).isEqualTo(10000);
}
@DisplayName("INCR, DECR를 통한 재고 동시성 테스트 : 재고 10000개에 대하여 동시에 20000개 감소 시 남은수량이 0개이다.")
@Test
public void Case0_원자적연산_동시성_문제_해결_테스트_2() throws InterruptedException {
// given
int threadCount = 20000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
stockService.subtractCase0(1L, new Stock(1L, 1));
} finally {
latch.countDown();
}
});
}
latch.await();
Integer stockCount = redisStockRepository.getValue(1L);
// then
assertThat(stockCount).isEqualTo(0);
}
아래는 멀티쓰레드 방식을 활용하여 재고 300개에 대하여 1000개의 시나리오 요청을 하도록 하는 코드이다.
시나리오는 1. 상품재고 조회 -> 2. 주문 -> 2-1. 주문취소(20%) -> 3. 결제완료 -> 3-1. 결제실패(20%)의 순으로 한 명의 사용자에게 발생할 수 있는 시나리오이다.
import time
import requests
import random
import threading
from concurrent.futures import ThreadPoolExecutor
# 한명이 수행하는 시나리오를 순서대로 작성한다.(결제 프로세스)
def sinario(user_id):
# 예약 상품 남은 수량 조회 (완료) -> GET 예약상품 재고 수 조회 v1/reservation-products/{reservation_id}/stock
product_id = "43"
url = "http://localhost:8083/v1/stocks/products/" + product_id
try:
response = requests.get(url)
print(f"재고 수량 - {response.json()['data']['stockCount']}", end=" ")
print(f"사용자 : {user_id}")
# print(f"Request to {url} completed with status code {response.json()['data']['stockCount']}")
except Exception as e:
print(f"Error sending request to {url}: {e}")
return
# 결제화면 들어가기 버튼 클릭 (완료) -> POST 주문 생성 요청 : v1/orders
url = "http://localhost:8083/v1/orders"
response = None
try:
body = {
"productId" : 43,
"productType" : "reservationProduct",
"quantity" : 1,
"memberId" : user_id,
"address" : "서울"
}
response = requests.post(url, json=body)
print(f"주문(결제화면들어가기) {response.json()['desc']}, 누가? {user_id}")
# print(f"Request to {url} completed with status code {response.json()['desc']}")
except Exception as e:
# print("error")
print(f"Error sending request to {url}: {e}")
return
# # 20%는 고객 변심 이탈 (완료) -> DELETE 주문 취소 요청 : v1/orders
random_number = random.randint(0, 99)
order_id = response.json()['data']
if random_number < 20:
url = "http://localhost:8083/v1/orders/" + str(order_id)
try:
response = requests.delete(url)
print(f"고객 변심 이탈 : {response.json()['desc']}")
# print(f"Request to {url} completed with status code {response.json()['desc']}")
except Exception as e:
print("error")
# print(f"Error sending request to {url}: {e}")
print("고객 변심 이탈 재고 + 1")
return
# 결제 버튼 클릭(결제 요청 API) (완료) -> POST 결제하기 요청 : v1/payments
url = "http://localhost:8083/v1/payments"
try:
body = {
"orderId" : order_id,
"memberId" : user_id
}
response = requests.post(url, json=body)
print(f"결제 {response.json()['desc']}")
# print(f"Request to {url} completed with status code {response.json()['desc']}")
except Exception as e:
print("error")
# print(f"Error sending request to {url}: {e}")
return
def main():
# 1만명의 사람들이 결제 시나오리를 수행한다.
num_requests = 1000
with ThreadPoolExecutor(max_workers=100) as executor:
tasks = [executor.submit(sinario, user_id) for user_id in range(1, num_requests + 1)]
for future in tasks:
future.result()
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
execution_time = end_time - start_time
print("코드 실행 시간:", execution_time, "초")
위의 시나리오 요청 후 Database의 주문, 결제 결과를 살펴보면 아래와 같이 300개가 성공적으로 수행된 것을 확인할 수 있다.
(동시성 문제 해결법을 적용하기 전에는 재고수량은 300개였지만 300개 초과의 내역이 주문, 결제 처리 되어 있었다.)
📌 분산락 활용해 보기
다름은 redis의 락킹메커니즘을 활용하여 동시성 문제에 접근해 보자.
spring boot환경에서 redis를 사용하기 위해 크게 2가지 라이브러리가 존재하였다.
- Lettuce
- setnx 명령어를 활용하여 분산락을 구현한다.
- spin lock 방식사용(계속해서 락이 풀렸는지 물어보는 방식 - 성능에 문제가 있다)
- 락이 해제되지 않고 애플리케이션이 종료되어 버리는 문제 발생 가능성 존재.
- Redisson
- pub-sub 기반으로 Lock을 구현한다.
- 타임아웃 기능으로 인해 락이 해제되지 않는 문제가 발생하지 않는다.
- Lua script를 활용하여 redis에 보내는 요청수를 줄임으로써 Lettuce에 비해 성능을 높였다.
위의 차이점으로 인해서 Lettuce보다는 Redisson라이브러리를 사용하는 것이 좋을 것이라고 판단하였다.
아래는 redis를 통해 락을 걸고 동시성해결 확인을 위하여 GET, SET 방식으로 재고 감소/증가 로직을 구현하였다.
public void addCase1(final Long productId, final Stock productStock) {
// 분산락
RLock lock = redissonClient.getLock(productId.toString());
try {
boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);
if(!available) {
log.info("락 획득 실패");
return;
}
int count = redisStockRepository.getValue(productId);
redisStockRepository.setKey(productId, new Stock(productId, productStock.getStockCount() + count));
log.info("재고 증가");
} catch (InterruptedException e) {
throw new IllegalArgumentException("redis 장애 발생");
} catch (GlobalException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException("redis 장애 발생");
} finally {
lock.unlock();
}
}
public void subtractCase1(final Long productId, final Stock productStock) {
RLock lock = redissonClient.getLock(productId.toString());
try {
boolean available = lock.tryLock(10, 1, TimeUnit.SECONDS);
if(!available) {
log.info("락 획득 실패");
return;
}
int count = redisStockRepository.getValue(productId);
if (count < productStock.getStockCount()) {
throw new GlobalException(HttpStatus.CONFLICT, "[ERROR] 재고 수량이 부족 합니다.");
}
redisStockRepository.setKey(productId, new Stock(productId, count - productStock.getStockCount()));
} catch (InterruptedException e) {
throw new IllegalArgumentException("redis 장애 발생");
} catch (GlobalException e) {
throw e;
} catch (Exception e) {
throw new IllegalArgumentException("redis 장애 발생");
} finally {
lock.unlock();
}
}
위처럼 redis에 저장하지 않고, redis를 통해 락을 건 후 테이블에 직접 변경 사항을 저장하는 방식을 사용할 수 도 있다.
실제로 동시성 문제가 해결되는지 테스트해 보기 위하여 아래와 같이 테스트 코드를 작성하였고, 통과되는 것을 확인하였다.
@DisplayName("분산락을 통한 재고 동시성 테스트 : 재고 10000개에 대하여 동시에 5000개 감소, 5000개 증가 요청시 남은 수량이 10000개이다.")
@Test
public void Case1_분산락을_이용한_동시성_문제_해결_테스트_1() throws InterruptedException {
// given
int threadCount = 10000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
if (i % 2 == 0) {
executorService.submit(() -> {
try {
stockService.subtractCase1(1L, new Stock(1L, 1));
} finally {
latch.countDown();
}
});
} else {
executorService.submit(() -> {
try {
stockService.addCase1(
1L, new Stock(1L, 1));
} finally {
latch.countDown();
}
});
}
}
latch.await();
Integer stockCount = redisStockRepository.getValue(1L);
// then
assertThat(stockCount).isEqualTo(10000);
}
@DisplayName("분산락을 통한 재고 동시성 테스트 : 재고 10000개에 대하여 동시에 20000개 감소 시 남은수량이 0개이다.")
@Test
public void Case1_분산락_이용한_동시성_문제_해결_테스트_2() throws InterruptedException {
// given
int threadCount = 20000;
ExecutorService executorService = Executors.newFixedThreadPool(32);
CountDownLatch latch = new CountDownLatch(threadCount);
// when
for (int i = 0; i < threadCount; i++) {
executorService.submit(() -> {
try {
stockService.subtractCase1(1L, new Stock(1L, 1));
} finally {
latch.countDown();
}
});
}
latch.await();
Integer stockCount = redisStockRepository.getValue(1L);
// then
assertThat(stockCount).isEqualTo(0);
}
이 전과 마찬가지로 300개 재고수량에 대해 1000개의 시나리오 요청을 보냈을 때 300개만 주문, 결제 처리됨을 확인하였다.(동시성 문제가 발생하지 않음)
📌 마무리
INCR, DECR의 원자적 연산을 통해서 동시성 문제를 해결할 수 있다면 곧바로 사용하면 될 것 같다.
redis락이라고 하더라도 결국은 임계영역에 대해 락을 거는 메커니즘이고 병목현상이 발생할 수 있기 때문에 사용하지 않을 수 있다면 사용하지 않는 것이 좋을 것이라고 생각한다.
추가적으로 원자적 연산을 구현하기 위해 script를 통해서 직접 작성해 주는 방법도 있다고 한다. Reddison라이브러리가 내부적으로 Lua script를 활용하여 원자적 연산을 사용한다고 한다.
추후에 redis에서 제공해 주는 연산 혹은 락을 통해 해결할 수 없는 문제가 발생한다면 script를 활용하는 방식도 고려해 보면 좋을 것 같다.(물론 Lua script 도 연산이 엄청 길어지지 않도록 짧게 작성해야 한다는 주의점이..!)
'프로젝트 > 예약상품' 카테고리의 다른 글
[회고] 예약 상품 프로젝트 (0) | 2024.03.04 |
---|---|
[프로젝트] 회복탄력성(CircuitBreaker와 Retry) (0) | 2024.03.02 |
[프로젝트] 상품 오픈시간에 같은 사용자의 여러 요청을 대비하자 (0) | 2024.02.26 |
[프로젝트] 재고정보를 판매오픈전 캐싱할 때 고려사항 (0) | 2024.02.26 |
[프로젝트] synchronized 영역 최소화하기 (0) | 2024.02.24 |