Skip to content

Commit a0689c3

Browse files
committed
fix a reentrancy problem, add a TestClient to run integration tests, bug fixes
1 parent 77e78ab commit a0689c3

File tree

17 files changed

+418
-67
lines changed

17 files changed

+418
-67
lines changed

src/03-GrainInterfaces/GrainInterfaces/Inventories/IInventories.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ namespace GrainInterfaces.Inventories
88
public interface IInventories : Orleans.IGrainWithGuidKey
99
{
1010
Task<Inventory[]> GetAll();
11-
Task<Guid> GetBestForProduct(Guid productGuid);
12-
Task<Inventory> Get(Guid warehouseCode);
13-
Task<bool> Exists(Guid warehouseCode);
11+
Task<bool> Exists(Guid inventoryGuid);
1412
Task<Inventory> Add(Inventory inventory);
13+
Task<Guid> GetBestForProduct(Guid productGuid);
14+
Task<Inventory> GetFromWarehouse(Guid warehouseCode);
15+
Task<bool> ExistsWarehouse(Guid warehouseCode);
1516
}
1617
}

src/03-GrainInterfaces/GrainInterfaces/Inventories/ProductStock.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public decimal SupplyingRequiredQuantity
6666
{
6767
return 0;
6868
}
69-
return SafetyStockQuantity + BookedQuantity - CurrentStockQuantity;
69+
return BookedQuantity > 0 ? SafetyStockQuantity + BookedQuantity : 0;
7070
}
7171
}
7272
}

src/03-GrainInterfaces/GrainInterfaces/Orders/IOrders.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ namespace GrainInterfaces.Orders
88
public interface IOrders : Orleans.IGrainWithGuidKey
99
{
1010
Task<Order[]> GetAll();
11-
Task<Order[]> GetAllNotDispatched();
11+
Task<Guid[]> GetAllNotDispatched();
1212
Task SetAsDispatched(Guid orderGuid);
1313
Task<bool> Exists(Guid id);
14-
Task<Order> Add(Order product);
14+
Task<Order> Add(Order order);
1515
}
1616
}

src/03-GrainInterfaces/GrainInterfaces/Orders/Order.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ public class Order
2121
public List<OrderItem> Items;
2222
[ProtoMember(6)]
2323
public bool Dispatched;
24+
[ProtoMember(7)]
25+
public Guid AssignedInventory = Guid.Empty;
26+
2427
}
2528

2629
[ProtoContract]

src/04-Grains/Grains/Inventories/InventoriesGrain.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public InventoriesGrain(ILogger<InventoriesGrain> logger)
2727
_logger = logger;
2828
}
2929

30-
async Task<Inventory[]> IInventories.GetAll()
30+
public async Task<Inventory[]> GetAll()
3131
{
3232
var inventories = new List<Task<Inventory>>();
3333
foreach (var kvp in this.State.Inventories)
@@ -38,7 +38,7 @@ async Task<Inventory[]> IInventories.GetAll()
3838
return await Task.WhenAll(inventories);
3939
}
4040

41-
async Task<Guid> IInventories.GetBestForProduct(Guid productGuid)
41+
public async Task<Guid> GetBestForProduct(Guid productGuid)
4242
{
4343
(Guid inventoryGuid, decimal qty) result = (Guid.Empty, decimal.MinValue);
4444

@@ -56,7 +56,7 @@ async Task<Guid> IInventories.GetBestForProduct(Guid productGuid)
5656
return result.inventoryGuid;
5757
}
5858

59-
async Task<Inventory> IInventories.Add(Inventory data)
59+
public async Task<Inventory> Add(Inventory data)
6060
{
6161
data.Id = Guid.NewGuid();
6262
data.CreationDate = DateTimeOffset.Now;
@@ -68,14 +68,21 @@ async Task<Inventory> IInventories.Add(Inventory data)
6868
return result;
6969
}
7070

71-
Task<bool> IInventories.Exists(Guid warehouseGuid)
71+
public Task<bool> Exists(Guid inventoryGuid)
72+
{
73+
var result = State.Inventories.Any(x => x.Key == inventoryGuid);
74+
_logger.LogInformation($"Inventory with id {inventoryGuid} exists => {result}");
75+
return Task.FromResult(result);
76+
}
77+
78+
public Task<bool> ExistsWarehouse(Guid warehouseGuid)
7279
{
7380
var result = State.Inventories.Values.Any(x => x == warehouseGuid);
7481
_logger.LogInformation($"Inventory with warehouse id {warehouseGuid} exists => {result}");
7582
return Task.FromResult(result);
7683
}
7784

78-
async Task<Inventory> IInventories.Get(Guid warehouseGuid)
85+
public async Task<Inventory> GetFromWarehouse(Guid warehouseGuid)
7986
{
8087
var id = State.Inventories.First(x => x.Value == warehouseGuid).Key;
8188
var g = GrainFactory.GetGrain<IInventory>(id);

src/04-Grains/Grains/Orders/OrderGrain.cs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,29 @@ public async Task<Order> Create(Order order)
3232
order.TotalAmount = order.Items.Sum(item => item.Quantity * item.Product.Price);
3333

3434
State = order;
35-
36-
await this.TryDispatch(true);
35+
await base.WriteStateAsync();
3736
return State;
3837
}
3938

4039
public async Task<Order> TryDispatch(bool isNewOrder)
4140
{
42-
// find best inventory
41+
_logger.Info($"Order try to dispatch - Id: {State.Id} New: {isNewOrder}");
42+
4343
var gis = GrainFactory.GetGrain<IInventories>(Guid.Empty);
44-
var inventoryGuid = await gis.GetBestForProduct(State.Items.First().ProductId);
45-
var gi = GrainFactory.GetGrain<IInventory>(inventoryGuid);
44+
45+
if (!await gis.Exists(State.AssignedInventory))
46+
{
47+
State.AssignedInventory = Guid.Empty;
48+
}
49+
50+
if (State.AssignedInventory == Guid.Empty)
51+
{
52+
// find best inventory and store it, booked quantity is for inventory
53+
var inventoryGuid = await gis.GetBestForProduct(State.Items.First().ProductId);
54+
State.AssignedInventory = inventoryGuid;
55+
}
56+
57+
var gi = GrainFactory.GetGrain<IInventory>(State.AssignedInventory);
4658

4759
// try to dispatch all items, otherwise set order as "not processable"
4860
var isOrderProcessable = true;
@@ -74,7 +86,6 @@ public async Task<Order> TryDispatch(bool isNewOrder)
7486
{
7587
var produckStockRemaining = await gi.Deduct(item.ProductId, item.Quantity);
7688
var isItemProcessable = (produckStockRemaining >= 0);
77-
if (!isItemProcessable) { }
7889
isOrderProcessable &= isItemProcessable;
7990
}
8091
}
@@ -91,6 +102,9 @@ public async Task<Order> TryDispatch(bool isNewOrder)
91102
_logger.Info($"Order {State.Id} set as dispatched");
92103
var gorders = GrainFactory.GetGrain<IOrders>(Guid.Empty);
93104
await gorders.SetAsDispatched(State.Id);
105+
} else
106+
{
107+
_logger.Info($"Order not processable - Id: {State.Id} New: {isNewOrder}");
94108
}
95109
await base.WriteStateAsync();
96110
return State;

src/04-Grains/Grains/Orders/OrdersGrain.cs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public OrdersGrain(ILogger<OrdersGrain> logger)
2626
_logger = logger;
2727
}
2828

29-
async Task<Order[]> IOrders.GetAll()
29+
public async Task<Order[]> GetAll()
3030
{
3131
var orders = new List<Task<Order>>();
3232
foreach (var id in this.State.Orders)
@@ -37,47 +37,41 @@ async Task<Order[]> IOrders.GetAll()
3737
return await Task.WhenAll(orders);
3838
}
3939

40-
async Task<Order[]> IOrders.GetAllNotDispatched()
40+
public Task<Guid[]> GetAllNotDispatched()
4141
{
42-
var orders = new List<Task<Order>>();
43-
foreach (var id in this.State.OrdersNotDispatched)
44-
{
45-
var order = GrainFactory.GetGrain<IOrder>(id);
46-
orders.Add(order.GetState());
47-
}
48-
return await Task.WhenAll(orders);
42+
return Task.FromResult(this.State.OrdersNotDispatched.ToArray());
4943
}
5044

51-
async Task IOrders.SetAsDispatched(Guid orderGuid)
45+
public async Task SetAsDispatched(Guid orderGuid)
5246
{
53-
if(State.OrdersNotDispatched.Contains(orderGuid))
47+
_logger.LogInformation($"Order set as dispatched and removed from list => Id: {orderGuid}");
48+
49+
if (State.OrdersNotDispatched.Contains(orderGuid))
5450
{
5551
State.OrdersNotDispatched.Remove(orderGuid);
5652
await base.WriteStateAsync();
5753
}
5854
}
5955

60-
async Task<Order> IOrders.Add(Order info)
56+
public async Task<Order> Add(Order info)
6157
{
6258
info.Id = Guid.NewGuid();
6359
info.Date = DateTimeOffset.Now;
6460
var g = GrainFactory.GetGrain<IOrder>(info.Id);
6561
var order = await g.Create(info);
66-
_logger.LogInformation($"Order created => Id: {order.Id} Dispatched: {order.Dispatched}");
62+
_logger.LogInformation($"Order created => Id: {order.Id} Name: {order.Name} Dispatched: {order.Dispatched}");
6763

68-
if (!order.Dispatched)
69-
{
70-
// add order in the orders-pending list
71-
State.OrdersNotDispatched.Add(order.Id);
72-
}
7364

65+
State.OrdersNotDispatched.Add(order.Id);
7466
State.Orders.Add(order.Id);
75-
7667
await base.WriteStateAsync();
68+
69+
order = await g.TryDispatch(true);
70+
7771
return order;
7872
}
7973

80-
Task<bool> IOrders.Exists(Guid id)
74+
public Task<bool> Exists(Guid id)
8175
{
8276
var result = State.Orders.Contains(id);
8377
_logger.LogInformation($"Order exists {id} => {result}");

src/04-Grains/Grains/Scheduled/OrderScheduledProcessingGrain.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using Orleans.Runtime;
77
using System;
88
using System.Collections.Generic;
9+
using System.Linq;
910
using System.Threading.Tasks;
1011

1112
namespace OrleansSilo.Scheduled
@@ -47,11 +48,12 @@ async Task IRemindable.ReceiveReminder(string reminderName, TickStatus status)
4748
var tasks = new List<Task<Order>>();
4849
var g = GrainFactory.GetGrain<IOrders>(Guid.Empty);
4950
var orders = await g.GetAllNotDispatched();
50-
foreach (var order in orders)
51-
{
52-
var orderGuid = order.Id;
5351

54-
_logger.Info($"Order try-to-dispatch required for order {orderGuid}");
52+
var i = 0;
53+
foreach (var orderGuid in orders)
54+
{
55+
i++;
56+
_logger.Info($"Order try-to-dispatch required for order {orderGuid} {i}/{orders.Length}");
5557

5658
var go = GrainFactory.GetGrain<IOrder>(orderGuid);
5759
var tryTask = go.TryDispatch(false);

src/06-Frontends/Client/Program.cs

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
11
using GrainInterfaces;
2+
using GrainInterfaces.Orders;
23
using GrainInterfaces.Products;
4+
using GrainInterfaces.Scheduled;
5+
using GrainInterfaces.Serialization.ProtobufNet;
6+
using GrainInterfaces.Warehouses;
37
using Microsoft.Extensions.Logging;
48
using Orleans;
59
using Orleans.Configuration;
10+
using ProtoBuf.Meta;
611
using System;
12+
using System.Collections.Async;
13+
using System.Collections.Concurrent;
14+
using System.Collections.Generic;
15+
using System.Diagnostics;
16+
using System.Linq;
17+
using System.Reflection;
718
using System.Threading.Tasks;
19+
using TestClient;
820

921
namespace OrleansSilo
1022
{
@@ -21,7 +33,8 @@ private static async Task<int> RunMainAsync()
2133
{
2234
using (var client = await ConnectClient())
2335
{
24-
await DoClientWork(client);
36+
// await RunClientTests(client);
37+
await RunClientTestsSetup(client);
2538
Console.ReadKey();
2639
}
2740

@@ -42,25 +55,78 @@ private static async Task<IClusterClient> ConnectClient()
4255
IClusterClient client;
4356
client = new ClientBuilder()
4457
.UseLocalhostClustering()
58+
.Configure<SerializationProviderOptions>(_ =>
59+
{
60+
_.SerializationProviders.Add(typeof(Orleans.Serialization.ProtobufNet.ProtobufNetSerializer).GetTypeInfo());
61+
RuntimeTypeModel.Default.Add(typeof(DateTimeOffset), false).SetSurrogate(typeof(DateTimeOffsetSurrogate));
62+
})
4563
.Configure<ClusterOptions>(options =>
4664
{
4765
options.ClusterId = "dev";
48-
options.ServiceId = "OrleansBasics";
66+
options.ServiceId = "TestInventoryService";
4967
})
68+
.Configure<ProcessExitHandlingOptions>(options => options.FastKillOnProcessExit = false)
5069
.ConfigureLogging(logging => logging.AddConsole())
5170
.Build();
5271

5372
await client.Connect();
54-
Console.WriteLine("Client successfully connected to silo host \n");
73+
Console.WriteLine("Client successfully connected to silo host");
5574
return client;
5675
}
5776

58-
private static async Task DoClientWork(IClusterClient client)
77+
private static async Task StartReminders(IClusterClient client)
78+
{
79+
// startup the global reminders
80+
{
81+
var g = client.GetGrain<IInventoryAutoSupplying>(Guid.Empty);
82+
await g.Start();
83+
}
84+
{
85+
var g = client.GetGrain<IOrderScheduledProcessing>(Guid.Empty);
86+
await g.Start();
87+
}
88+
}
89+
90+
private static async Task RunClientTests(IClusterClient client)
5991
{
60-
// example of calling grains from the initialized client
61-
var products = client.GetGrain<IProducts>(Guid.Empty);
62-
var response = await products.GetAll();
63-
Console.WriteLine("\n\n find {0} products\n\n", response.Length);
92+
await Test_Products.GetAll(client);
93+
await Test_Products.AddBatch(client, 100, 100);
94+
95+
await Test_Orders.WaitForAllDispatched(client);
96+
await Test_Orders.AddBatch(client, 100, 10);
97+
await Test_Orders.WaitForAllDispatched(client);
98+
}
99+
100+
private static async Task RunClientTestsSetup(IClusterClient client)
101+
{
102+
int numWarehouses = 2;
103+
int numProducts = 100;
104+
int numOrders = 1000;
105+
106+
var warehouses = await Test_Warehouses.GetAll(client);
107+
for(int n = warehouses.Length; n < numWarehouses; n++)
108+
{
109+
var name = (n + 1).ToString("000");
110+
await Test_Warehouses.Add(client, name);
111+
}
112+
113+
var products = await Test_Products.GetAll(client);
114+
for (int n = products.Length; n < numProducts; n++)
115+
{
116+
var name = (n + 1).ToString("000");
117+
await Test_Products.Add(client, name);
118+
}
119+
120+
// await Test_Orders.WaitForAllDispatched(client);
121+
var orders = await Test_Orders.GetAll(client);
122+
if (orders.Length < numOrders)
123+
{
124+
await Test_Orders.AddBatch(client, numOrders - orders.Length, 100);
125+
126+
await StartReminders(client);
127+
128+
await Test_Orders.WaitForAllDispatched(client);
129+
}
64130
}
65131
}
66132
}

src/06-Frontends/Client/Client.csproj renamed to src/06-Frontends/Client/TestClient.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
</PropertyGroup>
77

88
<ItemGroup>
9+
<PackageReference Include="AsyncEnumerator" Version="2.2.2" />
910
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" />
1011
<PackageReference Include="Microsoft.Orleans.Client" Version="2.3.1" />
1112
<PackageReference Include="Microsoft.Orleans.ProtobufNet" Version="2.3.1" />

0 commit comments

Comments
 (0)