Skip to content

Commit

Permalink
feat: add support for publishing batch announcements for non-DSNP sch…
Browse files Browse the repository at this point in the history
…emas (#720)

# Description
- asset controller modified to allow upload of
application/vnd.apache.parquet assets
- content controller endpoint added:
- /v2/content/batchAnnouncement endpoint takes schemaId & referenceId
and enqueues a job to post the batch file to the chain
- "worker" app adds processing to take enqueued job and post to the
chain

Closes #719
  • Loading branch information
JoeCap08055 authored Feb 22, 2025
1 parent db1b61a commit e2e077b
Show file tree
Hide file tree
Showing 15 changed files with 263 additions and 42 deletions.
6 changes: 3 additions & 3 deletions apps/content-publishing-api/k6-test/script.k6.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default function () {
{
let url = BASE_URL + `/v1/content/${msaId}`;
const body = {
targetContentHash: 'bdyqdua4t4pxgy37mdmjyqv3dejp5betyqsznimpneyujsur23yubzna',
targetContentHash: 'bafybeibrueoxoturxz4vfmnc7geejiiqmnygk7os2of32ic3bnr5t6twiy',
targetAnnouncementType: 'broadcast',
content: validContentNoUploadedAssets,
};
Expand All @@ -58,7 +58,7 @@ export default function () {
{
let url = BASE_URL + `/v1/content/${msaId}`;
const body = {
targetContentHash: 'bdyqdua4t4pxgy37mdmjyqv3dejp5betyqsznimpneyujsur23yubzna',
targetContentHash: 'bafybeibrueoxoturxz4vfmnc7geejiiqmnygk7os2of32ic3bnr5t6twiy',
targetAnnouncementType: 'broadcast',
content: createContentWithAsset(BASE_URL),
};
Expand All @@ -74,7 +74,7 @@ export default function () {
{
let url = BASE_URL + `/v1/content/${msaId}`;
let body = {
targetContentHash: 'bdyqdua4t4pxgy37mdmjyqv3dejp5betyqsznimpneyujsur23yubzna',
targetContentHash: 'bafybeibrueoxoturxz4vfmnc7geejiiqmnygk7os2of32ic3bnr5t6twiy',
targetAnnouncementType: 'broadcast',
};
let params = { headers: { 'Content-Type': 'application/json', Accept: 'application/json' } };
Expand Down
53 changes: 41 additions & 12 deletions apps/content-publishing-api/src/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
isImage,
UploadResponseDto,
OnChainContentDto,
isParquet,
BatchFileDto,
} from '#types/dtos/content-publishing';
import { IRequestJob, IAssetMetadata, IAssetJob, IPublisherJob } from '#types/interfaces/content-publishing';
import { ContentPublishingQueues as QueueConstants } from '#types/constants/queue.constants';
Expand All @@ -34,6 +36,7 @@ export class ApiService {
@InjectQueue(QueueConstants.REQUEST_QUEUE_NAME) private requestQueue: Queue,
@InjectQueue(QueueConstants.ASSET_QUEUE_NAME) private assetQueue: Queue,
@InjectQueue(QueueConstants.PUBLISH_QUEUE_NAME) private publishQueue: Queue,
@InjectQueue(QueueConstants.BATCH_QUEUE_NAME) private readonly batchAnnouncerQueue: Queue,
) {
this.logger = new Logger(this.constructor.name);
}
Expand Down Expand Up @@ -86,6 +89,21 @@ export class ApiService {
};
}

public async enqueueBatchRequest(batchFile: BatchFileDto): Promise<AnnouncementResponseDto> {
const data = {
id: '',
...batchFile,
};
data.id = this.calculateJobId(data);
const job = await this.batchAnnouncerQueue.add(`Batch Request Job - ${data.id}`, data, {
jobId: data.id,
removeOnFail: false,
removeOnComplete: 2000,
}); // TODO: should come from queue configs
this.logger.debug(`Enqueued Batch Request Job: ${job.id}`);
return { referenceId: data.id };
}

async validateAssetsAndFetchMetadata(
content: AssetIncludedRequestDto,
): Promise<IRequestJob['assetToMimeType'] | undefined> {
Expand All @@ -103,6 +121,10 @@ export class ApiService {
}),
),
);
} else if (content.batchFiles) {
content.batchFiles.forEach((batchFile) =>
checkingList.push({ onlyImage: false, referenceId: batchFile.referenceId }),
);
}

const redisResults = await Promise.all(
Expand Down Expand Up @@ -147,18 +169,25 @@ export class ApiService {
STORAGE_EXPIRE_UPPER_LIMIT_SECONDS,
f.buffer,
);
const type = ((m) => {
switch (m) {
case 'image':
return AttachmentType.IMAGE;
case 'audio':
return AttachmentType.AUDIO;
case 'video':
return AttachmentType.VIDEO;
default:
throw new Error('Invalid MIME type');
}
})(f.mimetype.split('/')[0]);

let type: AttachmentType;
this.logger.debug(`File mime type is: ${f.mimetype}`);
if (isParquet(f.mimetype)) {
type = AttachmentType.PARQUET;
} else {
type = ((m) => {
switch (m) {
case 'image':
return AttachmentType.IMAGE;
case 'audio':
return AttachmentType.AUDIO;
case 'video':
return AttachmentType.VIDEO;
default:
throw new Error('Invalid MIME type');
}
})(f.mimetype.split('/')[0]);
}

const assetCache: IAssetMetadata = {
ipfsCid: references[index],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import {
Param,
UnauthorizedException,
Inject,
HttpStatus,
} from '@nestjs/common';
import { ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger';
import { ApiService } from '../../api.service';
import { AnnouncementResponseDto, OnChainContentDto } from '#types/dtos/content-publishing';
import { AnnouncementResponseDto, BatchFilesDto, OnChainContentDto } from '#types/dtos/content-publishing';
import { BlockchainRpcQueryService } from '#blockchain/blockchain-rpc-query.service';
import { MsaIdDto } from '#types/dtos/common';
import apiConfig, { IContentPublishingApiConfig } from '#content-publishing-api/api.config';
Expand All @@ -26,7 +27,6 @@ export class ContentControllerV2 {
// eslint-disable-next-line no-empty-function
) {}

// eslint-disable-next-line class-methods-use-this
@Post(':msaId/on-chain')
@ApiOperation({ summary: 'Create on-chain content for a given schema' })
@HttpCode(202)
Expand Down Expand Up @@ -62,4 +62,13 @@ export class ContentControllerV2 {
}
return this.apiService.enqueueContent(onBehalfOf, contentDto) as Promise<AnnouncementResponseDto>;
}

@Post('batchAnnouncement')
@ApiOperation({ summary: 'Create off-chain batch content announcements' })
@HttpCode(HttpStatus.ACCEPTED)
@ApiResponse({ status: '2XX', type: AnnouncementResponseDto })
async postBatches(@Body() batchContentDto: BatchFilesDto): Promise<AnnouncementResponseDto[]> {
const promises = batchContentDto.batchFiles.map((batchFile) => this.apiService.enqueueBatchRequest(batchFile));
return Promise.all(promises);
}
}
4 changes: 2 additions & 2 deletions apps/content-publishing-api/test/mockRequestData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ export const validBroadCastNoUploadedAssets = {

export const validReplyNoUploadedAssets = {
content: validContentNoUploadedAssets,
inReplyTo: 'dsnp://78187493520/bdyqdua4t4pxgy37mdmjyqv3dejp5betyqsznimpneyujsur23yubzna',
inReplyTo: 'dsnp://78187493520/bafybeibrueoxoturxz4vfmnc7geejiiqmnygk7os2of32ic3bnr5t6twiy',
};

export const validReaction = {
emoji: '🤌🏼',
apply: 5,
inReplyTo: 'dsnp://78187493520/bdyqdua4t4pxgy37mdmjyqv3dejp5betyqsznimpneyujsur23yubzna',
inReplyTo: 'dsnp://78187493520/bafybeibrueoxoturxz4vfmnc7geejiiqmnygk7os2of32ic3bnr5t6twiy',
};

export const validProfileNoUploadedAssets = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { SchedulerRegistry } from '@nestjs/schedule';
import { BatchAnnouncer } from './batch.announcer';
import { BaseConsumer } from '#consumer';
import { ContentPublishingQueues as QueueConstants, CAPACITY_EPOCH_TIMEOUT_NAME } from '#types/constants';
import { IBatchAnnouncerJobData } from '../interfaces';
import { IBatchAnnouncerJob, isExistingBatch } from '../interfaces';

@Injectable()
@Processor(QueueConstants.BATCH_QUEUE_NAME, {
Expand All @@ -30,10 +30,12 @@ export class BatchAnnouncementService extends BaseConsumer implements OnModuleDe
await super.onModuleDestroy();
}

async process(job: Job<IBatchAnnouncerJobData, any, string>): Promise<any> {
async process(job: Job<IBatchAnnouncerJob, any, string>): Promise<any> {
this.logger.log(`Processing job ${job.id} of type ${job.name}`);
try {
const publisherJob = await this.ipfsPublisher.announce(job.data);
const publisherJob = await (isExistingBatch(job.data)
? this.ipfsPublisher.announceExistingBatch(job.data)
: this.ipfsPublisher.announce(job.data));
// eslint-disable-next-line no-promise-executor-return
await this.publishQueue.add(publisherJob.id, publisherJob, {
jobId: publisherJob.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { IBatchAnnouncerJobData } from '../interfaces';
import ipfsConfig, { getIpfsCidPlaceholder, IIpfsConfig } from '#storage/ipfs/ipfs.config';
import { IpfsService } from '#storage';
import { STORAGE_EXPIRE_UPPER_LIMIT_SECONDS } from '#types/constants';
import { IPublisherJob } from '#types/interfaces/content-publishing';
import { IBatchFile, IPublisherJob } from '#types/interfaces/content-publishing';

@Injectable()
export class BatchAnnouncer {
Expand Down Expand Up @@ -69,6 +69,17 @@ export class BatchAnnouncer {
return { id: batchId, schemaId, data: { cid, payloadLength: size } };
}

public async announceExistingBatch(batch: IBatchFile): Promise<IPublisherJob> {
// Get previously uploaded file from IPFS
const { Key: cid, Size: size } = await this.ipfsService.getInfo(batch.referenceId);

const ipfsUrl = await this.formIpfsUrl(cid);
const response = { id: batch.referenceId, schemaId: batch.schemaId, data: { cid, payloadLength: size } };
this.logger.debug(`Created job to announce existing batch: ${JSON.stringify(response)}`);
this.logger.debug(`IPFS URL: ${ipfsUrl}`);
return response;
}

private async bufferPublishStream(publishStream: PassThrough): Promise<Buffer> {
this.logger.debug('Buffering publish stream');
return new Promise<Buffer>((resolve, reject) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { IBatchFile } from '#types/interfaces';
import { Announcement } from '#types/interfaces/content-publishing/dsnp';

export interface IBatchAnnouncerJobData {
batchId: string;
schemaId: number;
announcements: Announcement[];
}

export type IBatchAnnouncerJob = IBatchAnnouncerJobData | IBatchFile;

export function isExistingBatch(data: IBatchAnnouncerJob): data is IBatchFile {
return 'referenceId' in data;
}
2 changes: 1 addition & 1 deletion docs/content-publishing/index.html

Large diffs are not rendered by default.

37 changes: 35 additions & 2 deletions libs/storage/src/ipfs/ipfs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import { extension as getExtension } from 'mime-types';
import { FilePin } from '#storage/ipfs/pin.interface';
import { calculateDsnpMultiHash } from '#utils/common/common.utils';

export interface IpfsBlockStatResponse {
Key?: string;
Size?: number;
Message?: string;
Code?: number;
Type?: string;
}

@Injectable()
export class IpfsService {
logger: Logger;
Expand Down Expand Up @@ -51,6 +59,26 @@ export class IpfsService {
return data;
}

public async getInfo(cid: string, checkExistence = true): Promise<IpfsBlockStatResponse> {
if (checkExistence && !(await this.isPinned(cid))) {
return Promise.resolve({ Message: 'Requested resource does not exist', Type: 'error' });
}

const ipfsGet = `${this.config.ipfsEndpoint}/api/v0/block/stat?arg=${cid}`;
const ipfsAuthUser = this.config.ipfsBasicAuthUser;
const ipfsAuthSecret = this.config.ipfsBasicAuthSecret;
const ipfsAuth =
ipfsAuthUser && ipfsAuthSecret
? `Basic ${Buffer.from(`${ipfsAuthUser}:${ipfsAuthSecret}`).toString('base64')}`
: '';

const headers = { Accept: '*/*', Connection: 'keep-alive', authorization: ipfsAuth };

const response = await axios.post(ipfsGet, null, { headers, responseType: 'json' });
this.logger.debug(`IPFS response: ${JSON.stringify(response.data)}`);
return response.data as IpfsBlockStatResponse;
}

public async isPinned(cid: string): Promise<boolean> {
const parsedCid = CID.parse(cid);
const v0Cid = parsedCid.toV0().toString();
Expand Down Expand Up @@ -79,9 +107,14 @@ export class IpfsService {

public async ipfsPin(mimeType: string, file: Buffer, calculateDsnpHash = true): Promise<FilePin> {
const fileName = calculateDsnpHash ? await calculateDsnpMultiHash(file) : randomUUID().toString();
const extension = getExtension(mimeType);
let extension = getExtension(mimeType);
// NOTE: 'application/vnd.apache.parquet' has been officially accepted by IANA, but the 'mime-db' package has not been updated
if (extension === false) {
throw new Error(`unknown mimetype: ${mimeType}`);
if (mimeType === 'application/vnd.apache.parquet') {
extension = 'parquet';
} else {
throw new Error(`unknown mimetype: ${mimeType}`);
}
}
const ipfs = await this.ipfsPinBuffer(`${fileName}.${extension}`, mimeType, file);
return { ...ipfs, hash: calculateDsnpHash ? fileName : '' };
Expand Down
Loading

0 comments on commit e2e077b

Please sign in to comment.