开发基于字节流协议的服务器(tcp,ssl,unix)
#include " net4cxx/net4cxx.h"
using namespace net4cxx ;
class Echo : public Protocol , public std ::enable_shared_from_this<Echo> {
public:
void dataReceived (Byte *data, size_t length) override {
write (data, length);
}
};
// Factory that hands out a brand-new Echo protocol on every buildProtocol call.
class EchoFactory : public Factory {
public:
    // The address argument is not consulted when creating the protocol.
    ProtocolPtr buildProtocol(const Address & /*address*/) override {
        auto protocol = std::make_shared<Echo>();
        return protocol;
    }
};
// Entry point of the TCP echo server example: listens on port 28001 and
// hands control to the reactor.
int main(int argc, char **argv) {
    NET4CXX_PARSE_COMMAND_LINE(argc, argv);
    Reactor reactor;
    // The port/service string must be the bare value; a leading space is
    // not part of a valid port number.
    TCPServerEndpoint endpoint(&reactor, "28001");
    endpoint.listen(std::make_shared<EchoFactory>());
    reactor.run();
    return 0;
}
以上实现了一个最简单的例子,将客户端发送过来的信息回写;
将TCPServerEndpoint替换成SSLServerEndpoint将会启动一个基于sslSocket的服务器;
将TCPServerEndpoint替换成UNIXServerEndpoint将会启动一个基于unixSocket的服务器;
后面会展示一种更好的服务器启动方式,切换协议无需修改任何代码.
class Echo : public Protocol , public std ::enable_shared_from_this<Echo> {
public:
void connectionMade () override {
NET4CXX_LOG_INFO (gAppLog , " Connection made" );
}
void connectionLost (std::exception_ptr reason) override {
NET4CXX_LOG_INFO (gAppLog , " Connection lost" );
}
void dataReceived (Byte *data, size_t length) override {
write (data, length);
loseConnection ();
}
};
连接建立时将会回调connectionMade;
连接销毁时将会回调connectionLost;
调用loseConnection可以安全地关闭连接.
// Entry point: starts a TCP, an SSL and a UNIX-socket echo server inside one
// process, each built from an endpoint description string.
int main(int argc, char **argv) {
    NET4CXX_PARSE_COMMAND_LINE(argc, argv);
    Reactor reactor;
    // Descriptions have the form "<type>:<args>"; they must start directly
    // with the type name (no leading whitespace) to be recognised.
    serverFromString(&reactor, "tcp:28001")->listen(std::make_shared<EchoFactory>());
    serverFromString(&reactor, "ssl:28002:privateKey=test.key:certKey=test.crt")
            ->listen(std::make_shared<EchoFactory>());
    serverFromString(&reactor, "unix:/var/foo/bar")->listen(std::make_shared<EchoFactory>());
    reactor.run();
    return 0;
}
以上代码展示了在一个进程内同时启动了一个tcp服务器,ssl服务器,unix服务器;
观察服务器的启动方式,发现只有字符串参数的值不同,如果我们从配置中读取这个字符串的话,切换协议无须更改一行代码,这也是推荐的方式;
开发基于字节流协议的客户端(tcp,ssl,unix)
#include " net4cxx/net4cxx.h"
using namespace net4cxx ;
class WelcomeMessage : public Protocol , public std ::enable_shared_from_this<WelcomeMessage> {
public:
void connectionMade () override {
write (" Hello server, I am the client!" );
loseConnection ();
}
};
// Client factory producing a new WelcomeMessage protocol per connection.
class WelcomeFactory : public ClientFactory {
public:
    // The address argument is not consulted.
    std::shared_ptr<Protocol> buildProtocol(const Address & /*address*/) override {
        auto protocol = std::make_shared<WelcomeMessage>();
        return protocol;
    }
};
// Entry point of the TCP client example: connects to localhost:28001 using
// WelcomeFactory and hands control to the reactor.
int main(int argc, char **argv) {
    NET4CXX_PARSE_COMMAND_LINE(argc, argv);
    Reactor reactor;
    // Host and port are passed as bare strings; stray leading whitespace
    // would make them an invalid host name / service.
    reactor.connectTCP("localhost", "28001", std::make_shared<WelcomeFactory>());
    reactor.run();
    return 0;
}
以上实现了一个最简单的例子,客户端向服务器打了个招呼,随后关闭连接;
将connectTCP替换成connectSSL或者connectUNIX能分别建立ssl或者unix客户端连接;
与服务器一样支持从字符串构建客户端连接,调用clientFromString即可,不再赘述;
// Entry point: connects to localhost:28001 using the built-in OneShotFactory,
// which is bound to one fixed WelcomeMessage protocol object, so no custom
// factory class is required.
int main(int argc, char **argv) {
    NET4CXX_PARSE_COMMAND_LINE(argc, argv);
    Reactor reactor;
    auto protocol = std::make_shared<WelcomeMessage>();
    // Host and port must not carry leading whitespace.
    reactor.connectTCP("localhost", "28001", std::make_shared<OneShotFactory>(protocol));
    reactor.run();
    return 0;
}
使用内置的OneShotFactory可以指定总是返回某个固定的protocol,用户也无需创建自己的factory,这在服务器之间互联很有用;
class WelcomeFactory : public ReconnectingClientFactory {
public:
std::shared_ptr<Protocol> buildProtocol (const Address &address) override {
resetDelay ();
return std::make_shared<WelcomeMessage>();
}
};
当继承自内建的ReconnectingClientFactory时,连接断开后会自动按照指数退避策略进行重连;
#include " net4cxx/net4cxx.h"
using namespace net4cxx ;
class Echo : public DatagramProtocol , public std ::enable_shared_from_this<Echo> {
public:
void datagramReceived (Byte *datagram, size_t length, Address address) override {
std::string s ((char *)datagram, (char *)datagram + length);
NET4CXX_LOG_INFO (gAppLog , " Datagram received: %s From %s:%u" , s.c_str (), address.getAddress ().c_str (),
address.getPort ());
write (datagram, length, address);
}
};
// Entry point of the UDP echo server example: binds the Echo datagram
// protocol to port 28002 and hands control to the reactor.
int main(int argc, char **argv) {
    NET4CXX_PARSE_COMMAND_LINE(argc, argv);
    Reactor reactor;
    auto protocol = std::make_shared<Echo>();
    reactor.listenUDP(28002, protocol);
    reactor.run();
    return 0;
}
以上实现了一个最简单的echo服务器;
将listenUDP调用替换成listenUNIXDatagram可以使用unix域数据报套接字;
#include " net4cxx/net4cxx.h"
using namespace net4cxx ;
class EchoClient : public DatagramProtocol , public std ::enable_shared_from_this<EchoClient> {
public:
void startProtocol () override {
write (" Hello boy!" );
}
void datagramReceived (Byte *datagram, size_t length, Address address) override {
std::string s ((char *)datagram, (char *)datagram + length);
NET4CXX_LOG_INFO (gAppLog , " Datagram received: %s From %s:%u" , s.c_str (), address.getAddress ().c_str (),
address.getPort ());
}
};
int main (int argc, char **argv) {
NET4CXX_PARSE_COMMAND_LINE (argc, argv);
Reactor reactor;
reactor.connectUDP (" 127.0.0.1" , 28002 , std::make_shared<MyProtocol>());
reactor.connectUNIXDatagram (" /data/foo/bar" , std::make_shared<MyProtocol>(), 8192 , " /data/foo/bar2" );
reactor.run ();
return 0 ;
}
以上实现了一个最简单的udp client;
将connectUDP调用替换成connectUNIXDatagram可以使用unix域数据报套接字;
#include " net4cxx/net4cxx.h"
using namespace net4cxx ;
// WebSocket server factory that keeps track of the connected clients and can
// send a text message to all of them.
class BroadcastServerFactory : public WebSocketServerFactory,
                               public std::enable_shared_from_this<BroadcastServerFactory> {
public:
    using WebSocketServerFactory::WebSocketServerFactory;

    // Defined out of line, after BroadcastServerProtocol has been declared.
    ProtocolPtr buildProtocol(const Address &address) override;

    // Adds the client to the registry; logs only when it was not yet present.
    void registerClient(WebSocketServerProtocolPtr client) {
        // insert() reports whether the element was new, so no separate find().
        if (_clients.insert(client).second) {
            NET4CXX_LOG_INFO(gAppLog, " register client %s", client->getPeerName());
        }
    }

    // Removes the client from the registry; logs only when it was present.
    void unregisterClient(WebSocketServerProtocolPtr client) {
        // erase(key) returns the number of removed elements.
        if (_clients.erase(client) > 0) {
            NET4CXX_LOG_INFO(gAppLog, " unregister client %s", client->getPeerName());
        }
    }

    // Sends msg to every registered client, logging each delivery.
    void broadcast(const std::string &msg) {
        NET4CXX_LOG_INFO(gAppLog, " broadcasting message '%s' ..", msg);
        // Iterate by reference to avoid copying a shared_ptr per client.
        for (const auto &client : _clients) {
            client->sendMessage(msg);
            NET4CXX_LOG_INFO(gAppLog, " message sent to %s", client->getPeerName());
        }
    }

protected:
    int _tickCount{0};
    std::set<WebSocketServerProtocolPtr> _clients;  // currently registered clients
};
class BroadcastServerProtocol : public WebSocketServerProtocol {
public:
void onOpen () override {
getFactory<BroadcastServerFactory>()->registerClient (getSelf<BroadcastServerProtocol>());
}
void onMessage (ByteArray payload, bool isBinary) override {
if (!isBinary) {
auto msg = StrUtil::format (" %s from %s" , BytesToString (payload), getPeerName ());
getFactory<BroadcastServerFactory>()->broadcast (msg);
}
}
void connectionLost (std::exception_ptr reason) override {
WebSocketServerProtocol::connectionLost (reason);
getFactory<BroadcastServerFactory>()->unregisterClient (getSelf<BroadcastServerProtocol>());
}
};
// Creates a new BroadcastServerProtocol; the address argument is not consulted.
ProtocolPtr BroadcastServerFactory::buildProtocol(const Address & /*address*/) {
    auto protocol = std::make_shared<BroadcastServerProtocol>();
    return protocol;
}
// Entry point of the broadcasting WebSocket server example.
int main(int argc, char **argv) {
    NET4CXX_PARSE_COMMAND_LINE(argc, argv);
    Reactor reactor;
    // The URL must begin with its scheme; leading whitespace makes it malformed.
    auto factory = std::make_shared<BroadcastServerFactory>("ws://127.0.0.1:9000");
    listenWS(&reactor, factory);
    reactor.run();
    return 0;
}
以上实现了一个具有广播功能的websocket服务器;
#include " net4cxx/net4cxx.h"
using namespace net4cxx ;
// WebSocket client protocol that sends a greeting once the connection is open
// and then re-sends it every two seconds; received text messages are logged.
class BroadcastClientProtocol : public WebSocketClientProtocol {
public:
    // Kicks off the periodic greeting.
    void onOpen() override {
        sendHello();
    }

    // Logs text messages; binary messages are ignored.
    void onMessage(ByteArray payload, bool isBinary) override {
        if (isBinary) {
            return;
        }
        NET4CXX_LOG_INFO(gAppLog, " Text message received: %s", BytesToString(payload));
    }

    // Sends the greeting and schedules the next call 2.0 seconds later.
    void sendHello() {
        sendMessage(" Hello from client!");
        // The captured shared pointer keeps this object alive until the
        // delayed callback has run.
        auto keepAlive = shared_from_this();
        reactor()->callLater(2.0, [this, keepAlive]() {
            sendHello();
        });
    }
};
// WebSocket client factory producing BroadcastClientProtocol objects; reuses
// the base-class constructors.
class BroadcastClientFactory : public WebSocketClientFactory {
public:
    using WebSocketClientFactory::WebSocketClientFactory;

    // The address argument is not consulted.
    ProtocolPtr buildProtocol(const Address & /*address*/) override {
        auto protocol = std::make_shared<BroadcastClientProtocol>();
        return protocol;
    }
};
// Entry point of the WebSocket client example: connects to the broadcast
// server at ws://127.0.0.1:9000.
int main(int argc, char **argv) {
    NET4CXX_PARSE_COMMAND_LINE(argc, argv);
    Reactor reactor;
    // The URL must begin with its scheme; leading whitespace makes it malformed.
    auto factory = std::make_shared<BroadcastClientFactory>("ws://127.0.0.1:9000");
    connectWS(&reactor, factory);
    reactor.run();
    return 0;
}
以上实现了一个会自动定时发消息的websocket客户端
void run (bool installSignalHandlers=true );
installSignalHandlers: 是否安装信号退出处理器
template <typename CallbackT>
DelayedCall callLater (double deadline, CallbackT &&callback);
template <typename CallbackT>
DelayedCall callLater (const Duration &deadline, CallbackT &&callback);
deadline: 延迟多少秒触发
callback: 回调函数,满足签名void ()
template <typename CallbackT>
DelayedCall callAt (time_t deadline, CallbackT &&callback);
template <typename CallbackT>
DelayedCall callAt (const Timestamp &deadline, CallbackT &&callback);
deadline: 触发的绝对时间
callback: 回调函数,满足签名void ()
template <typename CallbackT>
void addCallback (CallbackT &&callback);
callback: 回调函数,满足签名void ()
template <typename CallbackT>
void addStopCallback (CallbackT &&callback);
callback: 回调函数,满足签名void ()
ListenerPtr listenTCP (const std::string &port, std::shared_ptr<Factory> factory, const std::string &interface={});
port: 端口号或服务名称
factory: 协议工厂
interface: 指定监听的ip地址或域名
ConnectorPtr connectTCP (const std::string &host, const std::string &port, std::shared_ptr<ClientFactory> factory,
double timeout=30.0 , const Address &bindAddress={});
host: 连接的服务器的ip地址或域名
port: 连接的服务器的端口或服务名
factory: 协议工厂
timeout: 连接超时时间
bindAddress: 为客户端套接字绑定一个指定的地址和端口
ListenerPtr listenSSL (const std::string &port, std::shared_ptr<Factory> factory, SSLOptionPtr sslOption,
const std::string &interface={});
port: 端口号或服务名称
factory: 协议工厂
sslOption: ssl选项
interface: 指定监听的ip地址或域名
ConnectorPtr connectSSL (const std::string &host, const std::string &port, std::shared_ptr<ClientFactory> factory,
SSLOptionPtr sslOption, double timeout=30.0 , const Address &bindAddress={});
host: 连接的服务器的ip地址或域名
port: 连接的服务器的端口或服务名
factory: 协议工厂
sslOption: ssl选项
timeout: 连接超时时间
bindAddress: 为客户端套接字绑定一个指定的地址和端口
DatagramConnectionPtr listenUDP (unsigned short port, DatagramProtocolPtr protocol, const std::string &interface=" " ,
size_t maxPacketSize=8192 , bool listenMultiple=false );
port: 绑定的端口号
protocol: 协议处理器
interface: 绑定的ip地址
maxPacketSize: 接收数据报的最大尺寸
listenMultiple: 允许多个套接字绑定相同的地址
DatagramConnectionPtr connectUDP (const std::string &address, unsigned short port, DatagramProtocolPtr protocol,
size_t maxPacketSize=8192 , const Address &bindAddress={},
bool listenMultiple=false );
address: 连接的ip地址
port: 连接的端口号
protocol: 协议处理器
interfance: 绑定的ip地址
maxPacketSize: 接收数据报的最大尺寸
bindAddress: 为客户端套接字绑定一个指定的地址和端口
listenMultiple: 允许多个套接字绑定相同的地址
ListenerPtr listenUNIX (const std::string &path, std::shared_ptr<Factory> factory);
path: 监听的文件路径
factory: 协议工厂
ConnectorPtr connectUNIX (const std::string &path, std::shared_ptr<ClientFactory> factory, double timeout=30.0 );
path: 服务器的文件路径
factory: 协议工厂
timeout: 连接超时时间
DatagramConnectionPtr listenUNIXDatagram (const std::string &path, DatagramProtocolPtr protocol,
size_t maxPacketSize=8192 );
path: 绑定的文件路径
protocol: 协议处理器
maxPacketSize: 接收数据报的最大尺寸
DatagramConnectionPtr connectUNIXDatagram (const std::string &path, DatagramProtocolPtr protocol,
size_t maxPacketSize=8192 , const std::string &bindPath=" " );
path: 连接的服务器的文件路径
protocol: 协议处理器
maxPacketSize: 接收数据报的最大尺寸
bindPath: 绑定的文件路径(如果要接收数据,必须绑定一个路径)
template <typename CallbackT>
DelayedResolve resolve (const std::string &name, CallbackT &&callback);
name: 要解析的域名
callback: 回调函数,满足签名void (StringVector)
explicit Address (std::string address=" " , unsigned short port=0 );
address: ip地址或文件路径(unix)
port: 端口号, 对unix域地址无效
void setAddress (std::string &&address);
void setAddress (const std::string &address);
const std::string& getAddress () const
void setPort (unsigned short port);
unsigned short getPort () const ;
explicit operator bool () const ;
bool operator !() const ;
bool operator ==(const Address &lhs, const Address &rhs);
bool operator !=(const Address &lhs, const Address &rhs);
// Peer-certificate verification policy for SSL connections.
enum class SSLVerifyMode {
    CERT_NONE,      // verification disabled
    CERT_OPTIONAL,  // verification enabled
    CERT_REQUIRED,  // verification enabled; fails when the peer presents no certificate
};
CERT_NONE: 不启用认证
CERT_OPTIONAL: 启用认证
CERT_REQUIRED: 启用认证,对端没有证书时认证失败
explicit SSLParams (bool serverSide= false );
void setCertFile (const std::string &certFile);
const std::string& getCertFile () const ;
void setKeyFile (const std::string &keyFile);
const std::string& getKeyFile () const ;
void setPassword (const std::string &password);
const std::string& getPassword () const ;
void setVerifyMode (SSLVerifyMode verifyMode);
SSLVerifyMode getVerifyMode () const ;
void setVerifyFile (const std::string &verifyFile);
const std::string& getVerifyFile () const ;
void setCheckHost (const std::string &hostName);
const std::string& getCheckHost () const ;
bool isServerSide () const ;
bool isServerSide () const ;
static SSLOptionPtr create (const SSLParams &sslParams);
sslParams: 用于初始化sslContext的参数
virtual void startFactory ();
virtual void stopFactory ();
virtual ProtocolPtr buildProtocol (const Address &address) = 0;
class ClientFactory : public Factory ;
virtual void startedConnecting (ConnectorPtr connector);
virtual void clientConnectionFailed (ConnectorPtr connector, std::exception_ptr reason);
virtual void clientConnectionLost (ConnectorPtr connector, std::exception_ptr reason);
class OneShotFactory : public ClientFactory ;
explicit OneShotFactory (ProtocolPtr protocol);
protocol: 绑定指定的protocol对象
ReconnectingClientFactory
class ReconnectingClientFactory : public ClientFactory ;
double getMaxDelay () const
void setMaxDelay (double maxDelay);
int getMaxRetires () const ;
void setMaxRetries (int maxRetries);
virtual void connectionMade ();
virtual void dataReceived (Byte *data, size_t length) = 0;
virtual void connectionLost (std::exception_ptr reason);
void write (const Byte *data, size_t length);
void write (const ByteArray &data);
void write (const char *data);
void write (const std::string &data);
void setNoDelay (bool enabled);
bool getKeepAlive () const ;
void setKeepAlive (bool enabled);
std::string getLocalAddress () const ;
unsigned short getLocalPort () const ;
std::string getRemoteAddress () const ;
unsigned short getRemotePort () const ;
template <typename FactoryT>
std::shared_ptr<FactoryT> getFactory () const ;
virtual void startListening ()=0;
virtual void stopListening ()=0;
virtual void startConnecting ()=0;
virtual void stopConnecting ()=0;
explicit Endpoint (Reactor *reactor);
class ServerEndpoint : public Endpoint ;
virtual ListenerPtr listen (std::shared_ptr<Factory> protocolFactory) const = 0;
class TCPServerEndpoint : public ServerEndpoint ;
TCPServerEndpoint (Reactor *reactor, std::string port, std::string interface={});
reactor: 关联的反应器
port: 绑定的端口
interface: 绑定的地址
class SSLServerEndpoint : public ServerEndpoint ;
SSLServerEndpoint (Reactor *reactor, std::string port, SSLOptionPtr sslOption, std::string interface={});
reactor: 关联的反应器
sslOption: ssl选项
port: 绑定的端口
interface: 绑定的地址
class UNIXServerEndpoint : public ServerEndpoint ;
UNIXServerEndpoint (Reactor *reactor, std::string path);
reactor: 关联的反应器
path: 绑定的路径
class ClientEndpoint : public Endpoint ;
virtual ConnectorPtr connect (std::shared_ptr<ClientFactory> protocolFactory) const = 0;
class TCPClientEndpoint : public ClientEndpoint ;
TCPClientEndpoint (Reactor *reactor, std::string host, std::string port, double timeout=30.0 , Address bindAddress={});
reactor: 关联的反应器
host: 要连接的对端地址
port: 要连接的对端端口
timeout: 连接超时时间
bindAddress: 绑定的本地地址
class SSLClientEndpoint : public ClientEndpoint ;
SSLClientEndpoint (Reactor *reactor, std::string host, std::string port, SSLOptionPtr sslOption, double timeout=30.0 , Address bindAddress={});
reactor: 关联的反应器
host: 要连接的对端地址
port: 要连接的对端端口
sslOption: ssl上下文
timeout: 连接超时时间
bindAddress: 绑定的本地地址
class UNIXClientEndpoint : public ClientEndpoint ;
UNIXClientEndpoint (Reactor *reactor, std::string path, double timeout=30.0 );
reactor: 关联的反应器
path: 要连接的对端路径
timeout: 连接超时时间
// /
// / \param reactor
// / \param description
// / tcp:80
// / tcp:80:interface=127.0.0.1
// / ssl:443:privateKey=key.pem:certKey=crt.pem
// / unix:/var/run/finger
// / \return
ServerEndpointPtr serverFromString (Reactor *reactor, const std::string &description);
// /
// / \param reactor
// / \param description
// / tcp:host=www.example.com:port=80
// / tcp:www.example.com:80
// / tcp:host=www.example.com:80
// / tcp:www.example.com:port=80
// / ssl:web.example.com:443:privateKey=foo.pem:certKey=foo.pem
// / ssl:host=web.example.com:port=443:caCertsDir=/etc/ssl/certs
// / tcp:www.example.com:80:bindAddress=192.0.2.100
// / unix:path=/var/foo/bar:timeout=9
// / unix:/var/foo/bar
// / unix:/var/foo/bar:timeout=9
// / \return
ClientEndpointPtr clientFromString (Reactor *reactor, const std::string &description);
ConnectorPtr connectProtocol (const ClientEndpoint &endpoint, ProtocolPtr protocol);
endpoint: 要连接的对端
protocol: 指定的协议处理器
virtual void startProtocol ();
virtual void stopProtocol ();
virtual void datagramReceived (Byte *datagram, size_t length, Address address) = 0;
datagram: 数据报内容
length: 数据报长度
address: 对端地址
virtual void connectionRefused ();
virtual void connectionFailed (std::exception_ptr reason);
void write (const Byte *datagram, size_t length, const Address &address={});
void write (const ByteArray &datagram, const Address &address={});
void write (const char *datagram, const Address &address={});
void write (const std::string &datagram, const Address &address={});
datagram: 数据报内容
length: 数据报长度
address: 对端地址
void connect (const Address &address);
bool getBroadcastAllowed () const ;
void setBroadcastAllowed (bool enabled);
std::string getLocalAddress () const ;
unsigned short getLocalPort () const ;
std::string getRemoteAddress () const ;
unsigned short getRemotePort () const ;
class WebSocketProtocol : public Protocol , public std ::enable_shared_from_this<WebSocketProtocol>;
virtual void onMessage (ByteArray payload, bool isBinary);
payload: 消息的内容
isBinary: 消息是否二进制
virtual void onPing (ByteArray payload);
virtual void onPong (ByteArray payload);
virtual void onClose (bool wasClean, boost::optional<unsigned short > code, boost::optional<std::string> reason);
code: 可选的关闭状态码
reason: 可选的关闭原因
void sendMessage (const Byte *payload, size_t length, bool isBinary=false , size_t fragmentSize=0 , bool sync=false , bool doNotCompress=false );
void sendMessage (const ByteArray &payload, bool isBinary=false , size_t fragmentSize=0 , bool sync=false , bool doNotCompress=false );
void sendMessage (const char *payload, bool isBinary=false , size_t fragmentSize=0 , bool sync=false , bool doNotCompress=false );
void sendMessage (const std::string &payload, bool isBinary=false , size_t fragmentSize=0 , bool sync=false , bool doNotCompress=false );
payload: 消息的内容
length: 消息的长度
isBinary: 是否二进制
void sendPing (const Byte *payload, size_t length);
void sendPing (const ByteArray &payload);
void sendPing (const char *payload);
void sendPing (const std::string &payload);
payload: 附带的消息内容
length: 附带的消息长度
void sendPong (const Byte *payload, size_t length);
void sendPong (const ByteArray &payload);
void sendPong (const char *payload);
void sendPong (const std::string &payload);
payload: 附带的消息内容
length: 附带的消息长度
void sendClose (boost::optional<unsigned short > code=boost::none, boost::optional<std::string> reason=boost::none);
code: 可选的关闭状态码
reason: 可选的关闭原因
template <typename SelfT>
std::shared_ptr<SelfT> getSelf () const ;
template <typename SelfT>
std::shared_ptr<SelfT> getSelf ();
class WebSocketServerProtocol : public WebSocketProtocol ;
class WebSocketServerFactory : public Factory ;
WebSocketServerFactory (std::string url=" " , StringVector protocols={}, std::string version=" " , WebSocketHeaders headers={}, unsigned short externalPort=0 );
class WebSocketClientProtocol : public WebSocketProtocol ;
class WebSocketClientFactory : public ClientFactory ;
explicit WebSocketClientFactory (std::string url = " " , std::string origin = " " , StringVector protocols = {}, std::string useragent = " " , WebSocketHeaders headers = {}, std::string proxy = " " );
ListenerPtr listenWS (Reactor *reactor, std::shared_ptr<WebSocketServerFactory> factory, SSLOptionPtr sslOption=nullptr , const std::string &interface={});
reactor: 关联的反应器
factory: 创建WebSocketServerProtocol的工厂
sslOption: ssl选项
interface: 绑定的地址
ConnectorPtr connectWS (Reactor *reactor, std::shared_ptr<WebSocketClientFactory> factory, SSLOptionPtr sslOption=nullptr , double timeout=30.0 , const Address &bindAddress={});
reactor: 关联的反应器
factory: 创建WebSocketClientProtocol的工厂
sslOption: ssl选项
timeout: 超时值
bindAddress: 绑定的地址