Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V3/feature/88 #132

Merged
merged 4 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/Andy.X.App/Controllers/ClustersController.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
using Buildersoft.Andy.X.Core.Abstractions.Services.Clusters;
using Buildersoft.Andy.X.Core.Abstractions.Repositories.Clusters;
using Buildersoft.Andy.X.Core.Abstractions.Services.Clusters;
using Buildersoft.Andy.X.Extensions;
using Buildersoft.Andy.X.Model.Clusters;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using System.Net.Mime;
using System.Security.Claims;

namespace Buildersoft.Andy.X.Controllers
{
Expand All @@ -19,11 +19,15 @@ public class ClustersController : ControllerBase
{
private readonly ILogger<ClustersController> _logger;
private readonly IClusterService _tenantService;
private readonly IClusterRepository _clusterRepository;

public ClustersController(ILogger<ClustersController> logger, IClusterService tenantService)
public ClustersController(ILogger<ClustersController> logger,
IClusterService tenantService,
IClusterRepository clusterRepository)
{
_logger = logger;
_tenantService = tenantService;
_clusterRepository = clusterRepository;
}

[ProducesResponseType(StatusCodes.Status200OK)]
Expand All @@ -45,5 +49,25 @@ public ActionResult<Cluster> GetClusters()

return Ok(clusters);
}

[ProducesResponseType(StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status404NotFound)]
[HttpGet("nodes/current")]
[Authorize(Roles = "admin,readonly")]
public ActionResult<Replica> GetCurrentNode()
{
_logger.LogApiCallFrom(HttpContext);


var isFromCli = HttpContext.Request.Headers["x-called-by"].ToString();
if (isFromCli != "")
_logger.LogInformation($"{isFromCli} GET '{HttpContext.Request.Path}' is called");
else
_logger.LogInformation($"GET '{HttpContext.Request.Path}' is called");

var clusters = _clusterRepository.GetCurrentReplica();

return Ok(clusters);
}
}
}
7 changes: 3 additions & 4 deletions src/Andy.X.App/Controllers/ComponentsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ public ActionResult<string> CreateComponent(string tenant, string product, strin
if (componentDetails is not null)
return BadRequest($"Component already exists");

var isCreated = _coreService.CreateComponent(tenant, product, component, description,
var isCreated = _coreService.CreateComponent(tenant, product, component, description,
componentSettings.IsTopicAutomaticCreationAllowed, componentSettings.EnforceSchemaValidation,
componentSettings.IsAuthorizationEnabled, componentSettings.IsSubscriptionAutomaticCreationAllowed,
componentSettings.IsProducerAutomaticCreationAllowed);

if (isCreated == true)
{
_tenantStateService.AddComponent(tenant, product, component, _tenantFactory.CreateComponent(component, description, componentSettings));
_tenantStateService.AddComponent(tenant, product, component, _tenantFactory.CreateComponent(component, description, componentSettings), false);
return Ok("Component has been created");
}

Expand Down Expand Up @@ -224,11 +224,10 @@ public ActionResult<string> UpdateComponentSettings(string tenant, string produc
if (componentDetails is null)
return NotFound($"Component {component} does not exists in {tenant}/{product}");

var isUpdated = _coreService.UpdateComponentSettings(tenant, product, component, componentSettings.IsTopicAutomaticCreationAllowed, componentSettings.EnforceSchemaValidation, componentSettings.IsAuthorizationEnabled, componentSettings.IsSubscriptionAutomaticCreationAllowed);
var isUpdated = _coreService.UpdateComponentSettings(tenant, product, component, componentSettings.IsTopicAutomaticCreationAllowed, componentSettings.EnforceSchemaValidation, componentSettings.IsAuthorizationEnabled, componentSettings.IsSubscriptionAutomaticCreationAllowed, componentSettings.IsProducerAutomaticCreationAllowed);
if (isUpdated == true)
return Ok("Component settings have been updated, product in the tenant is marked to refresh settings, this may take a while");


return BadRequest("Something went wrong, component settings couldnot be updated");
}

Expand Down
2 changes: 1 addition & 1 deletion src/Andy.X.App/Controllers/ProductsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public ActionResult<string> CreateProduct(string tenant, string product, [FromQu
var isCreated = _coreService.CreateProduct(tenant, product, description, productSettings.IsAuthorizationEnabled);
if (isCreated == true)
{
_tenantStateService.AddProduct(tenant, product, _tenantFactory.CreateProduct(product, description));
_tenantStateService.AddProduct(tenant, product, _tenantFactory.CreateProduct(product, description), false);
return Ok("Product has been created");
}

Expand Down
2 changes: 1 addition & 1 deletion src/Andy.X.App/Controllers/TopicsController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public ActionResult<string> CreateTopic(string tenant, string product, string co
var isCreated = _coreService.CreateTopic(tenant, product, component, topic, description, topicSettings);
if (isCreated == true)
{
_tenantStateService.AddTopic(tenant, product, component, topic, _tenantFactory.CreateTopic(topic, description));
_tenantStateService.AddTopic(tenant, product, component, topic, _tenantFactory.CreateTopic(topic, description), false);
return Ok($"Topic {topic} has been created");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Buildersoft.Andy.X.Core.Abstractions.Services.Clusters;
using Buildersoft.Andy.X.Core.Repositories;
using Buildersoft.Andy.X.Core.Services.Clusters;
using Buildersoft.Andy.X.Core.Services.Outbound;
using Buildersoft.Andy.X.Router.Repositories.Clusters;
using Buildersoft.Andy.X.Router.Services.Clusters;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -25,5 +26,10 @@ public static void AddClusterHubService(this IServiceCollection services)
{
services.AddSingleton<IClusterHubService, ClusterHubService>();
}

public static void AddClusterOutboundService(this IServiceCollection services)
{
services.AddSingleton<OutboundClusterMessageService>();
}
}
}
34 changes: 4 additions & 30 deletions src/Andy.X.App/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,6 @@ public void ConfigureServices(IServiceCollection services)
services.AddAuthentication("Andy.X_Authorization")
.AddScheme<AuthenticationSchemeOptions, BasicAuthenticationHandler>("Andy.X_Authorization", null);

//services.AddAuthorization(options =>
//{
// options.AddPolicy("readonly", policy =>
// policy.RequireClaim("readonly"));
// options.AddPolicy("admin", policy =>
// policy.RequireClaim("admin"));
//});

// Persistency Core State
services.AddCoreRepository();
services.AddCoreService();
Expand All @@ -115,6 +107,7 @@ public void ConfigureServices(IServiceCollection services)
services.AddClusterRepository();
services.AddClusterHubService();
services.AddClusterService();
services.AddClusterOutboundService();

services.AddAppFactoryMethods();
services.AddProducerFactoryMethods();
Expand Down Expand Up @@ -152,33 +145,13 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IService
app.UseApplicationService(serviceProvider);
app.UseTenantMemoryRepository(serviceProvider);

// This part is not needed. keep it for some time when we will add support for haproxy.
//app.Use(async (context, next) =>
//{
// var host = context.Request.Headers["Host"];
// var userAgent = context.Request.Headers["User-Agent"];
// var realIP = context.Request.Headers["X-Real-IP"];
// var forwardeds = context.Request.Headers["X-Forwarded-For"];
// var connectedInfo = new Dictionary<string, string>()
// {
// {"Host", host},
// {"UserAgent", userAgent},
// {"Real-IP", realIP},
// {"Forward-For", forwardeds},
// };
// await next.Invoke();
//});


app.UseHttpsRedirection();
app.UseRouting();
app.UseAuthentication();
app.UseAuthorization();


var transportConfiguration = serviceProvider.GetRequiredService<TransportConfiguration>();


app.UseEndpoints(endpoints =>
{
// Mapping Rest endpoints
Expand All @@ -192,13 +165,14 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IService
opt.TransportMaxBufferSize = transportConfiguration.TransportMaxBufferSizeInBytes;
});


// Mapping SignalR Hubs
// Mapping SignalR Hub for Producer
endpoints.MapHub<ProducerHub>("/realtime/v3/producer", opt =>
{
opt.ApplicationMaxBufferSize = transportConfiguration.ApplicationMaxBufferSizeInBytes;
opt.TransportMaxBufferSize = transportConfiguration.TransportMaxBufferSizeInBytes;
});

// Mapping SignalR Hub for Consumer
endpoints.MapHub<ConsumerHub>("/realtime/v3/consumer", opt =>
{
opt.ApplicationMaxBufferSize = transportConfiguration.ApplicationMaxBufferSizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
using Buildersoft.Andy.X.Model.App.Products;
using Buildersoft.Andy.X.Model.App.Tenants;
using Buildersoft.Andy.X.Model.App.Topics;
using System;
using System.Collections.Generic;

namespace Buildersoft.Andy.X.Core.Abstractions.Factories.Tenants
{
Expand Down
35 changes: 29 additions & 6 deletions src/Andy.X.Core/Abstractions/Hubs/Clusters/IClusterHub.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Buildersoft.Andy.X.Model.Clusters.Events;
using Buildersoft.Andy.X.Model.Consumers.Events;
using Buildersoft.Andy.X.Model.Producers.Events;
using Buildersoft.Andy.X.Model.Entities.Clusters;
using System.Threading.Tasks;

namespace Buildersoft.Andy.X.Core.Abstractions.Hubs.Clusters
Expand All @@ -19,29 +18,50 @@ public interface IClusterHub
Task TenantUpdatedAsync(TenantUpdatedArgs tenantUpdatedArgs);
Task TenantDeletedAsync(TenantDeletedArgs tenantDeletedArgs);

Task TenantRetentionCreatedAsync(TenantRetentionCreatedArgs tenantRetentionArgs);
Task TenantRetentionUpdatedAsync(TenantRetentionUpdatedArgs tenantRetentionArgs);
Task TenantRetentionDeletedAsync(TenantRetentionDeletedArgs tenantRetentionArgs);

Task ProductCreatedAsync(ProductCreatedArgs productCreatedArgs);
Task ProductUpdatedAsync(ProductUpdatedArgs productUpdatedArgs);
Task ProductDeletedAsync(ProductDeletedArgs productDeletedArgs);

Task ProductRetentionCreatedAsync(ProductRetentionCreatedArgs productRetentionArgs);
Task ProductRetentionUpdatedAsync(ProductRetentionUpdatedArgs productRetentionArgs);
Task ProductRetentionDeletedAsync(ProductRetentionDeletedArgs productRetentionArgs);

Task ComponentCreatedAsync(ComponentCreatedArgs componentCreatedArgs);
Task ComponentUpdatedAsync(ComponentUpdatedArgs componentUpdatedArgs);
Task ComponentDeletedAsync(ComponentDeletedArgs componentDeletedArgs);

Task ComponentRetentionCreatedAsync(ComponentRetentionCreatedArgs componentRetentionArgs);
Task ComponentRetentionUpdatedAsync(ComponentRetentionUpdatedArgs componentRetentionArgs);
Task ComponentRetentionDeletedAsync(ComponentRetentionDeletedArgs componentRetentionArgs);

Task TopicCreatedAsync(TopicCreatedArgs topicCreatedArgs);
Task TopicUpdatedAsync(TopicUpdatedArgs topicUpdatedArgs);
Task TopicDeletedAsync(TopicDeletedArgs topicDeletedArgs);

Task TenantTokenCreatedAsync(TokenCreatedArgs tokenCreatedArgs);
Task TenantTokenRevokedAsync(TokenRevokedArgs tokenRevokedArgs);
Task ComponentTokenCreatedAsync(TokenCreatedArgs tokenCreatedArgs);
Task ComponentTokenRevokedAsync(TokenRevokedArgs tokenRevokedArgs);
Task TenantTokenCreatedAsync(TenantTokenCreatedArgs tokenCreatedArgs);
Task TenantTokenRevokedAsync(TenantTokenRevokedArgs tokenRevokedArgs);
Task TenantTokenDeletedAsync(TenantTokenDeletedArgs tokenDeletedArgs);

Task ProductTokenCreatedAsync(ProductTokenCreatedArgs productTokenCreatedArgs);
Task ProductTokenRevokedAsync(ProductTokenRevokedArgs productTokenRevokedArgs);
Task ProductTokenDeletedAsync(ProductTokenDeletedArgs productTokenDeletedArgs);

Task ComponentTokenCreatedAsync(ComponentTokenCreatedArgs tokenCreatedArgs);
Task ComponentTokenRevokedAsync(ComponentTokenRevokedArgs tokenRevokedArgs);
Task ComponentTokenDeletedAsync(ComponentTokenDeletedArgs tokenDeletedArgs);

Task SubscriptionCreatedAsync(SubscriptionCreatedArgs subscriptionCreatedArgs);
Task SubscriptionUpdatedAsync(SubscriptionUpdatedArgs subscriptionUpdatedArgs);
Task SubscriptionDeletedAsync(SubscriptionDeletedArgs subscriptionDeletedArgs);

// will all nodes
Task ProducerConnectedAsync(ProducerConnectedArgs producerConnectedArgs);
Task ProducerCreatedAsync(ProducerCreatedArgs producerCreatedArgs);
Task ProducerDeletedAsync(ProducerDeletedArgs producerDeletedArgs);
Task ProducerDisconnectedAsync(ProducerDisconnectedArgs producerDisconnectedArgs);

Task ConsumerConnectedAsync(ConsumerConnectedArgs consumerConnectedArgs);
Expand All @@ -51,5 +71,8 @@ public interface IClusterHub
// only with replicas
Task SubscriptionPositionUpdatedAsync(SubscriptionPositionUpdatedArgs subscriptionPositionUpdatedArgs);
Task CurrentEntryPositionUpdatedAsync(CurrentEntryPositionUpdatedArgs currentEntryPositionUpdatedArgs);

// Distributed Clusters, communication between main nodes
Task SendMessageToMainShardAsync(ClusterChangeLog clusterChangeLog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ public interface IProducerHub
Task ProducerConnected(ProducerConnectedDetails producerConnectedDetails);
Task ProducerDisconnected(ProducerDisconnectedDetails producerDisconnectedDetails);

Task MessageStored(object messageStoredDetails);
Task MessageAccepted(MessageAcceptedDetails messageAcceptedDetails);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using Buildersoft.Andy.X.Core.Abstractions.Services.Data;
using Buildersoft.Andy.X.Model.App.Topics;
using Buildersoft.Andy.X.Model.Clusters;
using Buildersoft.Andy.X.Model.Entities.Clusters;
using Buildersoft.Andy.X.Model.Entities.Storages;
using System.Collections.Concurrent;

namespace Buildersoft.Andy.X.Core.Abstractions.Orchestrators
{
Expand All @@ -22,5 +25,10 @@ public interface IOrchestratorService
// Read Subscription Unacknowledged Message to RocksDb
ITopicReadonlyDataService<UnacknowledgedMessage> GetSubscriptionUnackedReadonlyDataService(string subscriptionKey);
bool InitializeSubscriptionUnackedReadonlyDataService(string tenant, string product, string component, string topic, string subscription);

// Data services for Cluster
ITopicDataService<ClusterChangeLog> GetClusterDataService(string nodeId);
ConcurrentDictionary<string, ITopicDataService<ClusterChangeLog>> GetClusterDataServices();
void InitializeClusterDataService(Replica replica);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
using Buildersoft.Andy.X.Model.Clusters;
using System.Collections.Generic;

namespace Buildersoft.Andy.X.Core.Abstractions.Repositories.Clusters
{
public interface IClusterRepository
{
void ConnectNode(string nodeId);
void DisconnectNode(string nodeId);
List<ReplicaShardConnection> GetReplicaShardConnections();
ReplicaShardConnection GetMainReplicaConnection(string nodeId);
ReplicaShardConnection GetMainReplicaConnectionByIndex(int index);
Cluster GetCluster();
Shard NewShard();
bool AddReplicaInLastShard(Replica replica);

bool AddReplicaConnectionToShard(string nodeId, string nodeConnectionId);
bool RemoveReplicaConnectionToShard(string nodeId);
bool RemoveReplicaConnectionFromShard(string nodeId);

Shard GetCurrentShard();
Replica GetCurrentReplica();
Expand Down
Loading