|
10 | 10 | namespace Microsoft.OneFuzz.Service; |
11 | 11 | public interface IQueue { |
12 | 12 | Async.Task SendMessage(string name, string message, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null); |
13 | | - Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null); |
| 13 | + Async.Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null, JsonSerializerOptions? serializerOptions = null); |
14 | 14 | Task<Uri> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration = null); |
15 | 15 | ResourceIdentifier GetResourceId(string queueName, StorageType storageType); |
16 | 16 | Task<IList<T>> PeekQueue<T>(string name, StorageType storageType); |
@@ -57,23 +57,40 @@ public async Task<QueueClient> GetQueueClient(string name, StorageType storageTy |
57 | 57 | public Task<QueueServiceClient> GetQueueClientService(StorageType storageType) |
58 | 58 | => _storage.GetQueueServiceClientForAccount(_storage.GetPrimaryAccount(storageType)); |
59 | 59 |
|
60 | | - public async Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) { |
| 60 | + public async Task<bool> QueueObject<T>(string name, T obj, StorageType storageType, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null, JsonSerializerOptions? serializerOptions = null) { |
61 | 61 | var queueClient = await GetQueueClient(name, storageType); |
| 62 | + serializerOptions ??= EntityConverter.GetJsonSerializerOptions(); |
62 | 63 | try { |
63 | | - var serialized = JsonSerializer.Serialize(obj, EntityConverter.GetJsonSerializerOptions()); |
64 | | - var res = await queueClient.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout, timeToLive); |
65 | | - if (res.GetRawResponse().IsError) { |
66 | | - _log.Error($"Failed to send {serialized:Tag:Message} in {name:Tag:QueueName} due to {res.GetRawResponse().ReasonPhrase:Tag:Error}"); |
67 | | - return false; |
68 | | - } else { |
69 | | - return true; |
70 | | - } |
| 64 | + return await QueueObjectInternal(obj, queueClient, serializerOptions, visibilityTimeout, timeToLive); |
71 | 65 | } catch (Exception ex) { |
72 | 66 | _log.Exception(ex, $"Failed to queue message in {name:Tag:QueueName}"); |
| 67 | + if (IsMessageTooLargeException(ex) && |
| 68 | + obj is ITruncatable<T> truncatable) { |
| 69 | + obj = truncatable.Truncate(1000); |
| 70 | + try { |
| 71 | + return await QueueObjectInternal(obj, queueClient, serializerOptions, visibilityTimeout, timeToLive); |
| 72 | + } catch (Exception ex2) { |
| 73 | + _log.Exception(ex2, $"Failed to queue message in {name:Tag:QueueName} after truncation"); |
| 74 | + } |
| 75 | + } |
73 | 76 | return false; |
74 | 77 | } |
75 | 78 | } |
76 | 79 |
|
| 80 | + private async Task<bool> QueueObjectInternal<T>(T obj, QueueClient queueClient, JsonSerializerOptions serializerOptions, TimeSpan? visibilityTimeout = null, TimeSpan? timeToLive = null) { |
| 81 | + var serialized = JsonSerializer.Serialize(obj, serializerOptions); |
| 82 | + var res = await queueClient.SendMessageAsync(serialized, visibilityTimeout: visibilityTimeout, timeToLive); |
| 83 | + if (res.GetRawResponse().IsError) { |
| 84 | + _log.Error($"Failed to send {serialized:Tag:Message} in {queueClient.Name:Tag:QueueName} due to {res.GetRawResponse().ReasonPhrase:Tag:Error}"); |
| 85 | + return false; |
| 86 | + } else { |
| 87 | + return true; |
| 88 | + } |
| 89 | + } |
| 90 | + |
| 91 | + private static bool IsMessageTooLargeException(Exception ex) => |
| 92 | + ex is RequestFailedException rfe && rfe.Message.Contains("The request body is too large"); |
| 93 | + |
77 | 94 | public async Task<Uri> GetQueueSas(string name, StorageType storageType, QueueSasPermissions permissions, TimeSpan? duration) { |
78 | 95 | var queueClient = await GetQueueClient(name, storageType); |
79 | 96 | var now = DateTimeOffset.UtcNow; |
|
0 commit comments