Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Http\Client, add write_func callback #5097

Merged
merged 3 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions examples/coroutine/http/write_func.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php
Co::set([
'trace_flags' => SWOOLE_TRACE_HTTP2,
'log_level' => 0,
]);
Co\run(function () {
$client = new Swoole\Coroutine\Http\Client('www.jd.com', 443, true);
$client->set(['write_func' => function($client, $data) {
var_dump(strlen($data));
}]);
$client->get('/');
var_dump(strlen($client->getBody()));
return 0;
});
22 changes: 22 additions & 0 deletions ext-src/php_swoole_cxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,28 @@ class CharPtr {
}
};

struct Callable {
zval zfunc;
zend_fcall_info_cache fcc;

Callable(zval *_zfunc) {
zfunc = *_zfunc;
Z_TRY_ADDREF_P(&zfunc);
}

bool is_callable() {
return zend_is_callable_ex(&zfunc, NULL, 0, NULL, &fcc, NULL);
}

bool call(uint32_t argc, zval *argv, zval *retval) {
return sw_zend_call_function_ex(&zfunc, &fcc, argc, argv, retval) == SUCCESS;
}

~Callable() {
Z_TRY_DELREF_P(&zfunc);
}
};

namespace function {
/* must use this API to call event callbacks to ensure that exceptions are handled correctly */
bool call(zend_fcall_info_cache *fci_cache, uint32_t argc, zval *argv, zval *retval, const bool enable_coroutine);
Expand Down
52 changes: 45 additions & 7 deletions ext-src/swoole_http_client_coro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,18 @@ class Client {
bool websocket_compression = false; // allow to compress websocket messages
bool accept_websocket_compression = false; // websocket server accepts compression
#endif
bool in_callback = false;
bool has_upload_files = false;

File *download_file = nullptr; // save http response to file
zend::String download_file_name; // unlink the file on error
zend_long download_offset = 0;
bool has_upload_files = false;

/* safety zval */
zval _zobject;
zval *zobject = &_zobject;
zval zsocket;
zend::Callable *write_func = nullptr;
String *tmp_write_buffer = nullptr;
bool connection_close = false;

Expand Down Expand Up @@ -474,15 +477,25 @@ static int http_parser_on_headers_complete(swoole_http_parser *parser) {

static int http_parser_on_body(swoole_http_parser *parser, const char *at, size_t length) {
Client *http = (Client *) parser->data;
if (http->write_func) {
zval zargv[2];
zargv[0] = *http->zobject;
ZVAL_STRINGL(&zargv[1], at, length);
http->in_callback = true;
bool success = http->write_func->call(2, zargv, nullptr);
http->in_callback = false;
zval_ptr_dtor(&zargv[1]);
return success ? 0 : -1;
}
#ifdef SW_HAVE_COMPRESSION
if (http->body_decompression && !http->compression_error && http->compress_method != HTTP_COMPRESS_NONE) {
else if (http->body_decompression && !http->compression_error && http->compress_method != HTTP_COMPRESS_NONE) {
if (!http->decompress_response(at, length)) {
http->compression_error = true;
goto _append_raw;
}
} else
}
#endif
{
else {
#ifdef SW_HAVE_COMPRESSION
_append_raw:
#endif
Expand All @@ -496,17 +509,17 @@ static int http_parser_on_body(swoole_http_parser *parser, const char *at, size_
std::unique_ptr<File> fp(new File(download_file_name, O_CREAT | O_WRONLY, 0664));
if (!fp->ready()) {
swoole_sys_warning("open(%s, O_CREAT | O_WRONLY) failed", download_file_name);
return false;
return -1;
}
if (http->download_offset == 0) {
if (!fp->truncate(0)) {
swoole_sys_warning("ftruncate(%s) failed", download_file_name);
return false;
return -1;
}
} else {
if (!fp->set_offest(http->download_offset)) {
swoole_sys_warning("fseek(%s, %jd) failed", download_file_name, (intmax_t) http->download_offset);
return false;
return -1;
}
}
http->download_file = fp.release();
Expand Down Expand Up @@ -723,6 +736,20 @@ void Client::apply_setting(zval *zset, const bool check_all) {
websocket_compression = zval_is_true(ztmp);
}
#endif
if (php_swoole_array_get_value(vht, "write_func", ztmp)) {
if (write_func) {
delete write_func;
}
write_func = new zend::Callable(ztmp);
if (!write_func->is_callable()) {
delete write_func;
write_func = nullptr;
zend_throw_exception_ex(swoole_exception_ce,
SW_ERROR_INVALID_PARAMS,
"write_func must be of type callable, %s given",
zend_zval_type_name(ztmp));
}
}
}
if (socket) {
php_swoole_socket_set(socket, zset);
Expand Down Expand Up @@ -1397,6 +1424,10 @@ bool Client::recv_response(double timeout) {
retval,
total_bytes,
parser.state == s_start_res);
if (socket->get_socket()->close_wait) {
success = false;
break;
}
if (parser.state == s_start_res) {
// handle redundant data (websocket packet)
if (parser.upgrade && (size_t) retval > parsed_n + SW_WEBSOCKET_HEADER_LEN) {
Expand Down Expand Up @@ -1580,6 +1611,10 @@ bool Client::close(const bool should_be_reset) {
if (!_socket) {
return false;
}
if (in_callback) {
_socket->get_socket()->close_wait = 1;
return true;
}
zend_update_property_bool(Z_OBJCE_P(zobject), SW_Z8_OBJ_P(zobject), ZEND_STRL("connected"), 0);
if (!_socket->close()) {
php_swoole_socket_set_error_properties(zobject, _socket);
Expand All @@ -1599,6 +1634,9 @@ Client::~Client() {
if (tmp_write_buffer) {
delete tmp_write_buffer;
}
if (write_func) {
delete write_func;
}
}

static sw_inline HttpClientObject *http_client_coro_fetch_object(zend_object *obj) {
Expand Down
48 changes: 48 additions & 0 deletions tests/swoole_http_client_coro/write_func_1.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
--TEST--
swoole_http_client_coro: write func 1
--SKIPIF--
<?php require __DIR__ . '/../include/skipif.inc'; ?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

const N = 8;
$chunks = [];
$n = N;
while ($n--) {
$chunks[] = base64_encode(random_bytes(random_int(256, 4096)));
}

$pm = new ProcessManager;
$pm->parentFunc = function ($pid) use ($pm, $chunks) {
Co\run(function () use ($pm, $chunks) {
$cli = new Swoole\Coroutine\Http\Client('127.0.0.1', $pm->getFreePort());
$index = 0;
$cli->set(['write_func' => function ($client, $data) use ($chunks, &$index) {
Assert::eq($chunks[$index], $data);
$index++;
}]);
Assert::assert($cli->get('/'));
});
$pm->kill();
echo "DONE\n";
};

$pm->childFunc = function () use ($pm, $chunks) {
Co\run(function () use ($pm, $chunks) {
$server = new Swoole\Coroutine\Http\Server('127.0.0.1', $pm->getFreePort());
$server->handle('/', function ($req, $resp) use ($server, $chunks) {
foreach ($chunks as $chunk) {
$resp->write($chunk);
usleep(mt_rand(10, 50) * 1000);
}
});
$server->start();
});
};

$pm->childFirst();
$pm->run();
?>
--EXPECT--
DONE
54 changes: 54 additions & 0 deletions tests/swoole_http_client_coro/write_func_2.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
--TEST--
swoole_http_client_coro: write func 1
--SKIPIF--
<?php require __DIR__ . '/../include/skipif.inc'; ?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

const N = 16;
$chunks = [];
$n = N;
while ($n--) {
$chunks[] = base64_encode(random_bytes(random_int(256, 4096)));
}

$pm = new ProcessManager;
$pm->parentFunc = function ($pid) use ($pm, $chunks) {
Co\run(function () use ($pm, $chunks) {
$cli = new Swoole\Coroutine\Http\Client('127.0.0.1', $pm->getFreePort());
$index = 0;
$cli->set(['write_func' => function ($client, $data) use ($chunks, &$index) {
Assert::eq($chunks[$index], $data);
$index++;
if ($index == N / 2) {
// reset connection
$client->close();
}
}]);
Assert::false($cli->get('/'));
Assert::eq($cli->getStatusCode(), SWOOLE_HTTP_CLIENT_ESTATUS_SERVER_RESET);
});
$pm->kill();
echo "DONE\n";
};

$pm->childFunc = function () use ($pm, $chunks) {
Co\run(function () use ($pm, $chunks) {
$server = new Swoole\Coroutine\Http\Server('127.0.0.1', $pm->getFreePort());
$server->handle('/', function ($req, $resp) use ($server, $chunks) {
foreach ($chunks as $chunk) {
$resp->write($chunk);
usleep(mt_rand(10, 50) * 1000);
}
$resp->end();
});
$server->start();
});
};

$pm->childFirst();
$pm->run();
?>
--EXPECT--
DONE