Skip to content

Commit

Permalink
fix discontinuity in HAVE range seeds sent to new peers
Browse files Browse the repository at this point in the history
  • Loading branch information
slugalisk committed Mar 17, 2019
1 parent bb62141 commit a28905f
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 129 deletions.
6 changes: 3 additions & 3 deletions src/DiagnosticMenu/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class SwarmState extends Component {
},
{
key: 'chunkRate',
value: formatNumber(scheduler.chunkRate.value()),
value: formatNumber(scheduler.chunkRate.value() * 1000) + '/s',
},
];

Expand Down Expand Up @@ -115,11 +115,11 @@ class PeerStateTable extends Component {
},
{
key: 'wasteRate',
value: formatNumber(peerState.wasteRate.value()),
value: formatNumber(peerState.wasteRate.value() * 1000) + '/s',
},
{
key: 'chunkRate',
value: formatNumber(peerState.chunkRate.value()),
value: formatNumber(peerState.chunkRate.value() * 1000) + '/s',
},
{
key: 'ledbat.baseDelay',
Expand Down
8 changes: 5 additions & 3 deletions src/Test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ const App = () => {
useEffect(() => {
const injector = new ChunkedWriteStreamInjector();

injector.on('publish', ({swarm}) => {
injector.on('publish', ({injector: {swarm}}) => {
setSwarmUri(swarm.uri);
server.ppsppClient.publishSwarm(swarm);
});

injector.on('unpublish', ({swarm}) => {
injector.on('unpublish', ({injector: {swarm}}) => {
server.ppsppClient.unpublishSwarm(swarm);
});

injector.start();
injector.start({
bitRate: 9000000,
});

return () => injector.stop();
}, []);
Expand Down
6 changes: 3 additions & 3 deletions src/bitarray.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export default class BitArray {
}

min() {
for (let i = this.offset; i <= this.offset + this.capacity; i += 8) {
for (let i = this.offset; i <= this.offset + this.values.length * 8; i += 8) {
if (this.values[this.getByteIndex(i)] !== 0) {
const firstBit = Math.floor(i / 8) * 8;
for (let j = firstBit; j < firstBit + 8; j ++) {
Expand All @@ -160,9 +160,9 @@ export default class BitArray {
}

max() {
for (let i = this.capacity + this.offset; i >= this.offset; i -= 8) {
for (let i = this.values.length * 8 + this.offset; i >= this.offset; i -= 8) {
if (this.values[this.getByteIndex(i)] !== 0) {
const lastBit = Math.ceil(i / 8) * 8 + 1;
const lastBit = Math.ceil((i + 1) / 8) * 8;
for (let j = lastBit; j > lastBit - 8; j --) {
if (this.get(j)) {
return j;
Expand Down
21 changes: 13 additions & 8 deletions src/bitarray.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,12 @@ it('setRange with large range when wrapping', () => {

b.setRange(6528, 6592);
expect(b.toValueArray()).toEqual((new Array(64).fill()).map((_, i) => i + 6528));
});

it('setRange some other shit', () => {
let b = new BitArray(6563);

b.setRange(52544, 52608);
expect(b.toValueArray()).toEqual((new Array(64).fill()).map((_, i) => i + 52544));
});

it ('get', () => {
it('get', () => {
const b = new BitArray(64);

b.set(1);
Expand All @@ -124,7 +120,7 @@ it ('get', () => {
expect(b.get(100000)).toEqual(true);
});

it ('min', () => {
it('min', () => {
const b = new BitArray(64);

b.set(1);
Expand All @@ -140,7 +136,7 @@ it ('min', () => {
expect(b.min()).toEqual(63);
});

it ('max', () => {
it('max', () => {
const b = new BitArray(64);

b.set(1);
Expand All @@ -150,7 +146,16 @@ it ('max', () => {
expect(b.max()).toEqual(100000);
});

it ('getIndexValue', () => {
it('max after wrapping with odd capacity', () => {
const b = new BitArray(1641);

for (let i = 1; i < 10000; i += 100) {
b.set(i, true);
expect(b.max()).toEqual(i);
}
});

it('getIndexValue', () => {
const b = new BitArray(64);

expect(b.getIndexValue(0, 0)).toEqual(0);
Expand Down
2 changes: 1 addition & 1 deletion src/chunkedStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class ChunkedWriteStreamInjector extends EventEmitter {
name = 'chunked-stream',
bitRate = 3500000,
...injectorOptions
}) {
} = {}) {
this.name = name;

const data = Buffer.alloc(bitRate / 8);
Expand Down
20 changes: 0 additions & 20 deletions src/dht.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,6 @@ it('dht clients can send and receive messages', async () => {
))));
});

it('dht clients can process messages in busy clusters', async () => {
jest.setTimeout(10000);

const indices = new Array(20).fill(0).map((_, i) => i);
const pairs = indices.reduce((pairs, src) => pairs.concat(indices.filter(i => i !== src).map(dst => ({src, dst}))), []);

const server = new Server();
const clients = await Promise.all(indices.map((_, i) => new Promise(
resolve => setTimeout(() => resolve(Client.create(new ConnManager(server))), 50 * i)),
));
const dhtClients = clients.map(({dhtClient}) => dhtClient);

dhtClients.forEach(client => client.on('receive.test', ({callback}) => callback()));

await new Promise(resolve => setTimeout(resolve, 1000))
.then(() => Promise.all(pairs.map(({src, dst}) => new Promise(
resolve => dhtClients[src].send(dhtClients[dst].id, 'test', {src, dst}, resolve),
))));
});

it('dht clients can respond to messages via callbacks', async () => {
const indices = new Array(3).fill(0).map((_, i) => i);
const pairs = indices.reduce((pairs, src) => pairs.concat(indices.filter(i => i !== src).map(dst => ({src, dst}))), []);
Expand Down
6 changes: 5 additions & 1 deletion src/ppspp/injector.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export default class Injector {
this.chunksPerSignature = chunksPerSignature;
this.inputBuffer = [];
this.inputBufferSize = 0;
this.outputResult = Promise.resolve();
}

appendData(data) {
Expand Down Expand Up @@ -62,7 +63,10 @@ export default class Injector {
chunks.push(buf.slice(offset, offset + this.chunkSize));
}

this.swarm.contentIntegrity.appendSubtree(chunks).then(subtree => {
this.outputResult = Promise.all([
this.swarm.contentIntegrity.appendSubtree(chunks),
this.outputResult,
]).then(([subtree]) => {
this.swarm.chunkBuffer.setRange(subtree.rootAddress, chunks);
this.swarm.scheduler.markChunksLoaded(subtree.rootAddress);
});
Expand Down
Loading

0 comments on commit a28905f

Please sign in to comment.