Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7872](https://github.com/apache/incubator-seata/pull/7872)] Automatically calculate the values for JVM parameters
- [[#7876](https://github.com/apache/incubator-seata/pull/7876)] feature: add MCP custom configuration and authentication code
- [[#7878](https://github.com/apache/incubator-seata/pull/7878)] console supports creation and modification of transaction groups for Raft clusters
- [[#7903](https://github.com/apache/incubator-seata/pull/7903)] Support HTTP/2 stream push for the Watch API in Server Raft mode


### bugfix:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
- [[#7872](https://github.com/apache/incubator-seata/pull/7872)] 根据当前内存值自动计算 JVM 参数
- [[#7876](https://github.com/apache/incubator-seata/pull/7876)] feature: 添加MCP服务自定义配置属性和鉴权功能
- [[#7878](https://github.com/apache/incubator-seata/pull/7878)] 控制台支持raft集群模式的事务分组管理
- [[#7903](https://github.com/apache/incubator-seata/pull/7903)] 在Server Raft模式下支持Watch API的HTTP/2流推送


### bugfix:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.metadata;

/**
* Cluster watch event data class.
* This class represents the event data received from the cluster watch API.
* It is used as a DTO (Data Transfer Object) for deserializing server-sent events.
*
* <p>The server sends events in SSE format:
* <pre>
* data: {"type":"cluster-update|keepalive|timeout","group":"default","term":123,"timestamp":1234567890}
* </pre>
*
* <p>Note: The event type is included in the JSON data, not in a separate SSE "event:" field.
* This simplifies parsing and reduces the number of lines to read.
*
* @see org.apache.seata.common.util.SeataHttpWatch
*/
public class ClusterWatchEvent {

/**
* Event type: "cluster-update", "keepalive", or "timeout"
*/
private String type;

private String group;

private Long term;

private Long timestamp;

public ClusterWatchEvent() {}

public String getType() {
return type;
}

public void setType(String type) {
this.type = type;
}

public String getGroup() {
return group;
}

public void setGroup(String group) {
this.group = group;
}

public Long getTerm() {
return term;
}

public void setTerm(Long term) {
this.term = term;
}

public Long getTimestamp() {
return timestamp;
}

public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}

@Override
public String toString() {
return "ClusterWatchEvent{" + "type='"
+ type + '\'' + ", group='"
+ group + '\'' + ", term="
+ term + ", timestamp="
+ timestamp + '}';
}
}
110 changes: 92 additions & 18 deletions common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,24 +277,6 @@ public static void doGetWithHttp2(
executeAsync(client, request, callback);
}

private static RequestBody createRequestBody(Map<String, String> params, String contentType)
throws JsonProcessingException {
if (params == null || params.isEmpty()) {
return RequestBody.create(new byte[0]);
}

// Extract media type without parameters for robust comparison
String mediaTypeOnly = contentType == null ? "" : contentType.split(";")[0].trim();
if (MEDIA_TYPE_FORM_URLENCODED.toString().equals(mediaTypeOnly)) {
FormBody.Builder formBuilder = new FormBody.Builder();
params.forEach(formBuilder::add);
return formBuilder.build();
} else {
String json = OBJECT_MAPPER.writeValueAsString(params);
return RequestBody.create(json, MEDIA_TYPE_JSON);
}
}

private static OkHttpClient createHttp2ClientWithTimeout(int timeoutSeconds) {
return HTTP2_CLIENT_MAP.computeIfAbsent(timeoutSeconds, k -> new OkHttpClient.Builder()
// Use HTTP/2 prior knowledge to directly use HTTP/2 without an initial HTTP/1.1 upgrade
Expand Down Expand Up @@ -344,4 +326,96 @@ public void onFailure(Call call, IOException e) {
}
});
}

private static OkHttpClient createHttp2WatchClient(int connectTimeoutSeconds) {
return new OkHttpClient.Builder()
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
.connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS) // 连接阶段快速失败
.readTimeout(0, TimeUnit.SECONDS) // 等待TC推送数据(建立连接后持续监听服务器推送)
.writeTimeout(connectTimeoutSeconds, TimeUnit.SECONDS)
.build();
}

public static <T> SeataHttpWatch<T> watch(String url, Map<String, String> headers, Class<T> eventType)
throws IOException {
return watch(url, headers, null, "GET", eventType);
}

public static <T> SeataHttpWatch<T> watch(String url, Class<T> eventType) throws IOException {
return watch(url, null, null, "GET", eventType);
}

/**
* Execute a watch request with specified HTTP method and return a Watch iterator.
* This method creates a long-lived HTTP/2 connection to receive Server-Sent Events (SSE).
*/
private static <T> SeataHttpWatch<T> watch(
String url, Map<String, String> headers, RequestBody requestBody, String method, Class<T> eventType)
throws IOException {

OkHttpClient client = createHttp2WatchClient(30);
Request request = buildHttp2WatchRequest(url, headers, requestBody, method);
return SeataHttpWatch.createWatch(client, request, eventType);
}

public static <T> SeataHttpWatch<T> watchPost(
String url, Map<String, String> params, Map<String, String> headers, Class<T> eventType)
throws IOException {
try {
String contentType = headers != null ? headers.get("Content-Type") : "";
RequestBody requestBody = createRequestBody(params, contentType);
return watch(url, headers, requestBody, "POST", eventType);
} catch (JsonProcessingException e) {
LOGGER.error("Failed to create request body: {}", e.getMessage(), e);
throw new IOException("Failed to create request body", e);
}
}

public static <T> SeataHttpWatch<T> watchPost(String url, Map<String, String> params, Class<T> eventType)
throws IOException {
return watchPost(url, params, null, eventType);
}

private static Request buildHttp2WatchRequest(
String url, Map<String, String> headers, RequestBody requestBody, String method) {
Headers.Builder headerBuilder = new Headers.Builder();
if (headers != null) {
headers.forEach(headerBuilder::add);
}
// Always add Accept header for SSE
headerBuilder.add("Accept", "text/event-stream");

Request.Builder requestBuilder = new Request.Builder().url(url).headers(headerBuilder.build());

if ("POST".equals(method) && requestBody != null) {
requestBuilder.post(requestBody);
} else if ("PUT".equals(method) && requestBody != null) {
requestBuilder.put(requestBody);
} else if ("GET".equals(method)) {
requestBuilder.get();
} else {
// Default to GET if method is not specified or not supported
requestBuilder.get();
}

return requestBuilder.build();
}

private static RequestBody createRequestBody(Map<String, String> params, String contentType)
throws JsonProcessingException {
if (params == null || params.isEmpty()) {
return RequestBody.create(new byte[0]);
}

// Extract media type without parameters for robust comparison
String mediaTypeOnly = contentType == null ? "" : contentType.split(";")[0].trim();
if (MEDIA_TYPE_FORM_URLENCODED.toString().equals(mediaTypeOnly)) {
FormBody.Builder formBuilder = new FormBody.Builder();
params.forEach(formBuilder::add);
return formBuilder.build();
} else {
String json = OBJECT_MAPPER.writeValueAsString(params);
return RequestBody.create(json, MEDIA_TYPE_JSON);
}
}
}
Loading
Loading