Skip to content

添加了基于 SCRAM-SHA512 加密方式的连接 #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
本仓库是在 [longlang/kafka](https://github.com/swoole/phpkafka) 的基础上增加了 `SCRAM-SHA-512` 加密方式的连接。

使用时 `sasl` 配置为
```php
...
'sasl' => [
'type' => \longlang\phpkafka\Sasl\ScramSha512Sasl::class,
'username' => env('KAFKA_SASL_USERNAME', ''),
'password' => env('KAFKA_SASL_PASSWORD', ''),
// 是否验证第二次握手的服务器响应消息的签名
'verify_final_signature' => (bool) env('KAFKA_SASL_VERIFY_FINAL_SIGNATURE', false),
],
...
```


# Changed Log
## [v1.2.3] - 2023-08-24
### Added
- 增加基于 `SCRAM-SHA-512` 加密方式的连接;
50 changes: 50 additions & 0 deletions src/Client/SyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeRequest;
use longlang\phpkafka\Protocol\SaslHandshake\SaslHandshakeResponse;
use longlang\phpkafka\Protocol\Type\Int32;
use longlang\phpkafka\Sasl\PlainSasl;
use longlang\phpkafka\Sasl\SaslInterface;
use longlang\phpkafka\Sasl\ScramSha512Sasl;
use longlang\phpkafka\Socket\SocketInterface;
use longlang\phpkafka\Socket\StreamSocket;

Expand Down Expand Up @@ -207,6 +209,21 @@ protected function sendAuthInfo(): void
if (!$class instanceof SaslInterface) {
return;
}

if ($class instanceof PlainSasl) {
/* \longlang\phpkafka\Sasl\PlainSasl $class */
$this->sendPlainAuthInfo($class);
} elseif ($class instanceof ScramSha512Sasl) {
/* \longlang\phpkafka\Sasl\ScramSha512Sasl $class */
$this->sendScramSha512AuthInfo($class);
} else {
return;
}
}

private function sendPlainAuthInfo(SaslInterface $class): void
{
/** @var \longlang\phpkafka\Sasl\PlainSasl $class */
$handshakeRequest = new SaslHandshakeRequest();
$handshakeRequest->setMechanism($class->getName());
$correlationId = $this->send($handshakeRequest);
Expand All @@ -221,4 +238,37 @@ protected function sendAuthInfo(): void
$authenticateResponse = $this->recv($correlationId);
ErrorCode::check($authenticateResponse->getErrorCode());
}

private function sendScramSha512AuthInfo(SaslInterface $class): void
{
/** @var \longlang\phpkafka\Sasl\ScramSha512Sasl $class */
// Send first verification message
$handshakeRequest = new SaslHandshakeRequest();
$handshakeRequest->setMechanism($class->getName());
$correlationId = $this->send($handshakeRequest);
/** @var SaslHandshakeResponse $handshakeResponse */
$handshakeResponse = $this->recv($correlationId);
ErrorCode::check($handshakeResponse->getErrorCode());

// First handshake
$authenticateRequest = new SaslAuthenticateRequest();
$authenticateRequest->setAuthBytes($class->getAuthBytes());
$correlationId = $this->send($authenticateRequest);
/** @var SaslAuthenticateResponse $authenticateResponse */
$authenticateResponse = $this->recv($correlationId);
ErrorCode::check($authenticateResponse->getErrorCode());

// Second handshake
$authenticateRequest = new SaslAuthenticateRequest();
$authenticateRequest->setAuthBytes($class->getFinalMessage($authenticateResponse->getAuthBytes()));
$correlationId = $this->send($authenticateRequest);
/** @var SaslAuthenticateResponse $authenticateResponse */
$authenticateResponse = $this->recv($correlationId);
ErrorCode::check($authenticateResponse->getErrorCode());

// Verify the second server response
if ($class->enableFinalSignatureVerification()) {
$class->verifyFinalMessage($authenticateResponse->getAuthBytes());
}
}
}
14 changes: 12 additions & 2 deletions src/Consumer/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,21 @@ protected function heartbeat(): void
}
}

protected function getLastHeartbeatTime(): float
{
return $this->lastHeartbeatTime > 0 ? $this->lastHeartbeatTime : microtime(true);
}

protected function setLastHeartbeatTime(float $lastHeartbeatTime): void
{
$this->lastHeartbeatTime = $lastHeartbeatTime;
}

protected function checkBeartbeat(): void
{
$time = microtime(true);
if ($time - $this->lastHeartbeatTime >= $this->config->getGroupHeartbeat()) {
$this->lastHeartbeatTime = $time;
if ($time - $this->getLastHeartbeatTime() >= $this->config->getGroupHeartbeat()) {
$this->setLastHeartbeatTime($time);
$this->heartbeat();
}
}
Expand Down
196 changes: 196 additions & 0 deletions src/Sasl/ScramSha512Sasl.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
<?php

declare(strict_types=1);

namespace longlang\phpkafka\Sasl;

use longlang\phpkafka\Config\CommonConfig;
use longlang\phpkafka\Exception\KafkaErrorException;
use longlang\phpkafka\Protocol\ErrorCode;

class ScramSha512Sasl implements SaslInterface
{
/**
* @var CommonConfig
*/
protected $config;

/**
* @var string
*/
protected $nonce = '';

/**
* @var string
*/
protected $saltedPassword = '';

/**
* @var string
*/
protected $authMessage = '';

public function __construct(CommonConfig $config)
{
$this->config = $config;
$this->nonce = base64_encode(random_bytes(16));
}

/**
* 授权模式.
*/
public function getName(): string
{
return 'SCRAM-SHA-512';
}

/**
* SCRAM-SHA-512 first handshake.
*/
public function getAuthBytes(): string
{
$config = $this->config->getSasl();
if (empty($config['username']) || empty($config['password'])) {
throw new KafkaErrorException('sasl not found auth info');
}

return sprintf('n,,%s', $this->getFirstMessageBare());
}

/**
* Get all SASL configurations.
*/
public function getSaslConfigs(): array
{
return $this->config->getSasl();
}

/**
* Get SASL simple configuration.
*/
public function getSaslConfig(string $key): mixed
{
return $this->getSaslConfigs()[$key] ?? null;
}

/**
* Second handshake of SCRAM-SHA-512.
*/
public function getFinalMessage(string $response): string
{
// Split the response after the first handshake
[$r, $s, $i] = explode(',', $response);

// Extract the random number, salt, and number of iterations
$serverNonce = $this->ltrimMessage($r);
$salt = base64_decode($this->ltrimMessage($s));
$iterations = (int) $this->ltrimMessage($i);

// Calculate the parameters for the second handshake
$saltedPassword = $this->calculateSaltedPassword($this->getPassword(), $salt, $iterations);
$this->saltedPassword = $saltedPassword;

$clientKey = $this->calculateClientKey($saltedPassword);
$storedKey = $this->calculateStoredKey($clientKey);

$clientFirstMessageBare = $this->getFirstMessageBare();
$serverFirstMessage = $response;
$clientFinalMessageWithoutProof = $this->getMessageWithoutProof($serverNonce);

$authMessage = sprintf('%s,%s,%s', $clientFirstMessageBare, $serverFirstMessage, $clientFinalMessageWithoutProof);
$this->authMessage = $authMessage;
$clientSignature = $this->hmac($authMessage, $storedKey);

return sprintf('%s,p=%s', $clientFinalMessageWithoutProof, base64_encode($clientKey ^ $clientSignature));
}

/**
* SHA-512 encryption.
*/
public function hmac(string $data, string $key): string
{
return hash_hmac('sha512', $data, $key, true);
}

/**
* Remove the first two characters of the server response message.
*/
public function ltrimMessage(string $param): string
{
return substr($param, 2);
}

/**
* Whether to enable final signature verification.
*/
public function enableFinalSignatureVerification(): bool
{
return (bool) $this->getSaslConfig('verify_final_signature');
}

/**
* Verify final signature.
*/
public function verifyFinalMessage(string $message): void
{
$receivedSignature = $this->ltrimMessage($message);
$receivedSignature = base64_decode($receivedSignature);

$serverKey = $this->hmac('Server Key', $this->saltedPassword);
$expectedSignature = $this->hmac($this->authMessage, $serverKey);

if (!hash_equals($receivedSignature, $expectedSignature)) {
ErrorCode::check(ErrorCode::SASL_AUTHENTICATION_FAILED);
}
}

/**
* Get first handshake information of SCRAM-SHA-512.
*/
private function getFirstMessageBare(): string
{
return sprintf('n=%s,r=%s', $this->getSaslConfig('username'), $this->nonce);
}

/**
* Get SASL password.
*/
private function getPassword(): string
{
return $this->getSaslConfigs()['password'] ?? '';
}

/**
* Compute salted password using PBKDF2 function and the salt and iteration count provided by the server.
*/
private function calculateSaltedPassword(string $password, string $salt, int $iterations): string
{
return hash_pbkdf2('sha512', $password, $salt, $iterations, 0, true);
}

/**
* Compute client key using salted password and HMAC function to calculate client key.
*/
private function calculateClientKey(string $saltedPassword): string
{
// In SCRAM-SHA-512, a salted password is required to encrypt the calculation secret
// and the key is fixed to "Client Key"
return $this->hmac('Client Key', $saltedPassword);
}

/**
* Compute stored key using client key and SHA-256 function to calculate stored key.
*/
private function calculateStoredKey(string $clientKey): string
{
return hash('sha512', $clientKey, true);
}

/**
* Get message without proof.
*/
private function getMessageWithoutProof(string $nonce): string
{
return sprintf('c=biws,r=%s', $nonce);
}
}