Skip to content

Commit 531834c

Browse files
committed
Fix live candle subscription leak on unsubscribe
CandleBuilderManager created a live candle subscription via LoopBack during hist+live upgrade but never unsubscribed it when the user called UnSubscribe. Added LiveCandleTransactionId tracking to SeriesInfo and LoopBack unsubscribe in the teardown path. Added 6 tests (sync + async) verifying unsubscribe reaches the adapter for candles, ticks, and level1 subscriptions.
1 parent cd6e61a commit 531834c

2 files changed

Lines changed: 281 additions & 1 deletion

File tree

Algo/Candles/Compression/CandleBuilderManager.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ public MarketDataMessage Current
8484

8585
public VolumeProfileBuilder VolumeProfile { get; set; }
8686

87+
public long? LiveCandleTransactionId { get; set; }
88+
8789
public bool IsCountExhausted => Count <= 0;
8890
}
8991

@@ -422,13 +424,26 @@ await InnerAdapter.TryGetCandlesBuildFromAsync(original, _candleBuilderProvider,
422424
if (series is null)
423425
return ([mdMsg], []);
424426

427+
var extraOut = new List<Message>();
428+
425429
var unsubscribe = series.Current.TypedClone();
426430

427431
unsubscribe.OriginalTransactionId = unsubscribe.TransactionId;
428432
unsubscribe.TransactionId = transactionId;
429433
unsubscribe.IsSubscribe = false;
430434

431-
return ([unsubscribe], []);
435+
if (series.LiveCandleTransactionId is long liveTxId)
436+
{
437+
var liveUnsub = series.Original.TypedClone();
438+
liveUnsub.OriginalTransactionId = liveTxId;
439+
liveUnsub.TransactionId = _idGenerator.GetNextId();
440+
liveUnsub.IsSubscribe = false;
441+
442+
liveUnsub.LoopBack(_adapter);
443+
extraOut.Add(liveUnsub);
444+
}
445+
446+
return ([unsubscribe], extraOut.ToArray());
432447
}
433448
}
434449

@@ -781,7 +796,10 @@ async ValueTask FinishAsync()
781796
liveCandleSub.IsFinishedOnly = true;
782797

783798
using (await _sync.LockAsync(cancellationToken))
799+
{
784800
_replaceId.Add(liveCandleSub.TransactionId, series.Id);
801+
series.LiveCandleTransactionId = liveCandleSub.TransactionId;
802+
}
785803

786804
liveCandleSub.LoopBack(_adapter);
787805
extraOut.Add(liveCandleSub);

Tests/ConnectorBasketTests.cs

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -844,13 +844,15 @@ await Task.Run(async () =>
844844
private sealed class CandleMockAdapter : MessageAdapter
845845
{
846846
public ConcurrentQueue<MarketDataMessage> RecordedSubscriptions { get; } = [];
847+
public ConcurrentQueue<MarketDataMessage> RecordedUnsubscriptions { get; } = [];
847848

848849
public CandleMockAdapter(IdGenerator transactionIdGenerator) : base(transactionIdGenerator)
849850
{
850851
this.AddMarketDataSupport();
851852
this.AddTransactionalSupport();
852853
this.AddSupportedMarketDataType(TimeSpan.FromMinutes(5).TimeFrame());
853854
this.AddSupportedMarketDataType(DataType.Ticks);
855+
this.AddSupportedMarketDataType(DataType.Level1);
854856
}
855857

856858
public override bool IsAllDownloadingSupported(DataType dataType)
@@ -881,6 +883,7 @@ protected override async ValueTask OnSendInMessageAsync(Message message, Cancell
881883
}
882884
else
883885
{
886+
RecordedUnsubscriptions.Enqueue(mdMsg.TypedClone());
884887
await SendOutMessageAsync(mdMsg.CreateResponse(), cancellationToken);
885888
}
886889
break;
@@ -1011,5 +1014,264 @@ await Task.Run(async () =>
10111014
runCts.Cancel();
10121015
}
10131016

1017+
[TestMethod]
1018+
[Timeout(15_000, CooperativeCancellation = true)]
1019+
public async Task CandleHistLive_UnsubscribeShouldReachAdapter()
1020+
{
1021+
// After hist+live candle subscription is fully established (history→ticks+live candle),
1022+
// calling UnSubscribe should send unsubscribe messages to the adapter
1023+
// for ALL active subscriptions (ticks build + live candle).
1024+
1025+
var (connector, adapter, _) = CreateConnectorForCandleTest();
1026+
await connector.ConnectAsync(CancellationToken);
1027+
1028+
var security = new Security { Id = "AAPL@TEST" };
1029+
await connector.SendOutMessageAsync(security.ToMessage(), CancellationToken);
1030+
1031+
var sub = new Subscription(TimeSpan.FromMinutes(5).TimeFrame(), security)
1032+
{
1033+
From = DateTime.UtcNow.AddDays(-1),
1034+
};
1035+
1036+
connector.Subscribe(sub);
1037+
1038+
// Wait for full pipeline cycle (ticks subscription = proof history finished and compress started)
1039+
await Task.Run(async () =>
1040+
{
1041+
while (!adapter.RecordedSubscriptions.Any(m => m.DataType2 == DataType.Ticks && !m.IsHistoryOnly()))
1042+
await Task.Delay(10, CancellationToken);
1043+
}, CancellationToken);
1044+
1045+
await Task.Delay(500, CancellationToken);
1046+
1047+
// Now unsubscribe via sync API
1048+
connector.UnSubscribe(sub);
1049+
1050+
// Wait for unsubscribe messages to arrive
1051+
await Task.Run(async () =>
1052+
{
1053+
while (adapter.RecordedUnsubscriptions.Count < 2)
1054+
await Task.Delay(10, CancellationToken);
1055+
}, CancellationToken);
1056+
1057+
await Task.Delay(200, CancellationToken);
1058+
1059+
var unsubs = adapter.RecordedUnsubscriptions.ToList();
1060+
1061+
// Should have unsubscribed the ticks build subscription
1062+
var tickUnsub = unsubs.FirstOrDefault(m => m.DataType2 == DataType.Ticks);
1063+
IsNotNull(tickUnsub,
1064+
$"Expected ticks unsubscribe at adapter. Unsubs: [{string.Join("; ", unsubs.Select(m => m.DataType2.ToString()))}]");
1065+
1066+
// Should have unsubscribed the live candle subscription
1067+
var candleUnsub = unsubs.FirstOrDefault(m => m.DataType2.IsTFCandles);
1068+
IsNotNull(candleUnsub,
1069+
$"Expected candle unsubscribe at adapter. Unsubs: [{string.Join("; ", unsubs.Select(m => m.DataType2.ToString()))}]");
1070+
}
1071+
1072+
[TestMethod]
1073+
[Timeout(15_000, CooperativeCancellation = true)]
1074+
public async Task TicksLive_UnsubscribeShouldReachAdapter()
1075+
{
1076+
// Subscribe to live ticks via sync API → UnSubscribe → adapter should receive unsubscribe.
1077+
1078+
var (connector, adapter, _) = CreateConnectorForCandleTest();
1079+
await connector.ConnectAsync(CancellationToken);
1080+
1081+
var security = new Security { Id = "AAPL@TEST" };
1082+
await connector.SendOutMessageAsync(security.ToMessage(), CancellationToken);
1083+
1084+
var sub = new Subscription(DataType.Ticks, security);
1085+
1086+
connector.Subscribe(sub);
1087+
1088+
// Wait for subscription to reach adapter
1089+
await Task.Run(async () =>
1090+
{
1091+
while (!adapter.RecordedSubscriptions.Any(m => m.DataType2 == DataType.Ticks))
1092+
await Task.Delay(10, CancellationToken);
1093+
}, CancellationToken);
1094+
1095+
await Task.Delay(200, CancellationToken);
1096+
1097+
// Now unsubscribe via sync API
1098+
connector.UnSubscribe(sub);
1099+
1100+
// Wait for unsubscribe
1101+
await Task.Run(async () =>
1102+
{
1103+
while (!adapter.RecordedUnsubscriptions.Any(m => m.DataType2 == DataType.Ticks))
1104+
await Task.Delay(10, CancellationToken);
1105+
}, CancellationToken);
1106+
1107+
var unsub = adapter.RecordedUnsubscriptions.First(m => m.DataType2 == DataType.Ticks);
1108+
unsub.IsSubscribe.AssertFalse();
1109+
}
1110+
1111+
[TestMethod]
1112+
[Timeout(15_000, CooperativeCancellation = true)]
1113+
public async Task Level1Live_UnsubscribeShouldReachAdapter()
1114+
{
1115+
// Subscribe to live Level1 via sync API → UnSubscribe → adapter should receive unsubscribe.
1116+
1117+
var (connector, adapter, _) = CreateConnectorForCandleTest();
1118+
await connector.ConnectAsync(CancellationToken);
1119+
1120+
var security = new Security { Id = "AAPL@TEST" };
1121+
await connector.SendOutMessageAsync(security.ToMessage(), CancellationToken);
1122+
1123+
var sub = new Subscription(DataType.Level1, security);
1124+
1125+
connector.Subscribe(sub);
1126+
1127+
// Wait for subscription to reach adapter
1128+
await Task.Run(async () =>
1129+
{
1130+
while (!adapter.RecordedSubscriptions.Any(m => m.DataType2 == DataType.Level1))
1131+
await Task.Delay(10, CancellationToken);
1132+
}, CancellationToken);
1133+
1134+
await Task.Delay(200, CancellationToken);
1135+
1136+
// Now unsubscribe via sync API
1137+
connector.UnSubscribe(sub);
1138+
1139+
// Wait for unsubscribe
1140+
await Task.Run(async () =>
1141+
{
1142+
while (!adapter.RecordedUnsubscriptions.Any(m => m.DataType2 == DataType.Level1))
1143+
await Task.Delay(10, CancellationToken);
1144+
}, CancellationToken);
1145+
1146+
var unsub = adapter.RecordedUnsubscriptions.First(m => m.DataType2 == DataType.Level1);
1147+
unsub.IsSubscribe.AssertFalse();
1148+
}
1149+
1150+
[TestMethod]
1151+
[Timeout(15_000, CooperativeCancellation = true)]
1152+
public async Task CandleHistLive_UnsubscribeAsync_ShouldReachAdapter()
1153+
{
1154+
// Same scenario as sync version but using SubscribeAsync + CancellationToken.
1155+
1156+
var (connector, adapter, _) = CreateConnectorForCandleTest();
1157+
await connector.ConnectAsync(CancellationToken);
1158+
1159+
var security = new Security { Id = "AAPL@TEST" };
1160+
await connector.SendOutMessageAsync(security.ToMessage(), CancellationToken);
1161+
1162+
var sub = new Subscription(TimeSpan.FromMinutes(5).TimeFrame(), security)
1163+
{
1164+
From = DateTime.UtcNow.AddDays(-1),
1165+
};
1166+
1167+
using var runCts = new CancellationTokenSource();
1168+
var run = connector.SubscribeAsync(sub, runCts.Token);
1169+
1170+
await Task.Run(async () =>
1171+
{
1172+
while (!adapter.RecordedSubscriptions.Any(m => m.DataType2 == DataType.Ticks && !m.IsHistoryOnly()))
1173+
await Task.Delay(10, CancellationToken);
1174+
}, CancellationToken);
1175+
1176+
await Task.Delay(500, CancellationToken);
1177+
1178+
runCts.Cancel();
1179+
try { await run; } catch (OperationCanceledException) { }
1180+
1181+
await Task.Run(async () =>
1182+
{
1183+
while (adapter.RecordedUnsubscriptions.Count < 2)
1184+
await Task.Delay(10, CancellationToken);
1185+
}, CancellationToken);
1186+
1187+
await Task.Delay(200, CancellationToken);
1188+
1189+
var unsubs = adapter.RecordedUnsubscriptions.ToList();
1190+
1191+
var tickUnsub = unsubs.FirstOrDefault(m => m.DataType2 == DataType.Ticks);
1192+
IsNotNull(tickUnsub,
1193+
$"Expected ticks unsubscribe. Unsubs: [{string.Join("; ", unsubs.Select(m => m.DataType2.ToString()))}]");
1194+
1195+
var candleUnsub = unsubs.FirstOrDefault(m => m.DataType2.IsTFCandles);
1196+
IsNotNull(candleUnsub,
1197+
$"Expected candle unsubscribe. Unsubs: [{string.Join("; ", unsubs.Select(m => m.DataType2.ToString()))}]");
1198+
}
1199+
1200+
[TestMethod]
1201+
[Timeout(15_000, CooperativeCancellation = true)]
1202+
public async Task TicksLive_UnsubscribeAsync_ShouldReachAdapter()
1203+
{
1204+
// SubscribeAsync + CancellationToken cancel for ticks.
1205+
1206+
var (connector, adapter, _) = CreateConnectorForCandleTest();
1207+
await connector.ConnectAsync(CancellationToken);
1208+
1209+
var security = new Security { Id = "AAPL@TEST" };
1210+
await connector.SendOutMessageAsync(security.ToMessage(), CancellationToken);
1211+
1212+
var sub = new Subscription(DataType.Ticks, security);
1213+
1214+
using var runCts = new CancellationTokenSource();
1215+
var run = connector.SubscribeAsync(sub, runCts.Token);
1216+
1217+
await Task.Run(async () =>
1218+
{
1219+
while (!adapter.RecordedSubscriptions.Any(m => m.DataType2 == DataType.Ticks))
1220+
await Task.Delay(10, CancellationToken);
1221+
}, CancellationToken);
1222+
1223+
await Task.Delay(200, CancellationToken);
1224+
1225+
runCts.Cancel();
1226+
try { await run; } catch (OperationCanceledException) { }
1227+
1228+
await Task.Run(async () =>
1229+
{
1230+
while (!adapter.RecordedUnsubscriptions.Any(m => m.DataType2 == DataType.Ticks))
1231+
await Task.Delay(10, CancellationToken);
1232+
}, CancellationToken);
1233+
1234+
var unsub = adapter.RecordedUnsubscriptions.First(m => m.DataType2 == DataType.Ticks);
1235+
unsub.IsSubscribe.AssertFalse();
1236+
}
1237+
1238+
[TestMethod]
1239+
[Timeout(15_000, CooperativeCancellation = true)]
1240+
public async Task Level1Live_UnsubscribeAsync_ShouldReachAdapter()
1241+
{
1242+
// SubscribeAsync + CancellationToken cancel for Level1.
1243+
1244+
var (connector, adapter, _) = CreateConnectorForCandleTest();
1245+
await connector.ConnectAsync(CancellationToken);
1246+
1247+
var security = new Security { Id = "AAPL@TEST" };
1248+
await connector.SendOutMessageAsync(security.ToMessage(), CancellationToken);
1249+
1250+
var sub = new Subscription(DataType.Level1, security);
1251+
1252+
using var runCts = new CancellationTokenSource();
1253+
var run = connector.SubscribeAsync(sub, runCts.Token);
1254+
1255+
await Task.Run(async () =>
1256+
{
1257+
while (!adapter.RecordedSubscriptions.Any(m => m.DataType2 == DataType.Level1))
1258+
await Task.Delay(10, CancellationToken);
1259+
}, CancellationToken);
1260+
1261+
await Task.Delay(200, CancellationToken);
1262+
1263+
runCts.Cancel();
1264+
try { await run; } catch (OperationCanceledException) { }
1265+
1266+
await Task.Run(async () =>
1267+
{
1268+
while (!adapter.RecordedUnsubscriptions.Any(m => m.DataType2 == DataType.Level1))
1269+
await Task.Delay(10, CancellationToken);
1270+
}, CancellationToken);
1271+
1272+
var unsub = adapter.RecordedUnsubscriptions.First(m => m.DataType2 == DataType.Level1);
1273+
unsub.IsSubscribe.AssertFalse();
1274+
}
1275+
10141276
#endregion
10151277
}

0 commit comments

Comments
 (0)