Skip to content

Commit e3415b8

Browse files
committed
fix send
1 parent 89a7f6e commit e3415b8

File tree

5 files changed

+51
-25
lines changed

5 files changed

+51
-25
lines changed

.env

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@ AMQHOST=localhost
66
AMQPORT=61613
77
AMQUSERNAME=admin
88
AMQPASSWORD=admin
9-
AMQDESTINATIONNAME=my-topic
9+
AMQDESTINATIONNAME=topic
1010
AMQSUBSCRIPTIONNAME=sub-user

src/context.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ export const user: Attributes = {
4242
export interface ApplicationContext {
4343
handle: (data: User, header?: StringMap) => Promise<number>;
4444
read: (handle: (data: User, header?: StringMap) => Promise<number>) => void;
45+
write: (data: User) => Promise<boolean>;
4546
health: HealthController;
4647
}
4748

@@ -56,7 +57,7 @@ export function createContext(db: Db, client: Client, config: Config): Applicati
5657
const writer = new ActiveMQWriter<User>(client, config.destinationName, config.subscriptionName);
5758
const retryService = new RetryService<User, boolean>(writer.write, log, log);
5859
const handler = new Handler<User, boolean>(retryWriter.write, validator.validate, retries, errorHandler.error, log, log, retryService.retry, 3, 'retry');
59-
const ctx: ApplicationContext = { handle: handler.handle, read: subscriber.subscribe, health };
60+
const ctx: ApplicationContext = { handle: handler.handle, read: subscriber.subscribe, health, write: writer.write };
6061
return ctx;
6162
}
6263
export function log(msg: any): void {

src/index.ts

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
import { json } from 'body-parser';
2-
import dotenv from 'dotenv';
3-
import express from 'express';
4-
import http from 'http';
5-
import { connectToDb } from 'mongodb-extension';
6-
import { createContext } from './context';
7-
import { ActiveMQConnection, Config } from './services/activemq';
1+
import { json } from "body-parser";
2+
import dotenv from "dotenv";
3+
import express from "express";
4+
import http from "http";
5+
import { connectToDb } from "mongodb-extension";
6+
import { createContext } from "./context";
7+
import { ActiveMQConnection, Config } from "./services/activemq";
8+
import { getBody } from "logger-core";
9+
810
// import { SendController } from './services/activemq/send';
911
// import { printData, retry } from './services/pubsub/retry';
1012

@@ -26,8 +28,15 @@ const amqSubscriptionName = process.env.AMQSUBSCRIPTIONNAME;
2628
app.use(json());
2729

2830
connectToDb(`${mongoURI}`, `${mongoDB}`).then(async (db) => {
29-
if (!amqhost || !amqport || !amqUsername || !amqPassword || !amqDestinationName || !amqSubscriptionName) {
30-
throw new Error('config wrong!');
31+
if (
32+
!amqhost ||
33+
!amqport ||
34+
!amqUsername ||
35+
!amqPassword ||
36+
!amqDestinationName ||
37+
!amqSubscriptionName
38+
) {
39+
throw new Error("config wrong!");
3140
}
3241
const config: Config = {
3342
host: amqhost,
@@ -42,13 +51,26 @@ connectToDb(`${mongoURI}`, `${mongoDB}`).then(async (db) => {
4251
const ctx = createContext(db, client, config);
4352
ctx.read(ctx.handle);
4453

45-
app.get('/health', ctx.health.check);
46-
app.post('/send', (req, res) => {
47-
ctx.handle(req.body)
48-
res.writeHead(200, {'Content-Type': 'application/json'});
49-
res.end(JSON.stringify({message: 'message was produced'}));
50-
});
51-
http.createServer(app).listen(port, () => {
52-
console.log('Start server at port ' + port);
53-
});
54+
http
55+
.createServer((req, res) => {
56+
if (req.url === "/health") {
57+
ctx.health.check(req, res);
58+
} else if (req.url === "/send") {
59+
getBody(req).then((body: any) => {
60+
ctx
61+
.write(JSON.parse(body))
62+
.then(() => {
63+
res.writeHead(200, { "Content-Type": "application/json" });
64+
res.end(JSON.stringify({ message: "message was produced" }));
65+
})
66+
.catch((err) => {
67+
res.writeHead(500, { "Content-Type": "application/json" });
68+
res.end(JSON.stringify({ error: err }));
69+
});
70+
}).catch(err => console.log(err));
71+
}
72+
})
73+
.listen(port, () => {
74+
console.log("Start server at port " + port);
75+
});
5476
});

src/services/activemq/subscriber.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export class ActiveMQSubscriber<T> {
3232
subscribe(handle: (data: T, attributes?: StringMap) => Promise<number>): void {
3333
const prefix = this.prefix && this.prefix.length > 0 ? this.prefix : '/';
3434
const subscribeHeaders = {
35-
'destination': `${this.destinationName}${prefix}${this.subscriptionName}`,
35+
'destination': `${this.destinationName}${prefix}${this.subscriptionName}`,
3636
'ack': this.ackMode,
3737
'subscription-type': 'ANYCAST'
3838
};

src/services/activemq/writer.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,19 @@ export class ActiveMQWriter<T> {
2727
return new Promise((resolve, reject) => {
2828
const frame = this.client.send(sendHeaders);
2929
if (this.log) {
30-
this.log('produce send data : ' + JSON.stringify(message));
30+
this.log('produce send data : ' + JSON.stringify(message.data));
3131
}
32-
const result = frame.write(JSON.stringify(message));
32+
const result = frame.write(JSON.stringify(message.data));
3333
frame.end((err: any) => {
3434
if (err) {
3535
console.log('Error activemq: ', err);
36-
reject(err);
36+
reject(err);
3737
}
3838
});
3939
resolve(result);
40-
});
40+
})
41+
42+
43+
;
4144
}
4245
}

0 commit comments

Comments
 (0)