Skip to content
This repository was archived by the owner on Mar 27, 2020. It is now read-only.

Commit 61bf395

Browse files
Merge pull request #190 from Particular/hotfix-2.1.6
Fix excessive queue length querying in ASQ
2 parents ee07999 + 50cc8fa commit 61bf395

1 file changed

Lines changed: 33 additions & 28 deletions

File tree

src/ServiceControl.Transports.AzureStorageQueues/QueueLengthProvider.cs

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616

1717
public class QueueLengthProvider : IProvideQueueLength
1818
{
19-
ConcurrentDictionary<EndpointInputQueue, CloudQueue> queues = new ConcurrentDictionary<EndpointInputQueue, CloudQueue>();
20-
ConcurrentDictionary<CloudQueue, int> sizes = new ConcurrentDictionary<CloudQueue, int>();
21-
ConcurrentDictionary<CloudQueue, CloudQueue> problematicQueues = new ConcurrentDictionary<CloudQueue, CloudQueue>();
19+
ConcurrentDictionary<EndpointInputQueue, QueueLengthValue> queueLengths = new ConcurrentDictionary<EndpointInputQueue, QueueLengthValue>();
20+
ConcurrentDictionary<string, string> problematicQueuesNames = new ConcurrentDictionary<string, string>();
2221

2322
string connectionString;
2423
QueueLengthStore store;
@@ -38,19 +37,15 @@ public void Process(EndpointInstanceId endpointInstanceId, EndpointMetadataRepor
3837
var queueName = QueueNameSanitizer.Sanitize(metadataReport.LocalAddress);
3938

4039
var queueClient = CloudStorageAccount.Parse(connectionString).CreateCloudQueueClient();
41-
var queue = queueClient.GetQueueReference(queueName);
4240

43-
queues.AddOrUpdate(endpointInputQueue, _ => queue, (_, currentQueue) =>
41+
var emptyQueueLength = new QueueLengthValue
4442
{
45-
if (currentQueue.Name.Equals(queue.Name) == false)
46-
{
47-
sizes.TryRemove(currentQueue, out var _);
48-
}
49-
50-
return queue;
51-
});
43+
QueueName = queueName,
44+
Length = 0,
45+
QueueReference = queueClient.GetQueueReference(queueName)
46+
};
5247

53-
sizes.TryAdd(queue, 0);
48+
queueLengths.AddOrUpdate(endpointInputQueue, _ => emptyQueueLength, (_, existingQueueLength) => existingQueueLength);
5449
}
5550

5651
public void Process(EndpointInstanceId endpointInstanceId, TaggedLongValueOccurrence metricsReport)
@@ -100,28 +95,31 @@ void UpdateQueueLengthStore()
10095
{
10196
var nowTicks = DateTime.UtcNow.Ticks;
10297

103-
foreach (var tableNamePair in queues)
98+
foreach (var endpointQueueLengthPair in queueLengths)
10499
{
105-
store.Store(
106-
new[]{ new RawMessage.Entry
107-
{
108-
DateTicks = nowTicks,
109-
Value = sizes.TryGetValue(tableNamePair.Value, out var size) ? size : 0
110-
}},
111-
tableNamePair.Key);
100+
var queueLengthEntry = new RawMessage.Entry
101+
{
102+
DateTicks = nowTicks,
103+
Value = endpointQueueLengthPair.Value.Length
104+
};
105+
106+
store.Store(new[]{ queueLengthEntry }, endpointQueueLengthPair.Key);
112107
}
113108
}
114109

115-
Task FetchQueueSizes(CancellationToken token) => Task.WhenAll(sizes.Select(kvp => FetchLength(kvp.Key, token)));
110+
Task FetchQueueSizes(CancellationToken token) => Task.WhenAll(queueLengths.Select(kvp => FetchLength(kvp.Value, token)));
116111

117-
async Task FetchLength(CloudQueue queue, CancellationToken token)
112+
async Task FetchLength(QueueLengthValue queueLength, CancellationToken token)
118113
{
119114
try
120115
{
121-
await queue.FetchAttributesAsync(token).ConfigureAwait(false);
122-
sizes[queue] = queue.ApproximateMessageCount.GetValueOrDefault();
116+
var queueReference = queueLength.QueueReference;
117+
118+
await queueReference.FetchAttributesAsync(token).ConfigureAwait(false);
119+
120+
queueLength.Length = queueReference.ApproximateMessageCount.GetValueOrDefault();
123121

124-
problematicQueues.TryRemove(queue, out _);
122+
problematicQueuesNames.TryRemove(queueLength.QueueName, out _);
125123
}
126124
catch (OperationCanceledException)
127125
{
@@ -130,13 +128,20 @@ async Task FetchLength(CloudQueue queue, CancellationToken token)
130128
catch (Exception ex)
131129
{
132130
// simple "log once" approach to do not flood logs
133-
if (problematicQueues.TryAdd(queue, queue))
131+
if (problematicQueuesNames.TryAdd(queueLength.QueueName, queueLength.QueueName))
134132
{
135-
Logger.Error($"Obtaining Azure Storage Queue count failed for '{queue.Name}'", ex);
133+
Logger.Error($"Obtaining Azure Storage Queue count failed for '{queueLength.QueueName}'", ex);
136134
}
137135
}
138136
}
139137

138+
class QueueLengthValue
139+
{
140+
public string QueueName;
141+
public volatile int Length;
142+
public CloudQueue QueueReference;
143+
}
144+
140145
static TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200);
141146
static ILog Logger = LogManager.GetLogger<QueueLengthProvider>();
142147
}

0 commit comments

Comments
 (0)