Skip to content

Commit cb26cd2

Browse files
committed
Add auto-create-topics support to RabbitMQ provider as well.
1 parent 8e6b69f commit cb26cd2

File tree

9 files changed

+168
-28
lines changed

9 files changed

+168
-28
lines changed

docs/configuration.rst

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ An example configuration in `appsettings.json`:
4949
"Port": 5672,
5050
"Username": "guest",
5151
"Password": "guest",
52-
"VirtualHost": "/"
52+
"VirtualHost": "/",
53+
"AutoCreateTopics": true
5354
},
5455
"Kafka": {
5556
"BootstrapServers": "localhost:9092",
@@ -141,14 +142,18 @@ OpenDDD.NET supports multiple messaging providers:
141142
port: 5672,
142143
username: "guest",
143144
password: "guest",
144-
virtualHost: "/"
145+
virtualHost: "/",
146+
autoCreateTopics: true
145147
);
146148
147149
**Kafka**:
148150

149151
.. code-block:: csharp
150152
151-
options.UseKafka("localhost:9092");
153+
options.UseKafka(
154+
"localhost:9092",
155+
autoCreateTopics: true
156+
);
152157
153158
**Azure Service Bus**:
154159

docs/userguide.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,8 @@ Add the following configuration to your `appsettings.json` file to customize Ope
599599
"Port": 5672,
600600
"Username": "guest",
601601
"Password": "guest",
602-
"VirtualHost": "/"
602+
"VirtualHost": "/",
603+
"AutoCreateTopics": true
603604
},
604605
"Kafka": {
605606
"BootstrapServers": "localhost:9092",

samples/Bookstore/src/Bookstore/appsettings.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"Port": 5672,
3333
"Username": "guest",
3434
"Password": "guest",
35-
"VirtualHost": "/"
35+
"VirtualHost": "/",
36+
"AutoCreateTopics": true
3637
},
3738
"Kafka": {
3839
"BootstrapServers": "localhost:9092",

src/OpenDDD.Tests/Integration/Infrastructure/Events/RabbitMq/RabbitMqMessagingProviderTests.cs

Lines changed: 80 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using FluentAssertions;
23
using Microsoft.Extensions.Logging;
34
using Xunit.Abstractions;
45
using OpenDDD.Infrastructure.Events.RabbitMq;
@@ -38,7 +39,11 @@ public RabbitMqMessagingProviderTests(ITestOutputHelper testOutputHelper)
3839
};
3940

4041
_consumerFactory = new RabbitMqConsumerFactory(_logger);
41-
_messagingProvider = new RabbitMqMessagingProvider(_connectionFactory, _consumerFactory, _logger);
42+
_messagingProvider = new RabbitMqMessagingProvider(
43+
_connectionFactory,
44+
_consumerFactory,
45+
autoCreateTopics: true,
46+
_logger);
4247
}
4348

4449
public async Task InitializeAsync()
@@ -126,29 +131,65 @@ private async Task CleanupExchangesAndQueuesAsync()
126131
public async Task AutoCreateTopic_ShouldCreateTopicOnSubscribe_WhenSettingEnabled()
127132
{
128133
// Arrange
129-
await VerifyExchangeAndQueueDoNotExist();
134+
var topicName = $"test-topic-{Guid.NewGuid()}";
135+
var consumerGroup = "test-consumer-group";
136+
137+
var messagingProvider = new RabbitMqMessagingProvider(
138+
_connectionFactory,
139+
_consumerFactory,
140+
autoCreateTopics: true, // Auto-create enabled
141+
_logger);
142+
143+
var exchangeExistsBefore = await ExchangeExistsAsync(topicName, _cts.Token);
144+
exchangeExistsBefore.Should().BeFalse("The exchange should not exist before subscribing.");
130145

131146
// Act
132-
await _messagingProvider.SubscribeAsync(_testTopic, _testConsumerGroup, (msg, token) => Task.CompletedTask);
147+
await messagingProvider.SubscribeAsync(topicName, consumerGroup, async (msg, token) =>
148+
await Task.CompletedTask, _cts.Token);
149+
150+
var timeout = TimeSpan.FromSeconds(30);
151+
var pollingInterval = TimeSpan.FromMilliseconds(500);
152+
var startTime = DateTime.UtcNow;
133153

134-
// Assert
135-
try
154+
bool exchangeExists = false;
155+
while (DateTime.UtcNow - startTime < timeout)
136156
{
137-
await _channel!.ExchangeDeclarePassiveAsync(_testTopic, CancellationToken.None);
138-
}
139-
catch (OperationInterruptedException)
140-
{
141-
Assert.Fail($"Exchange '{_testTopic}' does not exist.");
157+
if (await ExchangeExistsAsync(topicName, _cts.Token))
158+
{
159+
exchangeExists = true;
160+
break;
161+
}
162+
await Task.Delay(pollingInterval, _cts.Token);
142163
}
143164

144-
try
145-
{
146-
await _channel!.QueueDeclarePassiveAsync($"{_testConsumerGroup}.{_testTopic}", CancellationToken.None);
147-
}
148-
catch (OperationInterruptedException)
165+
// Assert
166+
exchangeExists.Should().BeTrue("RabbitMQ should create the exchange automatically when subscribing.");
167+
}
168+
169+
[Fact]
170+
public async Task AutoCreateTopic_ShouldNotCreateTopicOnSubscribe_WhenSettingDisabled()
171+
{
172+
// Arrange
173+
var topicName = $"test-topic-{Guid.NewGuid()}";
174+
var consumerGroup = "test-consumer-group";
175+
176+
var messagingProvider = new RabbitMqMessagingProvider(
177+
_connectionFactory,
178+
_consumerFactory,
179+
autoCreateTopics: false, // Auto-create disabled
180+
_logger);
181+
182+
var exchangeExistsBefore = await ExchangeExistsAsync(topicName, _cts.Token);
183+
exchangeExistsBefore.Should().BeFalse("The exchange should not exist before subscribing.");
184+
185+
// Act & Assert
186+
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
149187
{
150-
Assert.Fail($"Queue '{_testConsumerGroup}.{_testTopic}' does not exist.");
151-
}
188+
await messagingProvider.SubscribeAsync(topicName, consumerGroup, async (msg, token) =>
189+
await Task.CompletedTask, _cts.Token);
190+
});
191+
192+
exception.Message.Should().Contain($"Topic '{topicName}' does not exist.");
152193
}
153194

154195
[Fact]
@@ -357,5 +398,27 @@ async Task MessageHandler(string msg, CancellationToken token)
357398
Assert.InRange(count, minAllowed, maxAllowed);
358399
}
359400
}
401+
402+
private async Task<bool> ExchangeExistsAsync(string exchange, CancellationToken cancellationToken)
403+
{
404+
try
405+
{
406+
// Use a temporary channel to check exchange existence, since old one might have stale topic data
407+
using var tempChannel = await _connection!.CreateChannelAsync(null, cancellationToken);
408+
await tempChannel.ExchangeDeclarePassiveAsync(exchange, cancellationToken);
409+
410+
return true;
411+
}
412+
catch (OperationInterruptedException ex) when (ex.ShutdownReason?.ReplyCode == 404)
413+
{
414+
_logger.LogDebug("Exchange '{Exchange}' does not exist yet.", exchange);
415+
return false;
416+
}
417+
catch (Exception ex)
418+
{
419+
_logger.LogError(ex, "Unexpected error while checking if exchange '{Exchange}' exists.");
420+
throw;
421+
}
422+
}
360423
}
361424
}

src/OpenDDD.Tests/Unit/Infrastructure/Events/RabbitMq/RabbitMqMessagingProviderTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public RabbitMqMessagingProviderTests()
2626
_provider = new RabbitMqMessagingProvider(
2727
_mockConnectionFactory.Object,
2828
_mockConsumerFactory.Object,
29+
autoCreateTopics: true,
2930
_mockLogger.Object
3031
);
3132
}
@@ -42,7 +43,7 @@ public void Constructor_ShouldThrowException_WhenDependenciesAreNull(
4243
var mockLogger = logger is null ? null! : _mockLogger.Object;
4344

4445
Assert.Throws<ArgumentNullException>(() =>
45-
new RabbitMqMessagingProvider(mockConnectionFactory, mockConsumerFactory, mockLogger));
46+
new RabbitMqMessagingProvider(mockConnectionFactory, mockConsumerFactory, autoCreateTopics: true, mockLogger));
4647
}
4748

4849
[Theory]

src/OpenDDD/API/Extensions/OpenDddServiceCollectionExtensions.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,11 @@ private static void AddRabbitMq(this IServiceCollection services)
323323

324324
var consumerFactory = new RabbitMqConsumerFactory(logger);
325325

326-
return new RabbitMqMessagingProvider(connectionFactory, consumerFactory, logger);
326+
return new RabbitMqMessagingProvider(
327+
connectionFactory,
328+
consumerFactory,
329+
rabbitMqOptions.AutoCreateTopics,
330+
logger);
327331
});
328332
}
329333

src/OpenDDD/API/Options/OpenDddOptions.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,13 @@ public OpenDddOptions UseInMemoryMessaging()
6262
return this;
6363
}
6464

65-
public OpenDddOptions UseRabbitMq(string hostName, int port, string username, string password, string virtualHost = "/")
65+
public OpenDddOptions UseRabbitMq(
66+
string hostName,
67+
int port,
68+
string username,
69+
string password,
70+
string virtualHost = "/",
71+
bool autoCreateTopics = true)
6672
{
6773
MessagingProvider = "RabbitMq";
6874
RabbitMq = new OpenDddRabbitMqOptions
@@ -71,12 +77,13 @@ public OpenDddOptions UseRabbitMq(string hostName, int port, string username, st
7177
Port = port,
7278
Username = username,
7379
Password = password,
74-
VirtualHost = virtualHost
80+
VirtualHost = virtualHost,
81+
AutoCreateTopics = autoCreateTopics
7582
};
7683
return this;
7784
}
7885

79-
public OpenDddOptions UseKafka(string bootstrapServers)
86+
public OpenDddOptions UseKafka(string bootstrapServers, bool autoCreateTopics = true)
8087
{
8188
MessagingProvider = "Kafka";
8289
Kafka = new OpenDddKafkaOptions { BootstrapServers = bootstrapServers };

src/OpenDDD/Infrastructure/Events/RabbitMq/Options/OpenDddRabbitMqOptions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ public class OpenDddRabbitMqOptions
77
public string Username { get; set; } = "guest";
88
public string Password { get; set; } = "guest";
99
public string VirtualHost { get; set; } = "/";
10+
public bool AutoCreateTopics { get; set; } = true;
1011
}
1112
}

src/OpenDDD/Infrastructure/Events/RabbitMq/RabbitMqMessagingProvider.cs

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using OpenDDD.Infrastructure.Events.Base;
55
using OpenDDD.Infrastructure.Events.RabbitMq.Factories;
66
using RabbitMQ.Client;
7+
using RabbitMQ.Client.Exceptions;
78

89
namespace OpenDDD.Infrastructure.Events.RabbitMq
910
{
@@ -12,17 +13,20 @@ public class RabbitMqMessagingProvider : IMessagingProvider, IAsyncDisposable
1213
private readonly IConnectionFactory _connectionFactory;
1314
private readonly IRabbitMqConsumerFactory _consumerFactory;
1415
private readonly ILogger<RabbitMqMessagingProvider> _logger;
16+
private readonly bool _autoCreateTopics;
1517
private IConnection? _connection;
1618
private IChannel? _channel;
1719
private readonly ConcurrentDictionary<string, RabbitMqSubscription> _subscriptions = new();
1820

1921
public RabbitMqMessagingProvider(
2022
IConnectionFactory factory,
2123
IRabbitMqConsumerFactory consumerFactory,
24+
bool autoCreateTopics,
2225
ILogger<RabbitMqMessagingProvider> logger)
2326
{
2427
_connectionFactory = factory ?? throw new ArgumentNullException(nameof(factory));
2528
_consumerFactory = consumerFactory ?? throw new ArgumentNullException(nameof(consumerFactory));
29+
_autoCreateTopics = autoCreateTopics;
2630
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
2731
}
2832

@@ -41,6 +45,22 @@ public async Task<ISubscription> SubscribeAsync(string topic, string consumerGro
4145

4246
if (_channel is null) throw new InvalidOperationException("RabbitMQ channel is not available.");
4347

48+
bool exchangeExists = await ExchangeExistsAsync(topic, cancellationToken);
49+
50+
if (!exchangeExists)
51+
{
52+
if (_autoCreateTopics)
53+
{
54+
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
55+
_logger.LogInformation("Auto-created exchange (topic): {Topic}", topic);
56+
}
57+
else
58+
{
59+
_logger.LogError("Cannot subscribe to non-existent topic: {Topic}", topic);
60+
throw new InvalidOperationException($"Topic '{topic}' does not exist.");
61+
}
62+
}
63+
4464
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
4565
var queueName = $"{consumerGroup}.{topic}";
4666
await _channel.QueueDeclareAsync(queueName, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken);
@@ -87,7 +107,21 @@ public async Task PublishAsync(string topic, string message, CancellationToken c
87107

88108
if (_channel is null) throw new InvalidOperationException("RabbitMQ channel is not available.");
89109

90-
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
110+
bool exchangeExists = await ExchangeExistsAsync(topic, cancellationToken);
111+
112+
if (!exchangeExists)
113+
{
114+
if (_autoCreateTopics)
115+
{
116+
await _channel.ExchangeDeclareAsync(topic, ExchangeType.Fanout, durable: true, autoDelete: false, cancellationToken: cancellationToken);
117+
_logger.LogInformation("Auto-created exchange (topic): {Topic}", topic);
118+
}
119+
else
120+
{
121+
_logger.LogError("Cannot publish to non-existent topic: {Topic}", topic);
122+
throw new InvalidOperationException($"Topic '{topic}' does not exist.");
123+
}
124+
}
91125

92126
var body = Encoding.UTF8.GetBytes(message);
93127
await _channel.BasicPublishAsync(topic, "", body, cancellationToken: cancellationToken);
@@ -102,6 +136,29 @@ private async Task EnsureConnectedAsync(CancellationToken cancellationToken)
102136
_connection = await _connectionFactory.CreateConnectionAsync(cancellationToken);
103137
_channel = await _connection.CreateChannelAsync(null, cancellationToken);
104138
}
139+
140+
private async Task<bool> ExchangeExistsAsync(string exchange, CancellationToken cancellationToken)
141+
{
142+
try
143+
{
144+
await _channel!.ExchangeDeclarePassiveAsync(exchange, cancellationToken);
145+
return true;
146+
}
147+
catch (OperationInterruptedException ex) when (ex.ShutdownReason?.ReplyCode == 404)
148+
{
149+
_logger.LogDebug("Exchange '{Exchange}' does not exist.", exchange);
150+
151+
// Since the channel was closed, reopen it before returning
152+
await EnsureConnectedAsync(cancellationToken);
153+
154+
return false;
155+
}
156+
catch (Exception ex)
157+
{
158+
_logger.LogError(ex, "Unexpected error while checking if exchange '{Exchange}' exists.", exchange);
159+
throw;
160+
}
161+
}
105162

106163
public async ValueTask DisposeAsync()
107164
{

0 commit comments

Comments
 (0)