Skip to content

Commit

Permalink
Refactor socket (#3498)
Browse files Browse the repository at this point in the history
* Refactor socket

* rename, fix tests

* Optimize code, fix tests[2]

* fix tests[3]

* Optimize code [2]

* refactor SocketAddress

* fix

* fix: fix some build errors

* Optimize Socket::connect/accept

* fix tests

* Optimize code

* fix tests

* Optimize naming

* fix tests

* Optimize code

* add SwooleG.socket_recv_timeout

* remove error msg, fix test

* merge timeout in coroutine::Socket & network::Socket

* fix tests

* fix tests[2]

* fix tests[2]

Co-authored-by: codinghuang <2812240764@qq.com>
  • Loading branch information
matyhtf and huanghantao authored Jul 28, 2020
1 parent d6add5e commit e500f96
Show file tree
Hide file tree
Showing 74 changed files with 1,236 additions and 1,229 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*.pid
*.gcno
*.gcov
*.gcda
*.info
/Debug/*
modules/*
/.deps
Expand Down Expand Up @@ -99,4 +101,4 @@ core-tests/samples/CMakeCache\.txt
core-tests/samples/bin/
core-tests/samples/CMakeFiles/
/out
coverage.info
/gcov_result
9 changes: 5 additions & 4 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ if test "$PHP_SWOOLE" != "no"; then
CFLAGS="$CFLAGS -fsanitize=address -fno-omit-frame-pointer"
CXXFLAGS="$CXXFLAGS -fsanitize=address -fno-omit-frame-pointer"
fi

if test "$PHP_GCOV" != "no"; then
PHP_DEBUG=1
CFLAGS="$CFLAGS -fprofile-arcs -ftest-coverage"
CXXFLAGS="$CXXFLAGS -fprofile-arcs -ftest-coverage"
CFLAGS="$CFLAGS -fprofile-arcs -ftest-coverage -lgcov"
CXXFLAGS="$CXXFLAGS -fprofile-arcs -ftest-coverage -lgcov"
fi

if test "$PHP_TRACE_LOG" != "no"; then
Expand Down Expand Up @@ -407,6 +407,7 @@ if test "$PHP_SWOOLE" != "no"; then
fi

swoole_source_file=" \
php_swoole.cc \
php_swoole_cxx.cc \
src/core/base.cc \
src/core/channel.cc \
Expand Down Expand Up @@ -436,6 +437,7 @@ if test "$PHP_SWOOLE" != "no"; then
src/memory/ring_buffer.cc \
src/memory/shared_memory.cc \
src/memory/table.cc \
src/network/address.cc \
src/network/client.cc \
src/network/dns.cc \
src/network/socket.cc \
Expand Down Expand Up @@ -482,7 +484,6 @@ if test "$PHP_SWOOLE" != "no"; then
src/wrapper/event.cc \
src/wrapper/server.cc \
src/wrapper/timer.cc \
swoole.cc \
swoole_async_coro.cc \
swoole_atomic.cc \
swoole_channel_coro.cc \
Expand Down
4 changes: 2 additions & 2 deletions core-tests/include/httplib_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1018,8 +1018,8 @@ class CoSocketStream : public detail::SocketStream {
void get_remote_ip_and_port(std::string &ip, int &port) const {
swSocketAddress sa;
sock_->getpeername(&sa);
ip = std::string(swSocket_get_ip(sock_->get_type(), &sa));
port = swSocket_get_port(sock_->get_type(), &sa);
ip = std::string(sa.get_ip());
port = sa.get_port();
}

private:
Expand Down
16 changes: 8 additions & 8 deletions core-tests/src/_lib/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ int Server::send(int session_id, const void *data, uint32_t length) {
return serv.send(&serv, session_id, data, length);
}

ssize_t Server::sendto(swSocketAddress *address, const char *__buf, size_t __n, int server_socket) {
char ip[256];
uint16_t port;

inet_ntop(AF_INET, (void *) &address->addr.inet_v4.sin_addr, ip, sizeof(ip));
port = ntohs(address->addr.inet_v4.sin_port);

return swSocket_udp_sendto(server_socket, ip, port, __buf, __n);
ssize_t Server::sendto(swSocketAddress *address, const char *__buf, size_t __n, int server_socket_fd) {
network::Socket *server_socket;
if (server_socket_fd < 0) {
server_socket = serv.udp_socket_ipv6 ? serv.udp_socket_ipv6 : serv.udp_socket_ipv4;
} else {
server_socket = serv.get_server_socket(server_socket_fd);
}
return server_socket->sendto(address, __buf, __n);
}

int Server::close(int session_id, int reset) {
Expand Down
141 changes: 123 additions & 18 deletions core-tests/src/network/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,138 @@

#include "test_core.h"

TEST(socket, swSocket_unix_sendto) {
int fd1, fd2, ret;
struct sockaddr_un un1, un2;
char sock1_path[] = "/tmp/udp_unix1.sock";
char sock2_path[] = "/tmp/udp_unix2.sock";
char test_data[] = "swoole";
using namespace std;
using namespace swoole;

sw_memset_zero(&un1, sizeof(struct sockaddr_un));
sw_memset_zero(&un2, sizeof(struct sockaddr_un));
char test_data[] = "hello swoole, hello world, php is best";

un1.sun_family = AF_UNIX;
un2.sun_family = AF_UNIX;
TEST(socket, sendto) {
char sock1_path[] = "/tmp/udp_unix1.sock";
char sock2_path[] = "/tmp/udp_unix2.sock";

unlink(sock1_path);
unlink(sock2_path);

fd1 = socket(AF_UNIX, SOCK_DGRAM, 0);
strncpy(un1.sun_path, sock1_path, sizeof(un1.sun_path) - 1);
bind(fd1, (struct sockaddr *) &un1, sizeof(un1));
auto sock1 = make_socket(SW_SOCK_UNIX_DGRAM, SW_FD_DGRAM_SERVER, 0);
sock1->bind(sock1_path, nullptr);

fd2 = socket(AF_UNIX, SOCK_DGRAM, 0);
strncpy(un2.sun_path, sock2_path, sizeof(un2.sun_path) - 1);
bind(fd2, (struct sockaddr *) &un2, sizeof(un2));
auto sock2 = make_socket(SW_SOCK_UNIX_DGRAM, SW_FD_DGRAM_SERVER, 0);
sock2->bind(sock2_path, nullptr);

ret = swSocket_unix_sendto(fd1, sock2_path, test_data, strlen(test_data));
ASSERT_GT(ret, 0);
ASSERT_GT(sock1->sendto(sock2_path, 0, test_data, strlen(test_data)), 0);

char buf[1024] = {};
network::Address sa;
sa.type = SW_SOCK_UNIX_DGRAM;
ASSERT_GT(sock2->recvfrom(buf, sizeof(buf), 0, &sa), 0);
ASSERT_STREQ(test_data, buf);
ASSERT_STREQ(sa.get_ip(), sock1_path);

sock1->free();
sock2->free();
unlink(sock1_path);
unlink(sock2_path);
}

static void test_sendto(enum swSocket_type sock_type) {
int port1 = 0, port2 = 0;
const char *ip = sock_type == SW_SOCK_UDP ? "127.0.0.1" : "::1";

auto sock1 = make_socket(sock_type, SW_FD_DGRAM_SERVER, 0);
sock1->bind(ip, &port1);

auto sock2 = make_socket(sock_type, SW_FD_DGRAM_SERVER, 0);
sock2->bind(ip, &port2);

ASSERT_GT(sock1->sendto(ip, port2, test_data, strlen(test_data)), 0);

char buf[1024] = {};
network::Address sa;
sa.type = sock_type;
ASSERT_GT(sock2->recvfrom(buf, sizeof(buf), 0, &sa), 0);

ASSERT_STREQ(test_data, buf);
ASSERT_EQ(sa.get_port(), port1);
ASSERT_STREQ(sa.get_ip(), ip);

sock1->free();
sock2->free();
}

TEST(socket, sendto_ipv4) {
test_sendto(SW_SOCK_UDP);
}

TEST(socket, sendto_ipv6) {
test_sendto(SW_SOCK_UDP6);
}

TEST(socket, recvfrom_blocking) {
mutex m;
m.lock();

thread t1 ([&m](){
auto svr = make_server_socket(SW_SOCK_UDP, TEST_HOST, TEST_PORT);
network::Address addr;
char buf[1024] = {};
svr->set_nonblock();
m.unlock();
svr->recvfrom_blocking(buf, sizeof(buf), 0, &addr);
ASSERT_STREQ(test_data, buf);
svr->free();
});

thread t2([&m](){
m.lock();
auto cli = make_socket(SW_SOCK_UDP, SW_FD_STREAM_CLIENT, 0);
network::Address addr;
addr.assign(SW_SOCK_TCP, TEST_HOST, TEST_PORT);
ASSERT_EQ(cli->connect(addr), SW_OK);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
cli->send(test_data, sizeof(test_data), 0);
cli->free();
});

t1.join();
t2.join();
}

TEST(socket, sendfile_blocking) {
string file = test::get_root_path() + "/examples/test.jpg";
mutex m;
m.lock();

String *str = swoole_file_get_contents(file.c_str());

thread t1 ([&m, str](){
auto svr = make_server_socket(SW_SOCK_TCP, TEST_HOST, TEST_PORT);
m.unlock();
auto cli = svr->accept();
int len;
cli->recv_blocking(&len, sizeof(len), MSG_WAITALL);
int _len = ntohl(len);
ASSERT_EQ(_len, str->get_length());
ASSERT_LT(_len, 1024 * 1024);
std::unique_ptr<char[]> data(new char[_len]);
cli->recv_blocking(data.get(), _len, MSG_WAITALL);
ASSERT_STREQ(data.get(), str->value());
cli->free();
svr->free();
});

thread t2([&m, &file, str](){
m.lock();
auto cli = make_socket(SW_SOCK_TCP, SW_FD_STREAM_CLIENT, 0);
network::Address addr;
addr.assign(SW_SOCK_TCP, TEST_HOST, TEST_PORT);
ASSERT_EQ(cli->connect(addr), SW_OK);
int len = htonl(str->get_length());
cli->send(&len, sizeof(len), 0);
ASSERT_EQ(cli->sendfile_blocking(file.c_str(), 0, 0, -1), SW_OK);
cli->free();
});

t1.join();
t2.join();
delete str;
}
17 changes: 4 additions & 13 deletions examples/cpp/test_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ int my_onReceive(swServer *serv, swRecvData *req) {
swoole_rtrim(req_data, req->info.len);
swNotice("onReceive[%d]: ip=%s|port=%d Data=%s|Len=%d",
g_receive_count,
swSocket_get_ip(conn->socket_type, &conn->info),
swSocket_get_port(conn->socket_type, &conn->info),
conn->info.get_ip(),
conn->info.get_port(),
req_data,
req->info.len);

Expand All @@ -130,7 +130,7 @@ int my_onPacket(swServer *serv, swRecvData *req) {

packet = (swDgramPacket *) req->data;

int serv_sock = req->info.server_fd;
auto serv_socket = serv->get_server_socket(req->info.server_fd);

if (packet->socket_type == SW_SOCK_UDP) {
inet_ntop(AF_INET, &packet->socket_addr.addr.inet_v4.sin_addr, address, sizeof(address));
Expand All @@ -152,16 +152,7 @@ int my_onPacket(swServer *serv, swRecvData *req) {
char resp_data[SW_IPC_BUFFER_SIZE];
int n = sw_snprintf(resp_data, SW_IPC_BUFFER_SIZE, "Server: %.*s", length, data);

if (packet->socket_type == SW_SOCK_UDP) {
ret = swSocket_udp_sendto(serv_sock, address, port, resp_data, n);
} else if (packet->socket_type == SW_SOCK_UDP6) {
ret = swSocket_udp_sendto6(serv_sock, address, port, resp_data, n);
} else if (packet->socket_type == SW_SOCK_UNIX_DGRAM) {
ret = swSocket_unix_sendto(serv_sock, address, resp_data, n);
} else {
assert(0);
return 1;
}
ret = serv_socket->sendto(address, port, resp_data, n);

if (ret < 0) {
swNotice("send to client fail. errno=%d", errno);
Expand Down
34 changes: 15 additions & 19 deletions include/coroutine_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@

#include <vector>

#define SW_DEFAULT_SOCKET_DNS_TIMEOUT -1
#define SW_DEFAULT_SOCKET_CONNECT_TIMEOUT 1
#define SW_DEFAULT_SOCKET_READ_TIMEOUT -1
#define SW_DEFAULT_SOCKET_WRITE_TIMEOUT -1

namespace swoole {
enum swTimeout_type {
SW_TIMEOUT_DNS = 1 << 0,
Expand Down Expand Up @@ -58,12 +53,7 @@ struct EventBarrier {

class Socket {
public:
static double default_dns_timeout;
static double default_connect_timeout;
static double default_read_timeout;
static double default_write_timeout;

swSocket *socket = nullptr;
network::Socket *socket = nullptr;
int errCode = 0;
const char *errMsg = "";
std::string errString;
Expand All @@ -72,7 +62,7 @@ class Socket {
bool open_eof_check = false;
bool http2 = false;

swProtocol protocol = {};
Protocol protocol = {};
swSocks5_proxy *socks5_proxy = nullptr;
swHttp_proxy *http_proxy = nullptr;

Expand Down Expand Up @@ -180,8 +170,14 @@ class Socket {

bool getsockname(swSocketAddress *sa);
bool getpeername(swSocketAddress *sa);
const char *get_ip();
int get_port();

inline const char *get_ip() {
return socket->info.get_ip();
}

inline int get_port() {
return socket->info.get_port();
}

inline bool has_bound(const enum swEvent_type event = SW_EVENT_RDWR) { return get_bound_co(event) != nullptr; }

Expand Down Expand Up @@ -339,10 +335,10 @@ class Socket {
int bind_port = 0;
int backlog = 0;

double dns_timeout = default_dns_timeout;
double connect_timeout = default_connect_timeout;
double read_timeout = default_read_timeout;
double write_timeout = default_write_timeout;
double dns_timeout = network::Socket::default_dns_timeout;
double connect_timeout = network::Socket::default_connect_timeout;
double read_timeout = network::Socket::default_read_timeout;
double write_timeout = network::Socket::default_write_timeout;
swTimer_node *read_timer = nullptr;
swTimer_node *write_timer = nullptr;

Expand Down Expand Up @@ -419,7 +415,7 @@ class Socket {
if (timeout != 0 && !*timer_pp) {
enabled = true;
if (timeout > 0) {
*timer_pp = swoole_timer_add((long) (timeout * 1000), SW_FALSE, callback, socket_);
*timer_pp = swoole_timer_add((long) (timeout * 1000), false, callback, socket_);
return *timer_pp != nullptr;
} else // if (timeout < 0)
{
Expand Down
Loading

0 comments on commit e500f96

Please sign in to comment.