Skip to content

Commit

Permalink
Merge pull request #1902 from alkem-io/server-1929
Browse files Browse the repository at this point in the history
Subscriptions for Aspects
  • Loading branch information
me-andre authored May 5, 2022
2 parents 5562472 + 5c46bc8 commit c6507ce
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/common/constants/providers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ export const SUBSCRIPTION_UPDATE_MESSAGE =
'alkemio-subscriptions-update-message';
export const SUBSCRIPTION_CANVAS_CONTENT =
'alkemio-subscriptions-canvas-content';
export const SUBSCRIPTION_CONTEXT_ASPECT_CREATED =
'alkemio-subscriptions-context-aspect-created';
export const SUBSCRIPTION_PROFILE_VERIFIED_CREDENTIAL =
'alkemio-subscriptions-profile-verified-credential';
export const NOTIFICATIONS_SERVICE = 'alkemio-notifications';
Expand Down
1 change: 1 addition & 0 deletions src/common/enums/messaging.queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ export enum MessagingQueue {
SUBSCRIPTION_UPDATE_MESSAGE = 'alkemio-subscriptions-update-message',
SUBSCRIPTION_CANVAS_CONTENT = 'alkemio-subscriptions-canvas-content',
SUBSCRIPTION_PROFILE_VERIFIED_CREDENTIAL = 'alkemio-subscriptions-profile-verified-credential',
SUBSCRIPTION_CONTEXT_ASPECT_CREATED = 'alkemio-subscriptions-context-aspect-created',
}
1 change: 1 addition & 0 deletions src/common/enums/subscription.type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ export enum SubscriptionType {
COMMUNICATION_COMMENTS_MESSAGE_RECEIVED = 'communicationCommentsMessageReceived',
CANVAS_CONTENT_UPDATED = 'canvasContentUpdated',
PROFILE_VERIFIED_CREDENTIAL = 'profileVerifiedCredential',
CONTEXT_ASPECT_CREATED = 'contextAspectCreated',
}
8 changes: 8 additions & 0 deletions src/core/microservices/microservices.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
SUBSCRIPTION_DISCUSSION_MESSAGE,
SUBSCRIPTION_UPDATE_MESSAGE,
SUBSCRIPTION_CANVAS_CONTENT,
SUBSCRIPTION_CONTEXT_ASPECT_CREATED,
WALLET_MANAGEMENT_SERVICE,
SUBSCRIPTION_DISCUSSION_UPDATED,
SUBSCRIPTION_PROFILE_VERIFIED_CREDENTIAL,
Expand All @@ -29,6 +30,7 @@ import { subscriptionCanvasContentFactory } from './subscription.canvas.content.
import { subscriptionUpdateMessageFactory } from './subscription.update.message.factory';
import { subscriptionDiscussionUpdatedFactory } from './subscription.discussion.updated.factory';
import { subscriptionProfileVerifiedCredentialFactory } from './subscription.profile.verified.credential.factory';
import { subscriptionContextAspectCreatedFactory } from '@core/microservices/subscription.context.aspect.created.factory';

@Global()
@Module({
Expand Down Expand Up @@ -64,6 +66,11 @@ import { subscriptionProfileVerifiedCredentialFactory } from './subscription.pro
useFactory: subscriptionCanvasContentFactory,
inject: [WINSTON_MODULE_NEST_PROVIDER, ConfigService],
},
{
provide: SUBSCRIPTION_CONTEXT_ASPECT_CREATED,
useFactory: subscriptionContextAspectCreatedFactory,
inject: [WINSTON_MODULE_NEST_PROVIDER, ConfigService],
},
{
provide: SUBSCRIPTION_PROFILE_VERIFIED_CREDENTIAL,
useFactory: subscriptionProfileVerifiedCredentialFactory,
Expand All @@ -85,6 +92,7 @@ import { subscriptionProfileVerifiedCredentialFactory } from './subscription.pro
SUBSCRIPTION_DISCUSSION_UPDATED,
SUBSCRIPTION_UPDATE_MESSAGE,
SUBSCRIPTION_CANVAS_CONTENT,
SUBSCRIPTION_CONTEXT_ASPECT_CREATED,
SUBSCRIPTION_PROFILE_VERIFIED_CREDENTIAL,
NOTIFICATIONS_SERVICE,
WALLET_MANAGEMENT_SERVICE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { ConfigurationTypes, LogContext } from '@common/enums';
import { LoggerService } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { AMQPPubSub } from 'graphql-amqp-subscriptions';
import { PubSubEngine } from 'graphql-subscriptions';
import amqp from 'amqplib';
import { MessagingQueue } from '@common/enums/messaging.queue';

export async function subscriptionContextAspectCreatedFactory(
logger: LoggerService,
configService: ConfigService
): Promise<PubSubEngine | undefined> {
const rabbitMqOptions = configService.get(
ConfigurationTypes.MICROSERVICES
)?.rabbitmq;
const connectionOptions = rabbitMqOptions.connection;
const connectionString = `amqp://${connectionOptions.user}:${connectionOptions.password}@${connectionOptions.host}:${connectionOptions.port}?heartbeat=30`;

return amqp
.connect(connectionString)
.then(conn => {
return new AMQPPubSub({
connection: conn,
exchange: {
// RabbitMQ subscriptions exchange name
name: 'alkemio-graphql-subscriptions',
// RabbitMQ exchange type. There are 4 exchange types:
// TOPIC - Topic exchanges route messages to one or many queues based on matching between a message routing key and the pattern that was used to bind a queue to an exchange.
// The topic exchange type is often used to implement various publish/subscribe pattern variations. Topic exchanges are commonly used for the multicast routing of messages.
// DIRECT - A direct exchange delivers messages to queues based on the message routing key.
// A direct exchange is ideal for the unicast routing of messages (although they can be used for multicast routing as well).
// HEADERS - A headers exchange is designed for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute.
// Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.
// FANOUT - A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored.
// If N queues are bound to a fanout exchange, when a new message is published to that exchange a copy of the message is delivered to all N queues.
// Fanout exchanges are ideal for the broadcast routing of messages.
type: 'topic',
options: {
// the exchange will survive a broker restart
durable: true,
// exchange is deleted when last queue is unbound from it
autoDelete: false,
},
},
queue: {
name: MessagingQueue.SUBSCRIPTION_CONTEXT_ASPECT_CREATED,
options: {
// used by only one connection and the queue will be deleted when that connection closes
exclusive: false,
// the queue will survive a broker restart
durable: true,
// queue that has had at least one consumer is deleted when last consumer unsubscribes
autoDelete: false,
},
// Unbind from the RabbitMQ queue when disposing the pubsub connection
unbindOnDispose: false,
// Delete the RabbitMQ queue when disposing the pubsub connection
deleteOnDispose: false,
},
});
})
.catch(err => {
logger.error(
`Could not connect to RabbitMQ: ${err}, logging in...`,
LogContext.SUBSCRIPTIONS
);
return undefined;
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,30 +48,30 @@ export class CommentsResolverSubscriptions {
async filter(
this: CommentsResolverSubscriptions,
payload: CommentsMessageReceived,
variables: any,
context: any
variables: { commentsID: string },
context: { req: { user: AgentInfo } }
) {
const agentInfo = context.req?.user;
const agentInfo = context.req.user;
const logMsgPrefix = `[User (${agentInfo.email}) Comments] - `;
const commentsIDs: string[] = variables.commentsIDs;
this.logger.verbose?.(
`${logMsgPrefix} Filtering event '${payload.eventID}'`,
`${logMsgPrefix} Filtering event '${payload.eventID}'`,
LogContext.SUBSCRIPTIONS
);

const inList = commentsIDs.includes(payload.commentsID);
const isSameCommentsInstance =
payload.commentsID === variables.commentsID;
this.logger.verbose?.(
`${logMsgPrefix} Filter result is ${inList}`,
`${logMsgPrefix} Filter result is ${isSameCommentsInstance}`,
LogContext.SUBSCRIPTIONS
);
return inList;
return isSameCommentsInstance;
},
})
async communicationUpdateMessageReceived(
async communicationCommentsMessageReceived(
@CurrentUser() agentInfo: AgentInfo,
@Args({
name: 'commentsID',
type: () => [UUID],
type: () => UUID,
description: 'The ID of the Comments to subscribe to.',
nullable: false,
})
Expand Down
2 changes: 2 additions & 0 deletions src/domain/context/context/context.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ReferenceModule } from '@domain/common/reference/reference.module';
import { ContextResolverMutations } from './context.resolver.mutations';
import { ContextResolverSubscriptions } from './context.resolver.subscriptions';
import { ContextService } from '@domain/context/context/context.service';
import { Context } from '@domain/context/context';
import { EcosystemModelModule } from '@domain/context/ecosystem-model/ecosystem-model.module';
Expand All @@ -28,6 +29,7 @@ import { NamingModule } from '@services/domain/naming/naming.module';
],
providers: [
ContextResolverMutations,
ContextResolverSubscriptions,
ContextResolverFields,
ContextService,
ContextAuthorizationService,
Expand Down
25 changes: 22 additions & 3 deletions src/domain/context/context/context.resolver.mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ import { DeleteCanvasOnContextInput } from './dto/context.dto.delete.canvas';
import { AspectAuthorizationService } from '../aspect/aspect.service.authorization';
import { CreateAspectOnContextInput } from './dto/context.dto.create.aspect';
import { IAspect } from '../aspect/aspect.interface';
import { PubSubEngine } from 'graphql-subscriptions';
import { SubscriptionType } from '@common/enums/subscription.type';
import { ContextAspectCreated } from '@domain/context/context/dto/context.dto.event.aspect.created';
import { Inject } from '@nestjs/common';
import { SUBSCRIPTION_CONTEXT_ASPECT_CREATED } from '@src/common/constants/providers';

@Resolver()
export class ContextResolverMutations {
constructor(
Expand All @@ -25,7 +31,9 @@ export class ContextResolverMutations {
private authorizationService: AuthorizationService,
private canvasAuthorizationService: CanvasAuthorizationService,
private aspectAuthorizationService: AspectAuthorizationService,
private contextService: ContextService
private contextService: ContextService,
@Inject(SUBSCRIPTION_CONTEXT_ASPECT_CREATED)
private aspectCreatedSubscription: PubSubEngine
) {}

@UseGuards(GraphqlGuard)
Expand Down Expand Up @@ -73,14 +81,25 @@ export class ContextResolverMutations {
AuthorizationPrivilege.CREATE_ASPECT,
`create aspect on context: ${context.id}`
);
const aspect = await this.contextService.createAspect(
let aspect = await this.contextService.createAspect(
aspectData,
agentInfo.userID
);
return await this.aspectAuthorizationService.applyAuthorizationPolicy(
aspect = await this.aspectAuthorizationService.applyAuthorizationPolicy(
aspect,
context.authorization
);
const aspectCreatedEvent: ContextAspectCreated = {
eventID: `context-aspect-created-${Math.round(Math.random() * 100)}`,
contextID: context.id,
aspect,
};
await this.aspectCreatedSubscription.publish(
SubscriptionType.CONTEXT_ASPECT_CREATED,
aspectCreatedEvent
);

return aspect;
}

@UseGuards(GraphqlGuard)
Expand Down
95 changes: 95 additions & 0 deletions src/domain/context/context/context.resolver.subscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { CurrentUser } from '@common/decorators/current-user.decorator';
import { SubscriptionType } from '@common/enums/subscription.type';
import { AgentInfo } from '@core/authentication/agent-info';
import { GraphqlGuard } from '@core/authorization';
import { Inject, LoggerService, UseGuards } from '@nestjs/common';
import { Args, Resolver, Subscription } from '@nestjs/graphql';
import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston';
import { PubSubEngine } from 'graphql-subscriptions';
import { LogContext } from '@common/enums/logging.context';
import { UUID } from '@domain/common/scalars/scalar.uuid';
import { AuthorizationService } from '@core/authorization/authorization.service';
import { AuthorizationPrivilege } from '@common/enums/authorization.privilege';
import { SUBSCRIPTION_CONTEXT_ASPECT_CREATED } from '@common/constants/providers';
import { ContextService } from '@domain/context/context/context.service';
import { ContextAspectCreated } from '@domain/context/context/dto/context.dto.event.aspect.created';

@Resolver()
export class ContextResolverSubscriptions {
constructor(
@Inject(WINSTON_MODULE_NEST_PROVIDER)
private readonly logger: LoggerService,
@Inject(SUBSCRIPTION_CONTEXT_ASPECT_CREATED)
private subscriptionAspectCreated: PubSubEngine,
private contextService: ContextService,
private authorizationService: AuthorizationService
) {}

@UseGuards(GraphqlGuard)
@Subscription(() => ContextAspectCreated, {
description:
'Receive new Update messages on Communities the currently authenticated User is a member of.',
async resolve(
this: ContextResolverSubscriptions,
value: ContextAspectCreated,
_: unknown,
context: { req: { user: AgentInfo } }
): Promise<ContextAspectCreated> {
const agentInfo = context.req.user;
const logMsgPrefix = `[User (${agentInfo.email}) Context Aspects] - `;
this.logger.verbose?.(
`${logMsgPrefix} sending out event for Aspects on Context: ${value.contextID} `,
LogContext.SUBSCRIPTIONS
);
return value;
},
async filter(
this: ContextResolverSubscriptions,
payload: ContextAspectCreated,
variables: { contextID: string },
context: { req: { user: AgentInfo } }
) {
const agentInfo = context.req.user;
const logMsgPrefix = `[User (${agentInfo.email}) Context Aspects] - `;
this.logger.verbose?.(
`${logMsgPrefix} Filtering event '${payload.eventID}'`,
LogContext.SUBSCRIPTIONS
);

const isSameContext = payload.contextID === variables.contextID;
this.logger.verbose?.(
`${logMsgPrefix} Filter result is ${isSameContext}`,
LogContext.SUBSCRIPTIONS
);
return isSameContext;
},
})
async contextAspectCreated(
@CurrentUser() agentInfo: AgentInfo,
@Args({
name: 'contextID',
type: () => UUID,
description: 'The ID of the Context to subscribe to.',
nullable: false,
})
contextID: string
) {
const logMsgPrefix = `[User (${agentInfo.email}) Context Aspects] - `;
this.logger.verbose?.(
`${logMsgPrefix} Subscribing to the following Context Aspects: ${contextID}`,
LogContext.SUBSCRIPTIONS
);
// check the user has the READ privilege
const context = await this.contextService.getContextOrFail(contextID);
await this.authorizationService.grantAccessOrFail(
agentInfo,
context.authorization,
AuthorizationPrivilege.READ,
`subscription to new Aspects on Context: ${context.id}`
);

return this.subscriptionAspectCreated.asyncIterator(
SubscriptionType.CONTEXT_ASPECT_CREATED
);
}
}
20 changes: 20 additions & 0 deletions src/domain/context/context/dto/context.dto.event.aspect.created.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { IAspect } from '@src/domain';
import { Field, ObjectType } from '@nestjs/graphql';

@ObjectType('ContextAspectCreated')
export class ContextAspectCreated {
eventID!: string;

@Field(() => String, {
nullable: false,
description:
'The identifier for the Context on which the aspect was created.',
})
contextID!: string;

@Field(() => IAspect, {
nullable: false,
description: 'The aspect that has been created.',
})
aspect!: IAspect;
}

0 comments on commit c6507ce

Please sign in to comment.