This comprehensive guide will walk you through building a reactive Spring Boot application using WebFlux and Reactive MongoDB.
Table of Contents
- Project Setup
- Reactive MongoDB Configuration
- Domain Model
- Repository Layer
- Service Layer
- Controller Layer
- Error Handling
- Validation
- Testing
- Advanced Features
Project Setup
Let's begin by creating our Spring Boot project with the necessary dependencies.
Step 1: Generate Project Structure
Using Spring Initializr (https://start.spring.io/), select:
- Project: Maven
- Language: Java
- Spring Boot: Latest stable version
- Dependencies:
- Spring Reactive Web (WebFlux)
- Reactive MongoDB
- Lombok (for reducing boilerplate code)
- Validation (for bean validation)
Alternatively, add these dependencies to your pom.xml
:
<dependencies>
<!-- Spring Boot Starter WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Reactive MongoDB -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Validation -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Reactive MongoDB Configuration
Configure MongoDB connection in application.properties
:
# MongoDB configuration
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=reactive_db
spring.data.mongodb.auto-index-creation=true
# Enable detailed logging for debugging (optional)
logging.level.org.springframework.data.mongodb=DEBUG
logging.level.reactor=DEBUG
For more advanced configuration, create a MongoConfig
class:
@Configuration
public class MongoConfig {
@Value("${spring.data.mongodb.host}")
private String host;
@Value("${spring.data.mongodb.port}")
private int port;
@Value("${spring.data.mongodb.database}")
private String database;
@Bean
public ReactiveMongoClient reactiveMongoClient() {
return MongoClients.create(String.format("mongodb://%s:%d", host, port));
}
@Bean
public ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory() {
return new SimpleReactiveMongoDatabaseFactory(reactiveMongoClient(), database);
}
@Bean
public ReactiveMongoTemplate reactiveMongoTemplate() {
return new ReactiveMongoTemplate(reactiveMongoDatabaseFactory());
}
}
Domain Model
Let's create a sample domain model for our application. We'll use a "Product" entity as an example.
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Document(collection = "products")
public class Product {
@Id
private String id;
@NotBlank(message = "Product name is required")
@Size(min = 2, max = 100, message = "Product name must be between 2 and 100 characters")
private String name;
@NotBlank(message = "Product description is required")
private String description;
@NotNull(message = "Product price is required")
@DecimalMin(value = "0.0", inclusive = false, message = "Price must be greater than 0")
private Double price;
@NotNull(message = "Product quantity is required")
@Min(value = 0, message = "Quantity cannot be negative")
private Integer quantity;
private List<String> tags = new ArrayList<>();
@CreatedDate
private LocalDateTime createdAt;
@LastModifiedDate
private LocalDateTime updatedAt;
}
Repository Layer
Create a reactive repository interface for our Product entity:
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
Flux<Product> findByNameContainingIgnoreCase(String name);
Flux<Product> findByPriceBetween(Double minPrice, Double maxPrice);
Flux<Product> findByTagsIn(List<String> tags);
@Query("{ 'quantity': { $lt: ?0 } }")
Flux<Product> findByLowQuantity(Integer threshold);
}
Service Layer
Implement the business logic in a reactive service:
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductService {
private final ProductRepository productRepository;
public Mono<Product> createProduct(Product product) {
return productRepository.save(product)
.doOnSuccess(p -> log.info("Created product with ID: {}", p.getId()));
}
public Flux<Product> getAllProducts() {
return productRepository.findAll()
.switchIfEmpty(Flux.error(new ResourceNotFoundException("No products found")));
}
public Mono<Product> getProductById(String id) {
return productRepository.findById(id)
.switchIfEmpty(Mono.error(new ResourceNotFoundException("Product not found with ID: " + id)));
}
public Mono<Product> updateProduct(String id, Product product) {
return productRepository.findById(id)
.flatMap(existingProduct -> {
existingProduct.setName(product.getName());
existingProduct.setDescription(product.getDescription());
existingProduct.setPrice(product.getPrice());
existingProduct.setQuantity(product.getQuantity());
existingProduct.setTags(product.getTags());
return productRepository.save(existingProduct);
})
.switchIfEmpty(Mono.error(new ResourceNotFoundException("Product not found with ID: " + id)));
}
public Mono<Void> deleteProduct(String id) {
return productRepository.findById(id)
.flatMap(productRepository::delete)
.switchIfEmpty(Mono.error(new ResourceNotFoundException("Product not found with ID: " + id)));
}
public Flux<Product> searchProducts(String name, Double minPrice, Double maxPrice) {
if (name != null && minPrice != null && maxPrice != null) {
return productRepository.findByNameContainingIgnoreCase(name)
.filter(p -> p.getPrice() >= minPrice && p.getPrice() <= maxPrice);
} else if (name != null) {
return productRepository.findByNameContainingIgnoreCase(name);
} else if (minPrice != null && maxPrice != null) {
return productRepository.findByPriceBetween(minPrice, maxPrice);
}
return productRepository.findAll();
}
}
Controller Layer
Create a reactive REST controller:
@RestController
@RequestMapping("/api/products")
@RequiredArgsConstructor
public class ProductController {
private final ProductService productService;
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<Product> createProduct(@Valid @RequestBody Product product) {
return productService.createProduct(product);
}
@GetMapping
public Flux<Product> getAllProducts() {
return productService.getAllProducts();
}
@GetMapping("/{id}")
public Mono<Product> getProductById(@PathVariable String id) {
return productService.getProductById(id);
}
@PutMapping("/{id}")
public Mono<Product> updateProduct(@PathVariable String id, @Valid @RequestBody Product product) {
return productService.updateProduct(id, product);
}
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Void> deleteProduct(@PathVariable String id) {
return productService.deleteProduct(id);
}
@GetMapping("/search")
public Flux<Product> searchProducts(
@RequestParam(required = false) String name,
@RequestParam(required = false) Double minPrice,
@RequestParam(required = false) Double maxPrice) {
return productService.searchProducts(name, minPrice, maxPrice);
}
}
Error Handling
Create a global error handler for reactive streams:
@RestControllerAdvice
public class GlobalErrorHandler {
@ExceptionHandler(ResourceNotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
public Mono<ErrorResponse> handleResourceNotFound(ResourceNotFoundException ex) {
return Mono.just(new ErrorResponse(HttpStatus.NOT_FOUND.value(), ex.getMessage()));
}
@ExceptionHandler(MethodArgumentNotValidException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ErrorResponse> handleValidationErrors(MethodArgumentNotValidException ex) {
return Mono.just(new ErrorResponse(
HttpStatus.BAD_REQUEST.value(),
"Validation error",
ex.getBindingResult().getAllErrors().stream()
.map(DefaultMessageSourceResolvable::getDefaultMessage)
.collect(Collectors.toList())
));
}
@ExceptionHandler(WebExchangeBindException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ErrorResponse> handleWebExchangeBindException(WebExchangeBindException ex) {
return Mono.just(new ErrorResponse(
HttpStatus.BAD_REQUEST.value(),
"Request validation failed",
ex.getBindingResult().getAllErrors().stream()
.map(DefaultMessageSourceResolvable::getDefaultMessage)
.collect(Collectors.toList())
));
}
@ExceptionHandler(Exception.class)
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Mono<ErrorResponse> handleAllExceptions(Exception ex) {
return Mono.just(new ErrorResponse(
HttpStatus.INTERNAL_SERVER_ERROR.value(),
"An unexpected error occurred: " + ex.getMessage()
));
}
@Data
@AllArgsConstructor
public static class ErrorResponse {
private int status;
private String message;
private List<String> details;
public ErrorResponse(int status, String message) {
this.status = status;
this.message = message;
}
}
}
Validation
We've already added basic validation annotations to our Product model. For custom validation:
@Documented
@Constraint(validatedBy = ProductValidator.class)
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ValidProduct {
String message() default "Invalid product";
Class<?>[] groups() default {};
Class<? extends Payload>[] payload() default {};
}
public class ProductValidator implements ConstraintValidator<ValidProduct, Product> {
@Override
public boolean isValid(Product product, ConstraintValidatorContext context) {
if (product.getPrice() != null && product.getQuantity() != null) {
if (product.getPrice() > 1000 && product.getQuantity() > 100) {
context.disableDefaultConstraintViolation();
context.buildConstraintViolationWithTemplate("High-value products cannot have quantity over 100")
.addPropertyNode("quantity")
.addConstraintViolation();
return false;
}
}
return true;
}
}
Then add @ValidProduct
annotation to your Product class.
Testing
Let's write comprehensive tests for our reactive application.
Repository Tests
@DataMongoTest
@ExtendWith(SpringExtension.class)
public class ProductRepositoryTest {
@Autowired
private ProductRepository productRepository;
@Autowired
private ReactiveMongoTemplate reactiveMongoTemplate;
@BeforeEach
public void setUp() {
reactiveMongoTemplate.remove(Product.class).all().block();
Product product1 = Product.builder()
.name("Laptop")
.description("High performance laptop")
.price(999.99)
.quantity(10)
.tags(List.of("electronics", "computers"))
.build();
Product product2 = Product.builder()
.name("Smartphone")
.description("Latest smartphone model")
.price(699.99)
.quantity(20)
.tags(List.of("electronics", "mobile"))
.build();
productRepository.saveAll(List.of(product1, product2)).blockLast();
}
@Test
public void whenFindByName_thenReturnProduct() {
StepVerifier.create(productRepository.findByNameContainingIgnoreCase("Laptop"))
.expectNextMatches(product -> product.getName().equals("Laptop"))
.verifyComplete();
}
@Test
public void whenFindByPriceBetween_thenReturnProducts() {
StepVerifier.create(productRepository.findByPriceBetween(500.0, 1000.0))
.expectNextCount(2)
.verifyComplete();
}
}
Service Tests
@ExtendWith(MockitoExtension.class)
public class ProductServiceTest {
@Mock
private ProductRepository productRepository;
@InjectMocks
private ProductService productService;
@Test
public void whenCreateProduct_thenReturnCreatedProduct() {
Product product = Product.builder()
.name("Test Product")
.description("Test Description")
.price(100.0)
.quantity(5)
.build();
when(productRepository.save(any(Product.class)))
.thenReturn(Mono.just(product.withId("123")));
StepVerifier.create(productService.createProduct(product))
.expectNextMatches(p -> p.getId().equals("123"))
.verifyComplete();
}
@Test
public void whenGetNonExistingProduct_thenThrowException() {
when(productRepository.findById("nonexistent"))
.thenReturn(Mono.empty());
StepVerifier.create(productService.getProductById("nonexistent"))
.expectError(ResourceNotFoundException.class)
.verify();
}
}
Controller Tests
@WebFluxTest(ProductController.class)
@Import({ProductService.class, GlobalErrorHandler.class})
public class ProductControllerTest {
@MockBean
private ProductRepository productRepository;
@Autowired
private WebTestClient webTestClient;
@Test
public void whenGetAllProducts_thenReturnProducts() {
Product product1 = Product.builder().id("1").name("Product 1").build();
Product product2 = Product.builder().id("2").name("Product 2").build();
when(productRepository.findAll()).thenReturn(Flux.just(product1, product2));
webTestClient.get().uri("/api/products")
.exchange()
.expectStatus().isOk()
.expectBodyList(Product.class)
.hasSize(2);
}
@Test
public void whenCreateInvalidProduct_thenReturnBadRequest() {
Product invalidProduct = Product.builder()
.name("") // Invalid name
.description("") // Invalid description
.price(-10.0) // Invalid price
.quantity(-5) // Invalid quantity
.build();
webTestClient.post().uri("/api/products")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(invalidProduct)
.exchange()
.expectStatus().isBadRequest()
.expectBody()
.jsonPath("$.status").isEqualTo(400)
.jsonPath("$.details").isArray();
}
}
Advanced Features
1. Pagination and Sorting
Add pagination support to your repository and controller:
// In ProductRepository
Flux<Product> findAllBy(Pageable pageable);
// In ProductService
public Flux<Product> getAllProducts(int page, int size, String sortBy, String direction) {
Sort.Direction sortDirection = Sort.Direction.fromString(direction);
Pageable pageable = PageRequest.of(page, size, sortDirection, sortBy);
return productRepository.findAllBy(pageable);
}
// In ProductController
@GetMapping
public Flux<Product> getAllProducts(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "10") int size,
@RequestParam(defaultValue = "name") String sortBy,
@RequestParam(defaultValue = "ASC") String direction) {
return productService.getAllProducts(page, size, sortBy, direction);
}
2. Server-Sent Events (SSE)
Add SSE endpoint for real-time product updates:
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Product> streamProducts() {
return productRepository.findAll()
.delayElements(Duration.ofSeconds(1)) // Emit every second for demo
.log();
}
3. Transaction Support
Add transactional operations:
@Transactional
public Mono<Product> updateProductQuantity(String productId, int quantityChange) {
return productRepository.findById(productId)
.flatMap(product -> {
int newQuantity = product.getQuantity() + quantityChange;
if (newQuantity < 0) {
return Mono.error(new BusinessException("Insufficient quantity"));
}
product.setQuantity(newQuantity);
return productRepository.save(product);
});
}
4. Custom Query with Aggregation
Add a complex query using aggregation:
// In ProductRepository
@Aggregation(pipeline = {
"{ $match: { price: { $gte: ?0 } } }",
"{ $group: { _id: null, averagePrice: { $avg: '$price' }, count: { $sum: 1 } } }"
})
Mono<ProductStats> getProductStats(Double minPrice);
// ProductStats class
@Data
@AllArgsConstructor
public class ProductStats {
private Double averagePrice;
private Long count;
}
// In ProductService
public Mono<ProductStats> getProductStats(Double minPrice) {
return productRepository.getProductStats(minPrice);
}
5. Rate Limiting
Add rate limiting to your endpoints:
@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
return http
.authorizeExchange()
.pathMatchers("/api/products/**").permitAll()
.and()
.addFilterAt(rateLimiterFilter(), SecurityWebFiltersOrder.AUTHENTICATION)
.csrf().disable()
.build();
}
private WebFilter rateLimiterFilter() {
RateLimiterConfig config = RateLimiterConfig.custom()
.limitForPeriod(100)
.limitRefreshPeriod(Duration.ofMinutes(1))
.timeoutDuration(Duration.ofSeconds(5))
.build();
RateLimiter rateLimiter = RateLimiter.of("product-api", config);
return (exchange, chain) -> {
Mono<RateLimiter.Response> rateLimitResponse = rateLimiter.checkPermission(1);
return rateLimitResponse
.flatMap(response -> {
if (response.getTokensRemaining() < 0) {
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
});
};
}
Conclusion
This comprehensive guide has walked you through building a complete reactive Spring Boot application with WebFlux and Reactive MongoDB. We've covered:
- Project setup and configuration
- Domain modeling with reactive MongoDB
- Implementing reactive repositories, services, and controllers
- Comprehensive error handling and validation
- Testing reactive components
- Advanced features like pagination, SSE, transactions, and rate limiting
The reactive approach provides non-blocking, scalable solutions perfect for modern applications. Remember to:
- Always handle errors properly in reactive streams
- Test your reactive components thoroughly
- Monitor your application's performance
- Consider backpressure strategies for high-load scenarios
This implementation provides a solid foundation that you can extend with additional features like security, caching, or more complex business logic as needed.