Skip to content

propose a v2 api #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,22 @@ or nice error messages etc.

``` js
var createServer = require('pull-net/server')
var connect = require('pull-net/client')

createServer(function (stream) {
pull(stream.source, stream.sink) //ECHO
}).listen(9999, '127.0.0.1')
}).listen(9999, function (err) {

var connect = require('pull-net/client')
connect(9999, function (err, stream) {

var stream = connect(9999, '127.0.0.1')
pull(
pull.once(new Buffer('hello tcp')),
stream,
pull.collect(console.log)
)

pull(
pull.once(new Buffer('hello tcp')),
stream,
pull.collect(console.log)
)
})
})
```

## Questions
Expand All @@ -47,3 +49,4 @@ what if a server was a stream of clients? does that really help?
## License

MIT

65 changes: 41 additions & 24 deletions client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,51 @@ var TCP = process.binding('tcp_wrap').TCP
var TCPConnectWrap = process.binding('tcp_wrap').TCPConnectWrap
var pull = require('pull-stream')
var net = require('net')
var errno = require('util')._errnoException

module.exports = function (port, address, cb) {
cb = cb || function () {}
port |= 0
var clientHandle = new TCP()
var connect = new TCPConnectWrap()
var stream

connect.port = port
connect.address = address
connect.oncomplete = function afterConnect (err) {
if (err) return cb(new Error('error connecting 1:' + err))
cb && cb(null, stream)
}
var err
if (net.isIPv4) {
err = clientHandle.connect(connect, address, port)
} else {
err = clientHandle.connect6(connect, address, port)
var lookup = require('dns').lookup

module.exports = function (opts, host, cb) {
var port
if(!cb) cb = host, host = null
if('number' == typeof opts)
port = opts
else if(opts && 'object' === typeof opts) {
host = opts.host; port = opts.port
}
host = host || '0.0.0.0'

if (err) {
err = new Error('connection failed: ' + err)
return {
source: pull.error(err),
sink: function (read) { read(err, cb) }
var clientHandle = new TCP()

function connect(host, port, cb) {
var stream
var wrap = new TCPConnectWrap()
wrap.port = port
wrap.address = host
wrap.oncomplete = function afterConnect (err) {
if (err) cb(errno(err, 'connect'))
else cb(null, stream)
}

if(net.isIPv4(host))
err = clientHandle.connect(wrap, host, port)
else
err = clientHandle.connect6(wrap, host, port)

if(err) cb(errno(err))
else stream = Handle(clientHandle)
}
return Handle(clientHandle, cb)

var err

if(net.isIP(host))
connect(host, port, cb)
else
lookup(host, function (err, ip) {
if(err) cb(errno(err, 'dns-lookup'))
else connect(ip, port, cb)
})

}


13 changes: 10 additions & 3 deletions handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module.exports = function (handle, cb) {
end.async = false
end.handle = handle
end.oncomplete = function (_, __, ___, err) {
cb(err)
cb && cb(err)
}
handle.shutdown(end)
}
Expand All @@ -42,7 +42,7 @@ module.exports = function (handle, cb) {
if (abort) {
shutdown(function (err) {
_cb(err || abort)
cb(err)
cb && cb(err)
})
}

Expand Down Expand Up @@ -71,7 +71,8 @@ module.exports = function (handle, cb) {
if (err) return read(err, cb)
else read(null, next)
}
if (handle.writeBuffer(write, data) === 0) {
handle.writeBuffer(write, data)
if (handle.writeQueueSize < HIGH) {
write.oncomplete = noop
read(null, next)
}
Expand All @@ -80,3 +81,9 @@ module.exports = function (handle, cb) {
}
}
}






3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

exports.connect = require('./client')
exports.createServer = require('./server')
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"devDependencies": {
"pull-stream": "^3.4.0",
"standard": "^7.1.2",
"stream-to-pull-stream": "^1.6.8"
"stream-to-pull-stream": "^1.7.1",
"tape": "^4.6.0"
},
"scripts": {
"test": "set -e; for t in test/*.js; do node $t; done",
Expand Down
58 changes: 39 additions & 19 deletions server.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,56 @@
var TCP = process.binding('tcp_wrap').TCP
var net = require('net')
var Handle = require('./handle')
var errno = require('util')._errnoException

function noop () {}
var lookup = require('dns').lookup

module.exports = function (onConnect) {
var server = new TCP()

var host, port
return {
listen: function (port, addr, cb) {
cb = cb || noop
var err
if (net.isIPv6(addr)) {
err = server.bind6(addr, port)
} else {
err = server.bind(addr, port)
listen: function (opts, cb) {
if('object' == typeof opts) {
host = opts.host || '0.0.0.0'
port = opts.port | 0
}

if (err) {
server.close()
cb(err)
return
else if('number' === typeof opts) {
port = opts
host = '0.0.0.0'
}
cb = cb || function (err) { if(err) throw err }
var err

// 512 connections allowed in backlog
server.listen(511)
function listen (host, port, cb) {
if(net.isIPv4(host))
err = server.bind(host, port)
else
err = server.bind6(host, port)

server.onconnection = function (err, client) {
if (err) {
return console.error(new Error('error connected:' + err))
server.close()
cb(errno(err))
return
}

// 512 connections allowed in backlog
server.listen(511)

server.onconnection = function (err, client) {
if (err) console.error(errno(err))
else onConnect(Handle(client))
}
onConnect(Handle(client, noop))
}

if (net.isIP(host))
listen(host, port, cb)
else {
lookup(host, function (err, ip) {
if(err) cb(err)
else listen(ip, port, cb)
})
}

return server
},
address: function () {
Expand All @@ -51,3 +70,4 @@ module.exports = function (onConnect) {
}
}
}

42 changes: 42 additions & 0 deletions test/backpressure.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
var crypto = require('crypto')
var net = require('..')
var pull = require('pull-stream')
var Looper = require('pull-looper')
var port = 9988, c = 0

net.createServer(function (stream) {
pull(stream, stream)
// pull(stream.source, pull.drain(console.log))
}).listen(port)

net.connect(port, 'localhost', function (err, stream) {
if(err) throw err

pull(
pull.infinite(function () {
var d = crypto.randomBytes(1024*1)
c += d.length
// console.log(c)
return d
}),
Looper,
stream.sink
)

// stream.source(null, function (err, data) {
// if(err) console.log('ended', err)
// else console.log('data', data)
// })


var t = 0

pull(stream.source, pull.asyncMap(function (d, cb) {
setTimeout(function () {
cb(null, d)
}, 100)
}), pull.drain(function (e) {
console.log(e.length, t += e.length)
}))
})

85 changes: 85 additions & 0 deletions test/client-echo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
var pull = require('pull-stream')
var tape = require('tape')

var connect = require('../client')
//var createServer = require('../server')
var net = require('net')
var toPull = require('stream-to-pull-stream')

var server

tape('setup server', function (t) {
server = net.createServer(function (stream) {
stream = toPull.duplex(stream)
pull(
stream.source,
pull.through(function (data) {
console.log('THROUGH', data)
}, function (err) {
console.log('END', err)
}),
stream.sink)
}).listen(9988, function () {
t.end()
})
})

console.log('server', server)

// setTimeout(function () {

function echoTest(args) {
tape('connect to echo server with:'+JSON.stringify(args), function (t) {
connect.apply(null, args.concat(function (err, stream) {
//(9988, '127.0.0.1')
if(err) throw err

var input = [new Buffer('HELLO THERE')]

pull(
pull.values(input),
stream,
pull.collect(function (err, ary) {
if(err) throw err
t.deepEqual(ary, input)
t.end()
})
)
}))
})
}


//, function (err, stream) {
// if(err) throw err
// console.log(err, stream)
// pull(
// pull.values([new Buffer('HELLO THERE')]),
// stream,
// pull.drain(console.log, function () {
// console.log('END')
// server.close()
// })
// )
// })
// },100)

echoTest([9988, '127.0.0.1'])
echoTest([9988, 'localhost'])
//echoTest([9988, '127.0.0.1'])
echoTest([9988, '0.0.0.0'])
echoTest([9988, '::'])
echoTest([{port: 9988}])

tape('close server', function (t) {
server.close()
t.end()
})








3 changes: 3 additions & 0 deletions test/echo-backpressure.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@


var net = require('net')
Loading