MSA 구현하기(10) - 마이크로 서비스에도 WebFlux 적용하기 (2)
Skala 과정에서 마이크로 서비스 아키텍처 구조에 대해 새롭게 알게 되었습니다.
배운 것을 실제로 구현해보기 위해 이 여정을 시작하기로 했습니다.
처음 글부터 보러가기
해냈습니다! 지피티야 고마워 구글아 고마워
디비도 비동기식으로 동작하도록 R2DBC를 적용하고, 서비스, 컨트롤러 코드를 다 고치고 나니 JWT(그리고 Spring Security)와 커스텀 에러 코드도 WebFlux용 코드로 리팩토링 해주어야 했습니다…
천천히 진행해보겠습니다
이미 JDBC로 애플리케이션을 실행하여 테이블이 만들어진 환경에서 리팩토링을 진행했습니다
처음부터 R2DBC를 적용하시는 분들은 sql문을 따로 작성해주셔야 table이 생성됩니다.(Flyway 적용 시 자동으로 테이블이 생성됩니다)
DB 설정하기 (MariaDB R2DBC 드라이버 사용하기)
pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
.
.
.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.mariadb</groupId>
<artifactId>r2dbc-mariadb</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
.
.
.
r2dbcd용 의존성을 추가합니다!
MariaDB보다 MongoDB나 PostgreSQL을 쓰는 것이 최적화(?)에 더 좋을 것 같지만 실제 서비스는 아니고 공부용으로 테스트해보는 서버라
DB는 그대로 사용하고 MariaDB에서 지원하는 r2dbc 드라이버를 사용하겠습니다..!
추후에 Flyway 적용을 위해 JDBC 드라이버도 남겨줍니다.
기존에 JDBC로 연결을 해두셨다면 이 의존성 코드는 이미 있을 것이기 때문에 추가하지 않으셔도 괜찮습니다.
application.yml
1
2
3
4
5
6
7
8
9
10
.
.
.
spring:
sql:
init:
mode: always
.
.
.
Flyway 적용을 위한 코드입니다. 현재는 작성하지 않아도 괜찮습니다.
application.properties
1
2
3
4
5
6
.
.
.
spring.r2dbc.url=r2dbc:mariadb://localhost:3308/user?allowPublicKeyRetrieval=true&useSSL=false
spring.r2dbc.username={db 유저}
spring.r2dbc.password={db 비밀번호}
?allowPublicKeyRetrieval=true&useSSL=false 설정을 해주지 않으니까 DB에 연결을 할 수 없어서 추가해줬습니다!
DB에 접근하는 방법을 설정하는 구문입니다.
jdbc 설정을 같이 해주셨다면 spring.datasource.url 의 url 끝에도 해당 구문을 추가합니다.
User.java (Entity)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.springframework.data.annotation.Id; // 매우 중요한
import org.springframework.data.relational.core.mapping.Table; // 부분입니다..!!
@Table
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class User {
@Id
private Long id;
@Enumerated(EnumType.STRING)
private UserRole userRole;
private String username;
private String password;
private String email;
private String phone;
.
.
.
}
R2DBC를 사용할 경우 @Entity 가 아니라 @Table 을 써야합니다.
사용법은 기존과 같이 테이블명과 클래스명이 다르다면 @Table(name="테이블 명") 으로 작성해야하면 됩니다.
공식 문서 보러가기
UserRepository.java (Repository)
1
2
3
4
5
import org.springframework.data.r2dbc.repository.R2dbcRepository;
public interface UserRepository extends R2dbcRepository<User, Long> {
Mono<User> findByEmail(String email);
}
레포지토리도 기존 JpaRepsitory를 상속받던 것을 R2dbcRepsitory를 상속받도록 만들어줍니다.
repository에 선언할 수 있는 함수 자체는 Jpa와 다르지 않습니다.
여기서는 바로 Mono나 Flux를 반환해주기 때문에 List나 Optional로 반환받지 않도록 주의해주세요!
실제 기능 수행 코드 수정하기 (Servcie, Controller 코드 수정)
UserService.java (Service)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
private final PasswordEncoder passwordEncoder;
private final KafkaProducer kafkaProducer;
public Flux<UserInfoResponse> getAll() {
return userRepository.findAll()
.map(this::getUserInfo);
}
public Mono<UserInfoResponse> getUser(Long userId) {
return userRepository.findById(userId)
.switchIfEmpty(Mono.error(new UserNotFoundException()))
.map(this::getUserInfo);
}
public Mono<UserInfoResponse> register(RegisterUserRequest request) {
User user = new User(
request.getUsername(),
passwordEncoder.encode(request.getPassword()),
request.getEmail(),
request.getPhone()
);
return userRepository.save(user)
.map(this::getUserInfo);
}
public Mono<UserInfoResponse> updateUser(Long userId, UserUpdateRequest request) {
return transactionalOperator.transactional(
userRepository.findById(userId)
.switchIfEmpty(Mono.error(new UserNotFoundException()))
.flatMap(user -> {
user.update(
request.getUsername(),
request.getEmail(),
request.getPhone()
);
return userRepository.save(user);
})
.map(this::getUserInfo)
);
}
private UserInfoResponse getUserInfo(User user) {
return new UserInfoResponse(
user.getId(),
user.getUsername(),
user.getEmail(),
user.getPhone()
);
}
}
User 객체를 UserInfoResponse DTO 객체로 변환해주는 getUserInfo() 함수를 선언했습니다.
Repository를 통해 Mono 혹은 Flux로 반환된 값을 조회, 수정하는 코드 그리고 저장하는 코드까지 거의 모든 예시가 담겨있습니다.
에러가 발생하였을 때 정상적으로 예외처리가 동작하도록 하기 위해 switchIdEmpty() 를 사용했습니다.
유저의 정보를 수정하는 함수에 @Transactional 어노테이션이 있었는데,
webflux에서 정상적으로 동작하도록 만들기 위해 transactianlOperator를 사용했습니다.
다음 글에서는 주문 기능에서 동시성 제어 문제에 대해서 다뤄보겠습니다!
Service 코드에서 Mono와 Flux로 이미 다 반환을 해줬기 때문에 이전 글에서 설정해뒀던 코드를 수정해야합니다.
UserController.java (Controller)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
.
.
.
/** [관리자]
* 모든 사용자 정보 가져오기
* @param userId 로그인 된 사용자 Id
* @param role 로그인 된 사용자 Role
* @return 모든 사용자 정보 리스트
*/
@GetMapping("/list")
public Flux<UserInfoResponse> getAllUsers(
@RequestHeader("x-user-id") Long userId,
@RequestHeader("x-user-role") UserRole role
) {
log.info("user id({}) accessed to find all users", userId);
RoleCheck.isAdmin(role);
return userService.getAll();
}
.
.
.
이전에 Mono.fromSupplier() 혹은 Flux.defer() 등으로 반환하던 코드들이 간소화되었습니다.
Custom Error 코드 WebFlux 방식으로 수정하기
CustomException.java
1
2
3
4
5
6
7
8
9
.
.
.
public static CustomException of(Throwable ex) {
return new UnknownException(ex);
}
.
.
.
기존 코드에서 Exception 을 인자로 받던 of() 함수를 Throwable 을 받도록 수정합니다.
ErrorHandlerFilter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Component
@RequiredArgsConstructor
@Slf4j
public class ErrorHandleFilter implements WebExceptionHandler {
private final ObjectMapper objectMapper;
private final MessageSource messageSource;
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
response.getHeaders().set(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8");
Locale locale = exchange.getLocaleContext().getLocale();
ErrorResponse dto;
int statusCode;
if (ex instanceof CustomException customEx) {
dto = ErrorResponse.of(customEx, messageSource, locale);
statusCode = customEx.getStatus().value();
log.error("A problem has occurred in filter: [id={}]", dto.getTrackingId(), customEx);
} else {
dto = ErrorResponse.of(CustomException.of(ex), messageSource, locale);
statusCode = HttpStatus.INTERNAL_SERVER_ERROR.value();
log.error("Unexpected exception has occurred in filter: [id={}]", dto.getTrackingId(), ex);
}
response.setStatusCode(HttpStatus.valueOf(statusCode));
try {
byte[] bytes = objectMapper.writeValueAsBytes(dto);
DataBuffer buffer = response.bufferFactory().wrap(bytes);
return response.writeWith(Mono.just(buffer));
} catch (Exception e) {
log.error("Error writing error response", e);
return response.setComplete(); // fallback
}
}
}
이 클래스는 거의 모든 코드가 바뀌었습니다,,
기존 MVC 방식으로 동작하던 코드가 WebFlux 방식으로 정상적으로 동작할 수 있도록 바꿔줬습니다.
레퍼런스
Jwt 코드 WebFlux 방식으로 수정하기
JwtAuthenticationFilter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
@RequiredArgsConstructor
public class JwtAuthenticationFilter implements WebFilter {
private final JwtProvider jwtProvider;
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String token = jwtProvider.getAccessTokenFromHeader(request);
if (token != null) {
Authentication authentication = jwtProvider.getAuthentication(token);
return chain.filter(exchange)
.contextWrite(ReactiveSecurityContextHolder.withAuthentication(authentication));
}
return chain.filter(exchange);
}
}
OncePerRequestFilter 대신 WebFilter 인터페이스를 구현하도록 하고,
WebFlux 방식대로 동작하도록 작성해주었습니다.
JwtAuthEntryPoint.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class JwtAuthEntryPoint implements ServerAuthenticationEntryPoint {
@Override
public Mono<Void> commence(ServerWebExchange exchange, AuthenticationException ex) {
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.UNAUTHORIZED);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
String body = "{\"message\": \"" + ex.getMessage() + "\"}";
DataBuffer buffer = response.bufferFactory().wrap(body.getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(buffer));
}
}
AuthenticationEntryPoint 대신 ServerAuthenticationEntryPoint 인터페이스를 구현하고 WebFlux 방식으로 정상적으로 동작할 수 있도록 만들어주었습니다.
레퍼런스
JwtProvider.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
.
.
.
public String getAccessTokenFromHeader(ServerHttpRequest request) {
// 1. 쿠키에서 access-token 확인
MultiValueMap<String, HttpCookie> cookies = request.getCookies();
HttpCookie cookie = cookies.getFirst("access-token");
if (cookie != null) {
return cookie.getValue();
}
// 2. Authorization 헤더에서 Bearer 토큰 추출
String header = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
if (header != null) {
if (!header.toLowerCase().startsWith(BEARER)) {
log.info("Invalid header format: {}", header);
throw new RuntimeException("Invalid Authorization header");
}
return header.substring(7); // "Bearer " 제거
}
log.info("Authorization header is missing!");
return null;
}
.
.
.
HttpServletRequest 대신 ServerHttpRequest 를 인자로 받도록 하고,
쿠키 관련 코드 및 에러 코드가 업데이트 되었습니다. (추후 커스텀 에러로 바꿀 예정입니다,,까먹었어요)
SecurityConfig.java (Jwt를 적용할 Spring Security 설정 수정)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Configuration
@EnableWebFluxSecurity
@EnableMethodSecurity(securedEnabled = true)
@RequiredArgsConstructor
public class SecurityConfig {
private final ObjectMapper objectMapper;
private final JwtAuthenticationFilter jwtAuthenticationFilter;
@Bean
public MessageSource messageSource() {
ReloadableResourceBundleMessageSource messageSource = new ReloadableResourceBundleMessageSource();
messageSource.setBasename("messages");
messageSource.setDefaultEncoding("UTF-8");
messageSource.setCacheSeconds(60);
messageSource.setDefaultLocale(Locale.KOREA);
messageSource.setUseCodeAsDefaultMessage(true);
return messageSource;
}
@Bean(name = "messageSourceAccessor")
public MessageSourceAccessor messageSourceAccessor(MessageSource messageSource) {
return new MessageSourceAccessor(messageSource, Locale.getDefault());
}
@Bean
public JwtAuthEntryPoint jwtAuthEntryPoint() {
return new JwtAuthEntryPoint();
}
@Bean
public PasswordEncoder passwordEncoder() {
return new BCryptPasswordEncoder();
}
@Bean
public ErrorHandleFilter errorHandleFilter() {
return new ErrorHandleFilter(objectMapper, messageSource());
}
@Bean
public CorsWebFilter corsWebFilter() {
CorsConfiguration config = new CorsConfiguration();
config.setAllowCredentials(true);
config.addAllowedOrigin("*");
config.addAllowedHeader("*");
config.addAllowedMethod("*");
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
source.registerCorsConfiguration("/**", config);
return new CorsWebFilter(source);
}
@Bean
public SecurityWebFilterChain filterChain(ServerHttpSecurity http) throws Exception {
return http
.cors(httpSecurityCorsConfigurer -> corsWebFilter())
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.formLogin(ServerHttpSecurity.FormLoginSpec::disable)
.httpBasic(ServerHttpSecurity.HttpBasicSpec::disable)
.authorizeExchange(exchange -> exchange
.pathMatchers("/**").permitAll()
.anyExchange().authenticated()
)
.addFilterAt(jwtAuthenticationFilter, SecurityWebFiltersOrder.AUTHENTICATION)
.build();
}
}
이 코드도 거의 전반적으로 수정되었는데
@EnableWebSecurity 대신 @EnableWebFluxSecurity 를 통해 WebFlux 기반 Security 설정을 할 것을 알립니다.
webflux 기반으로 동작하는 Spring Security는 생각보다 자동으로 찾아주는 기능이 많아서 오히려 삭제한 빈이 있습니다. (다중 언어 적용 관련 빈은 거의 모두 삭제했습니다)
Cors 에러 관련한 함수도 사용하는 객체가 달라
CorsConfigurationSource 에서 CorsWebFilter 를 반환하도록 바꾸었습니다.
해당 설정 시 UrlBasedCorsConfigurationSource 의 import 경로에 reactive 가 포함되어 있는지 유의해주세요..!
가장 중요한 filterChain 함수가 SecurityWebFilterChain 을 반환하고 ServerHttpSecurity 를 인자로 받도록 설정합니다.
기존에는 빈으로 등록했던 JwtAuthenticationFilter 를 여기서 등록하는 방식으로 바꿔주었습니다.
나머지 다른 기능들도 WebFlux 방식에 맞게 ServerHttpSecurity 를 통해 설정해주면
Security 구성도 끝입니다!
마치며
생각보다 쉬웠고, 생각보다 복잡했던 WebFlux 적용기였습니다,,
챗지피티와 함께라면,,새로운 기술,,,어렵지 않을지도,,,,,,
이게 왜 되지? 라는 생각이 들지 않도록 더 열심히 공부해야겠습니다..!
다음에는 동시성에 대해 다뤄보겠습니다
🐜💦
참고 자료