Skip to content

Commit

Permalink
Added new endpoints + fixed fetching data in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
maikReal committed Jul 17, 2024
1 parent d63e57d commit 0590ebc
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 91 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ CREATE TABLE public.warpdrive_streaks (
The main table in the V2 version is **users_all_historical_data**

```sql
CREATE TABLE public.users_all_historical_data (
CREATE TABLE public.users_casts_historical_data (
fid int4 NULL,
cast_timestamp timestamp NULL,
cast_text text NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ export const GET = async (
const currentHeaders = headers();
const { searchParams } = new URL(request.url);

const typePeriod = searchParams.get("period") as HistoricalDataPeriods | null;
let typePeriod: string | null = searchParams.get("period");
const parsedPeriod: HistoricalDataPeriods | null = typePeriod
? (parseInt(typePeriod) as HistoricalDataPeriods)
: null;

console.log(
`[DEBUG - api/db/get-stats-by-period] Trying to get info for the following time period: ${typePeriod}`
`[DEBUG - ${logsFilenamePath}] Trying to get info for the following time period: ${typePeriod}`
);

if (!typePeriod) {
Expand All @@ -45,29 +48,14 @@ export const GET = async (

const historialDataProcessor = new FarcasterHistoricalDataProcessor(
params.fid,
typePeriod
parsedPeriod
);

try {
console.log("Fetching historical data...");
await historialDataProcessor.fetchHistoricalData();
// const historicalFidData = historialDataProcessor.getHistoricalData();
const historicalFidData = historialDataProcessor.getDataWithReactions();

// const comparisonAnanlytics =
// await historialDataProcessor.getComparisonAnalytics();
// const response = await axios.get(
// `${process.env.NEXT_PUBLIC_NODE_ADDRESS}/v1/castsByFid?fid=${params.fid}&reverse=1&pageSize=3&nextPageToken=BqA1F2RVlF1ftb/Y4VezGBwakFwbxRxd`
// );

// console.log(`API Returned HTTP status ${response.status}`);
// console.log(`Data recieved!`);
// // console.log(`Data: ${response}`);
// console.log(response.data.messages);
// return generateApiResponse({ status: 200 }, response.data);

// console.log(historicalFidData);

return generateApiResponse({ status: 200 }, historicalFidData);
} catch (e) {
// Handle errors
Expand Down
61 changes: 61 additions & 0 deletions src/app/api/v2/get-fid-comparison-analytics/[fid]/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { NextRequest } from "next/server";
import { headers } from "next/headers";
import {
nonAuthHttpResponse,
internalServerErrorHttpResponse,
successHttpResponse,
verifyAuth,
generateApiResponse,
getCurrentFilePath,
unprocessableHttpResponse,
} from "@/utils/helpers";
import { AnalyticalManager } from "@/utils/v2/database/analyticalManager";
import { logInfo } from "@/utils/v2/logs/sentryLogger";

const logsFilenamePath = getCurrentFilePath();

export type ComparisonAnalyticsDataPeriods = 7 | 14 | 28;

/**
* The route to get a fid's comparison analytics for last 60 days of his posts. Likes, recasts, replies,
* and the number of casts will be included to this statisti
*
* @param request
* @param params
* @returns
*/
export const GET = async (
request: NextRequest,
{ params }: { params: { fid: number } }
) => {
const currentHeaders = headers();
const { searchParams } = new URL(request.url);

let typePeriod: string | null = searchParams.get("period");
const parsedPeriod: ComparisonAnalyticsDataPeriods | null = typePeriod
? (parseInt(typePeriod) as ComparisonAnalyticsDataPeriods)
: null;

logInfo(
`[DEBUG - ${logsFilenamePath}] Trying to get comparison analytics for the following time period: ${typePeriod}`
);

if (!parsedPeriod) {
return unprocessableHttpResponse();
}

if (!verifyAuth(currentHeaders)) {
return nonAuthHttpResponse();
}

try {
const analyticalManager = new AnalyticalManager(params.fid);

const result = await analyticalManager.getComparisonAnalytics(parsedPeriod);

return generateApiResponse({ status: 200 }, result);
} catch (err) {
console.log(err);
return internalServerErrorHttpResponse(err);
}
};
59 changes: 59 additions & 0 deletions src/app/api/v2/get-fid-followers/[fid]/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { NextRequest, NextResponse } from "next/server";
import neynarClient from "@/clients/neynar";
import { internalServerErrorHttpResponse } from "@/utils/helpers";
import { DatabaseManager } from "@/utils/v2/database/databaseManager";

export interface UserInfo {
fid: number;
username: string;
display_name: string;
pfp_url: string;
follower_count: number;
following_count: number;
verifications: object;
}

export async function GET(
request: NextRequest,
{ params }: { params: { fid: string } }
) {
try {
const fid = parseInt(params.fid);

const { users } = await neynarClient.fetchBulkUsers([fid]);

if (users && users.length > 0) {
let userData = users[0] as UserInfo;

const dbManager = DatabaseManager.getInstance();
dbManager.initialize();

const query = `
INSERT INTO users_info (fid, username, display_name, pfp_url, followers, following, verified_address)
VALUES ($1, $2, $3, $4, $5, $6, $7)
`;

const queryParams = [
fid,
userData.username,
userData.display_name,
userData.pfp_url,
userData.follower_count,
userData.following_count,
JSON.stringify(userData.verifications),
];

dbManager.executeQuery(query, queryParams);

return NextResponse.json({ user: userData }, { status: 200 });
} else {
return NextResponse.json({ user: null }, { status: 201 });
}
} catch (err) {
console.error(
"[ERROR - api/v2/farcaster-data/fetch-bio/[fid]] Error while getting bio info about a user",
err
);
return internalServerErrorHttpResponse(err);
}
}
5 changes: 5 additions & 0 deletions src/utils/db-config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { Pool } from "pg";

export const sharedDatabasePool = new Pool({
connectionString: process.env.POSTGRES_URL,
});
84 changes: 24 additions & 60 deletions src/utils/v2/dataProcessors/farcasterDataProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import {
FarcasterReactionsDataProcessor,
} from "./farcasterReactionsProcessor";
import { DatabaseManager } from "../database/databaseManager";
import { logInfo } from "../logs/sentryLogger";

export type HistoricalDataPeriods = "all" | "60days";
export type HistoricalDataPeriods = 60 | 90;

export interface HistoricalDataFormat {
data: {
Expand All @@ -28,7 +29,7 @@ export interface HistoricalDataFormat {
export class FarcasterHistoricalDataProcessor {
private fid: number;
private nowTime: Date = new Date();
private periodStartDate: Date;
private periodEndDate: Date;
private userHistoricalData: Array<HistoricalDataFormat>;
private batchSize: number;

Expand All @@ -39,11 +40,12 @@ export class FarcasterHistoricalDataProcessor {

constructor(
fid: number,
period: HistoricalDataPeriods,
period: HistoricalDataPeriods | null,
batchSize: number = 5
) {
this.fid = fid;
this.periodStartDate = this.calcualtePeriodStartDate(period);
this.periodEndDate = this.calcualtePeriodStartDate(period);
logInfo(`End date: ${this.periodEndDate}`);
this.userHistoricalData = new Array<HistoricalDataFormat>();
this.batchSize = batchSize;

Expand All @@ -52,14 +54,15 @@ export class FarcasterHistoricalDataProcessor {
this.dbManager = DatabaseManager.getInstance();
}

private calcualtePeriodStartDate(period: HistoricalDataPeriods) {
private calcualtePeriodStartDate(period: HistoricalDataPeriods | null) {
switch (period) {
case "60days":
return new Date(this.nowTime.getTime() - 60 * 24 * 60 * 60 * 1000);
// If there is no period: return data for the last 1 year
case null:
return new Date(this.nowTime.getTime() - 360 * 24 * 60 * 60 * 1000);

// Back data for last 60 days by default
// If there is a period: use it to calcualte the date
default:
return new Date(this.nowTime.getTime() - 60 * 24 * 60 * 60 * 1000);
return new Date(this.nowTime.getTime() - period * 24 * 60 * 60 * 1000);
}
}

Expand Down Expand Up @@ -89,7 +92,7 @@ export class FarcasterHistoricalDataProcessor {
console.log(`Processed batch: ${userCasts.length} casts`);
console.log("Latest cast timestamp:", firstMessageTimestamp);

if (firstMessageTimestamp < this.periodStartDate || !nextPageToken) {
if (firstMessageTimestamp < this.periodEndDate || !nextPageToken) {
shouldContinue = false;
break;
}
Expand All @@ -102,9 +105,6 @@ export class FarcasterHistoricalDataProcessor {
console.log("Total number of casts:", this.userHistoricalData.length);

// this.reactionsDataProcessor.setUserCasts(this.userHistoricalData);

// Close the database connection after adding all user's historical data
this.dbManager.close();
}

private async fetchSingleRequest(pageToken: string): Promise<{
Expand Down Expand Up @@ -141,7 +141,7 @@ export class FarcasterHistoricalDataProcessor {
this.userHistoricalData = [...this.userHistoricalData, ...batch];

// Fetching replies and likes for a batch of casts that we received
// Create a clas fore getting reactions and replies
// Create a class for getting reactions and replies
const reactionsDataProcessor = new FarcasterReactionsDataProcessor(
this.fid
);
Expand All @@ -163,16 +163,20 @@ export class FarcasterHistoricalDataProcessor {
}

private async addBatchToDatabase(batch: CastInfoProps[]) {
// Implement the logic to add the batch to the database
// This is a placeholder implementation
console.log(`Adding ${batch.length} casts to the database`);
console.log(
`🚀 Adding batch with the ${batch.length} length to the database`
);
// await DatabaseClient.addCasts(batch);

try {
this.dbManager.addHistoricalData(this.fid, batch);
await this.dbManager.addHistoricalData(this.fid, batch);
console.log(`🚀 Added batch with the length: ${batch.length}`);
} catch (error) {
console.log("Error while adding info to database...");
}

// Close the database connection after adding all user's historical data
// this.dbManager.close();
}

public getHistoricalData() {
Expand All @@ -183,48 +187,8 @@ export class FarcasterHistoricalDataProcessor {
return this.userDataWithReactions;
}

// public async fetchHistoricalData() {
// let response;
// let firstMessageTimestamp: Date;
// let pageToken: string = "";

// do {
// // Fetching casts: last relevant frist
// response = await axios.get(
// `${process.env.NEXT_PUBLIC_NODE_ADDRESS}/v1/castsByFid?fid=${this.fid}&reverse=1&pageSize=800&pageToken=${pageToken}`
// );

// const userCasts = response.data.messages;
// pageToken = response.data.nextPageToken;

// // Getting the last cast in the response to understand the latest date of the cast
// const latestCastDetails = userCasts[userCasts.length - 1].data; // Assuming the first message is at the start of the array

// const humanReadableDate = farcasterTimestampToHumanReadable(
// latestCastDetails.timestamp
// ); // ConvertFarcaster timestamp to human readable date

// this.userHistoricalData = [...this.userHistoricalData, ...userCasts];

// firstMessageTimestamp = new Date(humanReadableDate);

// // console.log(userCasts);
// console.log(
// "User's casts batches timestamps (latest cast timestamp / period end date): ",
// firstMessageTimestamp,
// this.periodStartDate
// );
// // break;
// } while (firstMessageTimestamp >= this.periodStartDate);

// console.log(
// "Historical data is fetched. the number of rows:",
// this.userHistoricalData.length
// );

// // Set historical user data for further processing and receiving reactions info
// this.reactionsDataProcessor.setUserCasts(this.userHistoricalData);
// }
// TODO: Prepare the method that can fetch user's fids for a specific period from a node
public fetchFidFollowers() {}

// The method that will get the comparison analytics for different periods: 7 days, 14 days, 30 days
// public async getComparisonAnalytics() {
Expand Down
5 changes: 5 additions & 0 deletions src/utils/v2/dataProcessors/farcasterReactionsProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ export class FarcasterReactionsDataProcessor {
return null;
}

// Don't process replies of a user
if (cast.data.castAddBody?.parentCastId) {
return null;
}

const castHash: string = cast.hash;

const castText: string = cast.data.castAddBody.text; // Should process the case if text if not available here ???
Expand Down
Loading

0 comments on commit 0590ebc

Please sign in to comment.