CODE WITH SIBIN

Solving Real Problems with Real Code


Spring Boot Reactive REST API: WebFlux + MongoDB Complete Tutorial

This comprehensive guide will walk you through building a reactive Spring Boot application using WebFlux and Reactive MongoDB.

Table of Contents

  1. Project Setup
  2. Reactive MongoDB Configuration
  3. Domain Model
  4. Repository Layer
  5. Service Layer
  6. Controller Layer
  7. Error Handling
  8. Validation
  9. Testing
  10. Advanced Features

Project Setup

Let's begin by creating our Spring Boot project with the necessary dependencies.

Step 1: Generate Project Structure

Using Spring Initializr (https://start.spring.io/), select:

  • Project: Maven
  • Language: Java
  • Spring Boot: Latest stable version
  • Dependencies:
    • Spring Reactive Web (WebFlux)
    • Reactive MongoDB
    • Lombok (for reducing boilerplate code)
    • Validation (for bean validation)

Alternatively, add these dependencies to your pom.xml:

<dependencies>
    <!-- Spring Boot Starter WebFlux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    
    <!-- Reactive MongoDB -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <!-- Validation -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-validation</artifactId>
    </dependency>
    
    <!-- Test dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Reactive MongoDB Configuration

Configure MongoDB connection in application.properties:

# MongoDB configuration
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=reactive_db
spring.data.mongodb.auto-index-creation=true

# Enable detailed logging for debugging (optional)
logging.level.org.springframework.data.mongodb=DEBUG
logging.level.reactor=DEBUG

For more advanced configuration, create a MongoConfig class:

@Configuration
public class MongoConfig {

    @Value("${spring.data.mongodb.host}")
    private String host;
    
    @Value("${spring.data.mongodb.port}")
    private int port;
    
    @Value("${spring.data.mongodb.database}")
    private String database;
    
    @Bean
    public ReactiveMongoClient reactiveMongoClient() {
        return MongoClients.create(String.format("mongodb://%s:%d", host, port));
    }
    
    @Bean
    public ReactiveMongoDatabaseFactory reactiveMongoDatabaseFactory() {
        return new SimpleReactiveMongoDatabaseFactory(reactiveMongoClient(), database);
    }
    
    @Bean
    public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(reactiveMongoDatabaseFactory());
    }
}

Domain Model

Let's create a sample domain model for our application. We'll use a "Product" entity as an example.

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Document(collection = "products")
public class Product {
    
    @Id
    private String id;
    
    @NotBlank(message = "Product name is required")
    @Size(min = 2, max = 100, message = "Product name must be between 2 and 100 characters")
    private String name;
    
    @NotBlank(message = "Product description is required")
    private String description;
    
    @NotNull(message = "Product price is required")
    @DecimalMin(value = "0.0", inclusive = false, message = "Price must be greater than 0")
    private Double price;
    
    @NotNull(message = "Product quantity is required")
    @Min(value = 0, message = "Quantity cannot be negative")
    private Integer quantity;
    
    private List<String> tags = new ArrayList<>();
    
    @CreatedDate
    private LocalDateTime createdAt;
    
    @LastModifiedDate
    private LocalDateTime updatedAt;
}

Repository Layer

Create a reactive repository interface for our Product entity:

public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
    
    Flux<Product> findByNameContainingIgnoreCase(String name);
    
    Flux<Product> findByPriceBetween(Double minPrice, Double maxPrice);
    
    Flux<Product> findByTagsIn(List<String> tags);
    
    @Query("{ 'quantity': { $lt: ?0 } }")
    Flux<Product> findByLowQuantity(Integer threshold);
}

Service Layer

Implement the business logic in a reactive service:

@Service
@RequiredArgsConstructor
@Slf4j
public class ProductService {
    
    private final ProductRepository productRepository;
    
    public Mono<Product> createProduct(Product product) {
        return productRepository.save(product)
                .doOnSuccess(p -> log.info("Created product with ID: {}", p.getId()));
    }
    
    public Flux<Product> getAllProducts() {
        return productRepository.findAll()
                .switchIfEmpty(Flux.error(new ResourceNotFoundException("No products found")));
    }
    
    public Mono<Product> getProductById(String id) {
        return productRepository.findById(id)
                .switchIfEmpty(Mono.error(new ResourceNotFoundException("Product not found with ID: " + id)));
    }
    
    public Mono<Product> updateProduct(String id, Product product) {
        return productRepository.findById(id)
                .flatMap(existingProduct -> {
                    existingProduct.setName(product.getName());
                    existingProduct.setDescription(product.getDescription());
                    existingProduct.setPrice(product.getPrice());
                    existingProduct.setQuantity(product.getQuantity());
                    existingProduct.setTags(product.getTags());
                    return productRepository.save(existingProduct);
                })
                .switchIfEmpty(Mono.error(new ResourceNotFoundException("Product not found with ID: " + id)));
    }
    
    public Mono<Void> deleteProduct(String id) {
        return productRepository.findById(id)
                .flatMap(productRepository::delete)
                .switchIfEmpty(Mono.error(new ResourceNotFoundException("Product not found with ID: " + id)));
    }
    
    public Flux<Product> searchProducts(String name, Double minPrice, Double maxPrice) {
        if (name != null && minPrice != null && maxPrice != null) {
            return productRepository.findByNameContainingIgnoreCase(name)
                    .filter(p -> p.getPrice() >= minPrice && p.getPrice() <= maxPrice);
        } else if (name != null) {
            return productRepository.findByNameContainingIgnoreCase(name);
        } else if (minPrice != null && maxPrice != null) {
            return productRepository.findByPriceBetween(minPrice, maxPrice);
        }
        return productRepository.findAll();
    }
}

Controller Layer

Create a reactive REST controller:

@RestController
@RequestMapping("/api/products")
@RequiredArgsConstructor
public class ProductController {
    
    private final ProductService productService;
    
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Product> createProduct(@Valid @RequestBody Product product) {
        return productService.createProduct(product);
    }
    
    @GetMapping
    public Flux<Product> getAllProducts() {
        return productService.getAllProducts();
    }
    
    @GetMapping("/{id}")
    public Mono<Product> getProductById(@PathVariable String id) {
        return productService.getProductById(id);
    }
    
    @PutMapping("/{id}")
    public Mono<Product> updateProduct(@PathVariable String id, @Valid @RequestBody Product product) {
        return productService.updateProduct(id, product);
    }
    
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteProduct(@PathVariable String id) {
        return productService.deleteProduct(id);
    }
    
    @GetMapping("/search")
    public Flux<Product> searchProducts(
            @RequestParam(required = false) String name,
            @RequestParam(required = false) Double minPrice,
            @RequestParam(required = false) Double maxPrice) {
        return productService.searchProducts(name, minPrice, maxPrice);
    }
}

Error Handling

Create a global error handler for reactive streams:

@RestControllerAdvice
public class GlobalErrorHandler {
    
    @ExceptionHandler(ResourceNotFoundException.class)
    @ResponseStatus(HttpStatus.NOT_FOUND)
    public Mono<ErrorResponse> handleResourceNotFound(ResourceNotFoundException ex) {
        return Mono.just(new ErrorResponse(HttpStatus.NOT_FOUND.value(), ex.getMessage()));
    }
    
    @ExceptionHandler(MethodArgumentNotValidException.class)
    @ResponseStatus(HttpStatus.BAD_REQUEST)
    public Mono<ErrorResponse> handleValidationErrors(MethodArgumentNotValidException ex) {
        return Mono.just(new ErrorResponse(
            HttpStatus.BAD_REQUEST.value(),
            "Validation error",
            ex.getBindingResult().getAllErrors().stream()
                .map(DefaultMessageSourceResolvable::getDefaultMessage)
                .collect(Collectors.toList())
        ));
    }
    
    @ExceptionHandler(WebExchangeBindException.class)
    @ResponseStatus(HttpStatus.BAD_REQUEST)
    public Mono<ErrorResponse> handleWebExchangeBindException(WebExchangeBindException ex) {
        return Mono.just(new ErrorResponse(
            HttpStatus.BAD_REQUEST.value(),
            "Request validation failed",
            ex.getBindingResult().getAllErrors().stream()
                .map(DefaultMessageSourceResolvable::getDefaultMessage)
                .collect(Collectors.toList())
        ));
    }
    
    @ExceptionHandler(Exception.class)
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public Mono<ErrorResponse> handleAllExceptions(Exception ex) {
        return Mono.just(new ErrorResponse(
            HttpStatus.INTERNAL_SERVER_ERROR.value(),
            "An unexpected error occurred: " + ex.getMessage()
        ));
    }
    
    @Data
    @AllArgsConstructor
    public static class ErrorResponse {
        private int status;
        private String message;
        private List<String> details;
        
        public ErrorResponse(int status, String message) {
            this.status = status;
            this.message = message;
        }
    }
}

Validation

We've already added basic validation annotations to our Product model. For custom validation:

@Documented
@Constraint(validatedBy = ProductValidator.class)
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ValidProduct {
    String message() default "Invalid product";
    Class<?>[] groups() default {};
    Class<? extends Payload>[] payload() default {};
}
public class ProductValidator implements ConstraintValidator<ValidProduct, Product> {
    
    @Override
    public boolean isValid(Product product, ConstraintValidatorContext context) {
        if (product.getPrice() != null && product.getQuantity() != null) {
            if (product.getPrice() > 1000 && product.getQuantity() > 100) {
                context.disableDefaultConstraintViolation();
                context.buildConstraintViolationWithTemplate("High-value products cannot have quantity over 100")
                       .addPropertyNode("quantity")
                       .addConstraintViolation();
                return false;
            }
        }
        return true;
    }
}

Then add @ValidProduct annotation to your Product class.

Testing

Let's write comprehensive tests for our reactive application.

Repository Tests

@DataMongoTest
@ExtendWith(SpringExtension.class)
public class ProductRepositoryTest {
    
    @Autowired
    private ProductRepository productRepository;
    
    @Autowired
    private ReactiveMongoTemplate reactiveMongoTemplate;
    
    @BeforeEach
    public void setUp() {
        reactiveMongoTemplate.remove(Product.class).all().block();
        
        Product product1 = Product.builder()
                .name("Laptop")
                .description("High performance laptop")
                .price(999.99)
                .quantity(10)
                .tags(List.of("electronics", "computers"))
                .build();
        
        Product product2 = Product.builder()
                .name("Smartphone")
                .description("Latest smartphone model")
                .price(699.99)
                .quantity(20)
                .tags(List.of("electronics", "mobile"))
                .build();
        
        productRepository.saveAll(List.of(product1, product2)).blockLast();
    }
    
    @Test
    public void whenFindByName_thenReturnProduct() {
        StepVerifier.create(productRepository.findByNameContainingIgnoreCase("Laptop"))
                .expectNextMatches(product -> product.getName().equals("Laptop"))
                .verifyComplete();
    }
    
    @Test
    public void whenFindByPriceBetween_thenReturnProducts() {
        StepVerifier.create(productRepository.findByPriceBetween(500.0, 1000.0))
                .expectNextCount(2)
                .verifyComplete();
    }
}

Service Tests

@ExtendWith(MockitoExtension.class)
public class ProductServiceTest {
    
    @Mock
    private ProductRepository productRepository;
    
    @InjectMocks
    private ProductService productService;
    
    @Test
    public void whenCreateProduct_thenReturnCreatedProduct() {
        Product product = Product.builder()
                .name("Test Product")
                .description("Test Description")
                .price(100.0)
                .quantity(5)
                .build();
        
        when(productRepository.save(any(Product.class)))
                .thenReturn(Mono.just(product.withId("123")));
        
        StepVerifier.create(productService.createProduct(product))
                .expectNextMatches(p -> p.getId().equals("123"))
                .verifyComplete();
    }
    
    @Test
    public void whenGetNonExistingProduct_thenThrowException() {
        when(productRepository.findById("nonexistent"))
                .thenReturn(Mono.empty());
        
        StepVerifier.create(productService.getProductById("nonexistent"))
                .expectError(ResourceNotFoundException.class)
                .verify();
    }
}

Controller Tests

@WebFluxTest(ProductController.class)
@Import({ProductService.class, GlobalErrorHandler.class})
public class ProductControllerTest {
    
    @MockBean
    private ProductRepository productRepository;
    
    @Autowired
    private WebTestClient webTestClient;
    
    @Test
    public void whenGetAllProducts_thenReturnProducts() {
        Product product1 = Product.builder().id("1").name("Product 1").build();
        Product product2 = Product.builder().id("2").name("Product 2").build();
        
        when(productRepository.findAll()).thenReturn(Flux.just(product1, product2));
        
        webTestClient.get().uri("/api/products")
                .exchange()
                .expectStatus().isOk()
                .expectBodyList(Product.class)
                .hasSize(2);
    }
    
    @Test
    public void whenCreateInvalidProduct_thenReturnBadRequest() {
        Product invalidProduct = Product.builder()
                .name("") // Invalid name
                .description("") // Invalid description
                .price(-10.0) // Invalid price
                .quantity(-5) // Invalid quantity
                .build();
        
        webTestClient.post().uri("/api/products")
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(invalidProduct)
                .exchange()
                .expectStatus().isBadRequest()
                .expectBody()
                .jsonPath("$.status").isEqualTo(400)
                .jsonPath("$.details").isArray();
    }
}

Advanced Features

1. Pagination and Sorting

Add pagination support to your repository and controller:

// In ProductRepository
Flux<Product> findAllBy(Pageable pageable);

// In ProductService
public Flux<Product> getAllProducts(int page, int size, String sortBy, String direction) {
    Sort.Direction sortDirection = Sort.Direction.fromString(direction);
    Pageable pageable = PageRequest.of(page, size, sortDirection, sortBy);
    return productRepository.findAllBy(pageable);
}

// In ProductController
@GetMapping
public Flux<Product> getAllProducts(
        @RequestParam(defaultValue = "0") int page,
        @RequestParam(defaultValue = "10") int size,
        @RequestParam(defaultValue = "name") String sortBy,
        @RequestParam(defaultValue = "ASC") String direction) {
    return productService.getAllProducts(page, size, sortBy, direction);
}

2. Server-Sent Events (SSE)

Add SSE endpoint for real-time product updates:

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Product> streamProducts() {
    return productRepository.findAll()
            .delayElements(Duration.ofSeconds(1)) // Emit every second for demo
            .log();
}

3. Transaction Support

Add transactional operations:

@Transactional
public Mono<Product> updateProductQuantity(String productId, int quantityChange) {
    return productRepository.findById(productId)
            .flatMap(product -> {
                int newQuantity = product.getQuantity() + quantityChange;
                if (newQuantity < 0) {
                    return Mono.error(new BusinessException("Insufficient quantity"));
                }
                product.setQuantity(newQuantity);
                return productRepository.save(product);
            });
}

4. Custom Query with Aggregation

Add a complex query using aggregation:

// In ProductRepository
@Aggregation(pipeline = {
    "{ $match: { price: { $gte: ?0 } } }",
    "{ $group: { _id: null, averagePrice: { $avg: '$price' }, count: { $sum: 1 } } }"
})
Mono<ProductStats> getProductStats(Double minPrice);

// ProductStats class
@Data
@AllArgsConstructor
public class ProductStats {
    private Double averagePrice;
    private Long count;
}

// In ProductService
public Mono<ProductStats> getProductStats(Double minPrice) {
    return productRepository.getProductStats(minPrice);
}

5. Rate Limiting

Add rate limiting to your endpoints:

@Bean
public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
    return http
            .authorizeExchange()
            .pathMatchers("/api/products/**").permitAll()
            .and()
            .addFilterAt(rateLimiterFilter(), SecurityWebFiltersOrder.AUTHENTICATION)
            .csrf().disable()
            .build();
}

private WebFilter rateLimiterFilter() {
    RateLimiterConfig config = RateLimiterConfig.custom()
            .limitForPeriod(100)
            .limitRefreshPeriod(Duration.ofMinutes(1))
            .timeoutDuration(Duration.ofSeconds(5))
            .build();
    
    RateLimiter rateLimiter = RateLimiter.of("product-api", config);
    
    return (exchange, chain) -> {
        Mono<RateLimiter.Response> rateLimitResponse = rateLimiter.checkPermission(1);
        return rateLimitResponse
                .flatMap(response -> {
                    if (response.getTokensRemaining() < 0) {
                        exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
                        return exchange.getResponse().setComplete();
                    }
                    return chain.filter(exchange);
                });
    };
}

Conclusion

This comprehensive guide has walked you through building a complete reactive Spring Boot application with WebFlux and Reactive MongoDB. We've covered:

  1. Project setup and configuration
  2. Domain modeling with reactive MongoDB
  3. Implementing reactive repositories, services, and controllers
  4. Comprehensive error handling and validation
  5. Testing reactive components
  6. Advanced features like pagination, SSE, transactions, and rate limiting

The reactive approach provides non-blocking, scalable solutions perfect for modern applications. Remember to:

  • Always handle errors properly in reactive streams
  • Test your reactive components thoroughly
  • Monitor your application's performance
  • Consider backpressure strategies for high-load scenarios

This implementation provides a solid foundation that you can extend with additional features like security, caching, or more complex business logic as needed.

Leave a Reply

Your email address will not be published. Required fields are marked *