From 55e9d92281af593bd200449e8af9f44ebcc11866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E5=BF=97?= Date: Sun, 8 Apr 2018 17:06:50 +0800 Subject: [PATCH] =?UTF-8?q?PHP=E6=95=B0=E6=8D=AE=E5=BA=93=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=B1=A0=E7=9A=84=E7=AE=80=E5=8D=95=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 16 ++++ README.md | 14 ++++ conf/PoolConf.php | 17 +++++ examples/client.php | 40 ++++++++++ examples/client2.php | 25 +++++++ examples/client3.php | 18 +++++ lib/Pool/Client.php | 52 +++++++++++++ lib/Pool/Exception.php | 5 ++ lib/Pool/Server.php | 129 +++++++++++++++++++++++++++++++++ lib/Pool/Server/Connection.php | 56 ++++++++++++++ lib/Pool/Server/Stmt.php | 35 +++++++++ worker/worker.php | 108 +++++++++++++++++++++++++++ 12 files changed, 515 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 conf/PoolConf.php create mode 100644 examples/client.php create mode 100644 examples/client2.php create mode 100644 examples/client3.php create mode 100644 lib/Pool/Client.php create mode 100644 lib/Pool/Exception.php create mode 100644 lib/Pool/Server.php create mode 100644 lib/Pool/Server/Connection.php create mode 100644 lib/Pool/Server/Stmt.php create mode 100644 worker/worker.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..12475d9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +nbproject/ +.idea/ +nbbuild/ +dist/ +nbdist/ +nbactions.xml +nb-configuration.xml +.nb-gradle/ +*.iml +out +gen +*.cache +*.log +/config.inc.php +build +version \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e360378 --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +# 在PHP中使用pdo驱动和gearman实现的数据库连接池 + +[连接池实现参考](https://gonzalo123.com/2010/11/01/database-connection-pooling-with-php-and-gearman/) +[gearman参考](https://blog.csdn.net/qq43599939/article/details/54177438) + +有待改进的点: +1. 预先加载的连接数量要可配置 +2. 当实际的PDO连接数不够时刻自动增加 +3. 异常处理 +4. PDO方法支持 +5. ... + + + diff --git a/conf/PoolConf.php b/conf/PoolConf.php new file mode 100644 index 0000000..0947507 --- /dev/null +++ b/conf/PoolConf.php @@ -0,0 +1,17 @@ + array( + 'dsn' => "pgsql:dbname=gonzalo;host=localhost", + 'username' => 'user', + 'password' => 'password', + 'options' => null), + ); + + static $SERVERS = array( + array('127.0.0.1', 4730), + array('127.0.0.1', 4731), + ); +} \ No newline at end of file diff --git a/examples/client.php b/examples/client.php new file mode 100644 index 0000000..df7ed60 --- /dev/null +++ b/examples/client.php @@ -0,0 +1,40 @@ +getConnection(PoolConf::PG1); // Server\Connection实例 + +$sql = "SELECT * FROM TEST.TBL1"; +$stmt = $conn->prepare($sql); // Stmt实例 + +$stmt->execute(); +$data = $stmt->fetchall(); +echo "

count: " . count($data) . "

"; + +try { + $sql = "SELECT * FROM NON_EXISTENT_TABLE WHERE sealeccion=1"; + $stmt = $conn->prepare($sql); + + $stmt->execute(); + $data = $stmt->fetchall(); + echo "

count: " . count($data) . "

"; + +} catch (Exception $e) { + echo "ERROR: " . $e->getMessage(); +} + + + +echo "
";
+print_r(Client::singleton()->info(PoolConf::PG1));
+echo "
"; + +/* +$conn->execute(array('NAME' => 'gonzalo')); +$conn->commit(); +*/ diff --git a/examples/client2.php b/examples/client2.php new file mode 100644 index 0000000..158aa52 --- /dev/null +++ b/examples/client2.php @@ -0,0 +1,25 @@ +getConnection(PoolConf::PG1); + +$data = $conn->prepare("SELECT * FROM TEST.TBL1 WHERE FIELD1=:S")->execute(array('S' => 1))->fetchall(); + +echo count($data); + +echo "
";
+print_r(Client::singleton()->info(PoolConf::PG1));
+echo "
"; + + + +/* +$conn->execute(array('NAME' => 'gonzalo')); +$conn->commit(); +*/ diff --git a/examples/client3.php b/examples/client3.php new file mode 100644 index 0000000..aa56e8e --- /dev/null +++ b/examples/client3.php @@ -0,0 +1,18 @@ +getConnection(PoolConf::PG1); + +$conn->beginTransaction(); +$data = $conn->prepare("SELECT * FROM TEST.TBL1 WHERE SELECCION=:S")->execute(array('S' => 1))->fetchall(); +$conn->rollback(); + +echo "
";
+print_r(Client::singleton()->info(PoolConf::PG1));
+echo "
"; diff --git a/lib/Pool/Client.php b/lib/Pool/Client.php new file mode 100644 index 0000000..7962127 --- /dev/null +++ b/lib/Pool/Client.php @@ -0,0 +1,52 @@ +_client = $client; + } + + public static function shutdown() { + if (count(self::$_conn) > 0) { + foreach (self::$_conn as $conn) { + $conn->release(); + } + } + + } + + public static function singleton() { + if (is_null(self::$_instance)) { + register_shutdown_function(array("\Pool\Client", 'shutdown')); + } + $client = new \GearmanClient(); + foreach (\PoolConf::$SERVERS as $server) { + $client->addServer($server[0], $server[1]); + } + + self::$_instance = new Client($client); + + return self::$_instance; + + } + + public function getConnection($key) { + $cid = $this->_client->do("getConnection", $key); + $conn = new Server\Connection($cid, $this->_client, $key); + self::$_conn[] = $conn; + return $conn; + } + + public function info($key) + { + return unserialize($this->_client->do("info", $key)); + } + +} \ No newline at end of file diff --git a/lib/Pool/Exception.php b/lib/Pool/Exception.php new file mode 100644 index 0000000..f25e18e --- /dev/null +++ b/lib/Pool/Exception.php @@ -0,0 +1,5 @@ +setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION); + self::$pool[$key][] = $conn; + } + + public static function getConnection($key) { + $conn = array_shift(self::$pool[$key]); + $cid = uniqid(); + self::$usedPool[$key][$cid] = $conn; + + return $cid; + + } + + public static function prepare($key, $cid, $sql) { + $conn = Server::_getConnection($key, $cid); + $stmtId = ''; + if (!is_null($conn)) { + $stmtId = md5(serialize(array($key, $sql))); + $stmt = Server::_getStmt($stmtId); + if (false === $stmt) { + $stmt = $conn->prepare($sql); // PDO实例 + } + self::_setStmt($stmtId, $stmt); + } else { + echo "ERROR: cid:{$cid}, sql:{$sql}\n"; + } + return $stmtId; + + } + + public static function execute($stmtId, $parameters) { + $stmt = Server::_getStmt($stmtId); + try { + $stmt->execute($parameters); + } catch (\PDOException $e) { + return serialize(new Exception($e->getMessage(), $e->getCode())); + } + self::_setStmt($stmtId, $stmt); + return $stmtId; + } + + public static function fetchAll($stmtId) { + $stmt = Server::_getStmt($stmtId); + if (!is_null($stmt)) { + return $stmt->fetchAll(); + } else { + echo "ERROR: stmtId:-{$stmtId}-\n"; + } + } + + public static function info($key) { + return array( + 'usedPool' => count(self::$usedPool[$key]), + 'pool' => count(self::$pool[$key]), + 'stmts' => count(self::$stmts) + ); + + } + + public static function release($key, $cid) { + $conn = array_shift(self::$usedPool[$key]); + self::$pool[$key][] = $conn; + } + + public function beginTransaction($cid, $key) { + $conn = Server::_getConnection($key, $cid); + $conn->beginTransaction(); + } + + public function commit($cid, $key) { + $conn = Server::_getConnection($key, $cid); + $conn->commit(); + } + + public function rollback($cid, $key) { + $conn = Server::_getConnection($key, $cid); + $conn->rollback(); + } + + public static function _getConnection($key, $cid) { + return self::$usedPool[$key][$cid]; + } + + public static function _getStmt($stmtId) { + if (array_key_exists($stmtId, self::$stmts)) { + return self::$stmts[$stmtId]; + } else { + return false; + } + } + + public static function _setStmt($stmtId, $stmt) { + self::$stmts[$stmtId] = $stmt; + } + +} \ No newline at end of file diff --git a/lib/Pool/Server/Connection.php b/lib/Pool/Server/Connection.php new file mode 100644 index 0000000..10e6c1d --- /dev/null +++ b/lib/Pool/Server/Connection.php @@ -0,0 +1,56 @@ +_cid = $cid; + $this->_client = $client; + $this->_key = $key; + } + + public function prepare($sql) { + $stmtId = $this->_client->do('prepare', serialize(array( + 'sql' => $sql, + 'cid' => $this->_cid, + 'key' => $this->_key, + ))); + + return new Stmt($stmtId, $this->_cid, $this->_client); // Stmt实例 + } + + public function release() { + $this->_client->do('release', serialize(array( + 'key' => $this->_key, + 'cid' => $this->_cid, + ))); + } + + public function beginTransaction() { + $this->_client->do('beginTransaction', serialize(array( + 'cid' => $this->_cid, + 'key' => $this->_key, + ))); + } + + public function commit() { + $this->_client->do('commit', serialize(array( + 'cid' => $this->_cid, + 'key' => $this->_key, + ))); + } + + public function rollback() { + $this->_client->do('rollback', serialize(array( + 'cid' => $this->_cid, + 'key' => $this->_key, + ))); + } + + +} \ No newline at end of file diff --git a/lib/Pool/Server/Stmt.php b/lib/Pool/Server/Stmt.php new file mode 100644 index 0000000..00d4651 --- /dev/null +++ b/lib/Pool/Server/Stmt.php @@ -0,0 +1,35 @@ +_cid, $this->_client); + function __construct($stmtId, $cid, $client) + { + $this->_stmt = $stmtId; + $this->_cid = $cid; + $this->_client = $client; + } + + public function execute($parameters=array()) { + $out = $this->_client->do('execute', serialize(array( + 'parameters' => $parameters, + 'stmt' => $this->_stmt, + ))); + $this->_stmt = $out; + return $this; + } + + public function fetchAll () { + $data = $this->_client->do('fetchAll', serialize(array( + 'stmt' => $this->_stmt, + ))); + + return unserialize($data); + + } + +} \ No newline at end of file diff --git a/worker/worker.php b/worker/worker.php new file mode 100644 index 0000000..f76f8a1 --- /dev/null +++ b/worker/worker.php @@ -0,0 +1,108 @@ +addServer($server[0], $server[1]); +} + +\Pool\Server::init(); + +$worker->addFunction('getConnection', 'getConnection'); +$worker->addFunction('prepare', 'prepare'); +$worker->addFunction('execute', 'execute'); +$worker->addFunction('fetchAll', 'fetchAll'); +$worker->addFunction('info', 'info'); +$worker->addFunction('release', 'release'); +$worker->addFunction('beginTransaction', 'beginTransaction'); +$worker->addFunction('commit', 'commit'); +$worker->addFunction('rollback', 'rollback'); + +while (1) { + try { + $worker->work(); + if ($worker->returnCode() != GEARMAN_SUCCESS) { + break; + } + } catch (Exception $e) { + echo $e->getMessage(); + } +} + +function getConnection ($job) { + echo __function__."\n"; + // $job->workload() 接收数据 + $key = $job->workload(); // PoolConf::PG1 + + return \Pool\Server::getConnection($key); +} + +function prepare ($job) { + echo __function__."\n"; + // $job->workload() 接收数据 + $params = unserialize($job->workload()); + $sql = $params['sql']; + $cid = $params['cid']; + $key = $params['key']; + return \Pool\Server::prepare($key, $cid, $sql); +} + +function execute ($job) { + echo __function__."\n"; + // $job->workload() 接收数据 + $params = unserialize($job->workload()); + $stmtId = $params['stmt']; + $parameters = $params['parameters']; + return \Pool\Server::execute($stmtId, $parameters); +} + +function fetchAll ($job) { + echo __function__."\n"; + $params = unserialize($job->workload()); + $stmtId = $params['stmt']; + return serialize(\Pool\Server::fetchAll($stmtId)); +} + +function info ($job) { + echo __function__."\n"; + $key = $job->workload(); + return serialize(\Pool\Server::info($key)); +} + +function release ($job) { + echo __function__."\n"; + $params = unserialize($job->workload()); + $cid = $params['cid']; + $key = $params['key']; + return serialize(\Pool\Server::release($key, $cid)); +} + +function beginTransaction ($job) { + echo __function__."\n"; + $params = unserialize($job->workload()); + $cid = $params['cid']; + $key = $params['key']; + + return \Pool\Server::beginTransaction($cid, $key); +} + +function commit ($job) { + echo __function__."\n"; + $params = unserialize($job->workload()); + $cid = $params['cid']; + $key = $params['key']; + + return \Pool\Server::commit($cid, $key); +} + +function rollback ($job) { + echo __function__."\n"; + $params = unserialize($job->workload()); + $cid = $params['cid']; + $key = $params['key']; + + return \Pool\Server::rollback($cid, $key); +} +