Skip to content

Commit 20e65dc

Browse files
committed
CXF-9057: Chunked Stream is closed regularly when Exception is thrown (#2107)
(cherry picked from commit ff6cf7b)
1 parent daf5620 commit 20e65dc

File tree

8 files changed

+585
-2
lines changed

8 files changed

+585
-2
lines changed

core/src/main/java/org/apache/cxf/interceptor/AttachmentOutInterceptor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,13 @@ public void handleMessage(Message message) {
119119
AttachmentSerializer ser = message.getContent(AttachmentSerializer.class);
120120
if (ser != null) {
121121
try {
122+
message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, true);
122123
String cte = (String)message.getContextualProperty(Message.CONTENT_TRANSFER_ENCODING);
123124
if (cte != null) {
124125
ser.setContentTransferEncoding(cte);
125126
}
126127
ser.writeAttachments();
128+
message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, false);
127129
} catch (IOException e) {
128130
throw new Fault(new org.apache.cxf.common.i18n.Message("WRITE_ATTACHMENTS", BUNDLE), e);
129131
}

core/src/main/java/org/apache/cxf/message/Message.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ public interface Message extends StringMap {
8282
String EMPTY_PARTIAL_RESPONSE_MESSAGE = "org.apache.cxf.partial.response.empty";
8383
String ONE_WAY_REQUEST = "OnewayRequest";
8484

85+
/**
86+
* Boolean property specifying if the attachments have been partially written
87+
* (due to I/O error, fe).
88+
*/
89+
String PARTIAL_ATTACHMENTS_MESSAGE = "org.apache.cxf.partial.attachments";
90+
91+
8592
/**
8693
* Boolean property specifying if oneWay response must be processed.
8794
*/

rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.cxf.common.logging.LogUtils;
3636
import org.apache.cxf.common.util.StringUtils;
3737
import org.apache.cxf.interceptor.Fault;
38+
import org.apache.cxf.message.Message;
39+
import org.apache.cxf.message.MessageUtils;
3840
import org.apache.cxf.phase.Phase;
3941
import org.apache.cxf.staxutils.StaxUtils;
4042

@@ -61,9 +63,16 @@ static class Soap11FaultOutInterceptorInternal extends AbstractSoapInterceptor {
6163
super(Phase.MARSHAL);
6264
}
6365
public void handleMessage(SoapMessage message) throws Fault {
64-
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
6566
Fault f = (Fault) message.getContent(Exception.class);
67+
68+
// If only some attachments have been written (usually, using chunked transfer), we could
69+
// have been streaming some data already and may not be able to inject a fault in the middle
70+
// of the data transfer.
71+
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
72+
throw f;
73+
}
6674

75+
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
6776
SoapFault fault = SoapFault.createFault(f, message.getVersion());
6877

6978
try {

rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.cxf.common.util.StringUtils;
3838
import org.apache.cxf.interceptor.Fault;
3939
import org.apache.cxf.message.Message;
40+
import org.apache.cxf.message.MessageUtils;
4041
import org.apache.cxf.phase.Phase;
4142
import org.apache.cxf.staxutils.StaxUtils;
4243

@@ -64,9 +65,16 @@ static class Soap12FaultOutInterceptorInternal extends AbstractSoapInterceptor {
6465
}
6566
public void handleMessage(SoapMessage message) throws Fault {
6667
LOG.info(getClass() + (String) message.get(Message.CONTENT_TYPE));
68+
Fault f = (Fault)message.getContent(Exception.class);
69+
70+
// If only some attachments have been written (usually, using chunked transfer), we could
71+
// have been streaming some data already and may not be able to inject a fault in the middle
72+
// of the data transfer.
73+
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
74+
throw f;
75+
}
6776

6877
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
69-
Fault f = (Fault)message.getContent(Exception.class);
7078
message.put(org.apache.cxf.message.Message.RESPONSE_CODE, f.getStatusCode());
7179

7280
SoapFault fault = SoapFault.createFault(f, message.getVersion());

systests/jaxws/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@
6363
<goal>wsdl2java</goal>
6464
</goals>
6565
</execution>
66+
<execution>
67+
<id>generate-attachments-test-sources</id>
68+
<phase>generate-test-sources</phase>
69+
<configuration>
70+
<fork>${cxf.codegenplugin.forkmode}</fork>
71+
<testSourceRoot>${basedir}/target/generated/src/test/java</testSourceRoot>
72+
<testWsdlRoot>${basedir}/src/test/resources/attachments</testWsdlRoot>
73+
</configuration>
74+
<goals>
75+
<goal>wsdl2java</goal>
76+
</goals>
77+
</execution>
6678
</executions>
6779
</plugin>
6880
<plugin>
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.systest.jaxws;
21+
22+
import java.io.ByteArrayInputStream;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.io.OutputStream;
26+
import java.util.Arrays;
27+
import java.util.concurrent.ThreadLocalRandom;
28+
import java.util.logging.Logger;
29+
30+
import jakarta.activation.DataHandler;
31+
import jakarta.activation.DataSource;
32+
import jakarta.xml.ws.Binding;
33+
import jakarta.xml.ws.BindingProvider;
34+
import jakarta.xml.ws.Endpoint;
35+
import jakarta.xml.ws.soap.SOAPBinding;
36+
import jakarta.xml.ws.soap.SOAPFaultException;
37+
import org.apache.cxf.Download;
38+
import org.apache.cxf.DownloadFault_Exception;
39+
import org.apache.cxf.DownloadNextResponseType;
40+
import org.apache.cxf.common.logging.LogUtils;
41+
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
42+
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
43+
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
44+
45+
import org.junit.BeforeClass;
46+
import org.junit.Test;
47+
48+
import static org.hamcrest.CoreMatchers.containsString;
49+
import static org.hamcrest.CoreMatchers.equalTo;
50+
import static org.hamcrest.MatcherAssert.assertThat;
51+
import static org.junit.Assert.assertThrows;
52+
import static org.junit.Assert.assertTrue;
53+
54+
public class AttachmentChunkingTest extends AbstractBusClientServerTestBase {
55+
private static final String PORT = allocatePort(DownloadServer.class);
56+
private static final Logger LOG = LogUtils.getLogger(AttachmentChunkingTest.class);
57+
58+
private static final class DownloadImpl implements Download {
59+
@Override
60+
public DownloadNextResponseType downloadNext(Boolean simulate) {
61+
final DownloadNextResponseType responseType = new DownloadNextResponseType();
62+
responseType.setDataContent(new DataHandler(new DataSource() {
63+
@Override
64+
public InputStream getInputStream() {
65+
if (simulate) {
66+
return simulate();
67+
} else {
68+
return generate(100000);
69+
}
70+
}
71+
72+
@Override
73+
public OutputStream getOutputStream() {
74+
return null;
75+
}
76+
77+
@Override
78+
public String getContentType() {
79+
return "";
80+
}
81+
82+
@Override
83+
public String getName() {
84+
return "";
85+
}
86+
}));
87+
88+
return responseType;
89+
}
90+
}
91+
92+
public static class DownloadServer extends AbstractBusTestServerBase {
93+
protected void run() {
94+
Object implementor = new DownloadImpl();
95+
String address = "http://localhost:" + PORT + "/SoapContext/SoapPort";
96+
Endpoint.publish(address, implementor);
97+
}
98+
99+
public static void main(String[] args) {
100+
try {
101+
DownloadServer s = new DownloadServer();
102+
s.start();
103+
} catch (Exception ex) {
104+
ex.printStackTrace();
105+
System.exit(-1);
106+
} finally {
107+
LOG.info("done!");
108+
}
109+
}
110+
}
111+
112+
@BeforeClass
113+
public static void startServers() throws Exception {
114+
assertTrue("server did not launch correctly", launchServer(DownloadServer.class, true));
115+
}
116+
117+
@Test
118+
public void testChunkingPartialFailure() {
119+
final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
120+
factory.setServiceClass(Download.class);
121+
122+
final Download client = (Download) factory.create();
123+
final BindingProvider bindingProvider = (BindingProvider) client;
124+
final Binding binding = bindingProvider.getBinding();
125+
126+
final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT);
127+
bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address);
128+
((SOAPBinding) binding).setMTOMEnabled(true);
129+
130+
// See please https://issues.apache.org/jira/browse/CXF-9057
131+
SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(true));
132+
assertThat(ex.getMessage(), containsString("simulated error during stream processing"));
133+
}
134+
135+
@Test
136+
public void testChunking() throws IOException, DownloadFault_Exception {
137+
final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
138+
factory.setServiceClass(Download.class);
139+
140+
final Download client = (Download) factory.create();
141+
final BindingProvider bindingProvider = (BindingProvider) client;
142+
final Binding binding = bindingProvider.getBinding();
143+
144+
final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT);
145+
bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address);
146+
((SOAPBinding) binding).setMTOMEnabled(true);
147+
148+
final DownloadNextResponseType response = client.downloadNext(false);
149+
assertThat(response.getDataContent().getInputStream().readAllBytes().length, equalTo(100000));
150+
}
151+
152+
private static InputStream generate(int size) {
153+
final byte[] buf = new byte[size];
154+
Arrays.fill(buf, (byte) 'x');
155+
return new ByteArrayInputStream(buf);
156+
}
157+
158+
private static InputStream simulate() {
159+
return new InputStream() {
160+
@Override
161+
public int read() {
162+
return (byte) 'x';
163+
}
164+
165+
@Override
166+
public int read(byte[] b, int off, int len) {
167+
if (ThreadLocalRandom.current().nextBoolean()) {
168+
throw new IllegalArgumentException("simulated error during stream processing");
169+
}
170+
171+
for (int i = off; i < off + len; i++) {
172+
b[i] = (byte) 'x';
173+
}
174+
175+
return len;
176+
}
177+
};
178+
}
179+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<wsdl:definitions name="Download" targetNamespace="http://cxf.apache.org/"
3+
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
4+
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:tns="http://cxf.apache.org/"
5+
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
6+
xmlns:wsam="http://www.w3.org/2007/05/addressing/metadata">
7+
<wsdl:types>
8+
<xs:schema xmlns:tns="http://cxf.apache.org/"
9+
xmlns:xmime="http://www.w3.org/2005/05/xmlmime"
10+
xmlns:xs="http://www.w3.org/2001/XMLSchema" attributeFormDefault="unqualified"
11+
elementFormDefault="unqualified" targetNamespace="http://cxf.apache.org/">
12+
13+
<xs:import namespace="http://www.w3.org/2005/05/xmlmime"/>
14+
<xs:element name="downloadNext" type="tns:downloadNext"/>
15+
<xs:element name="downloadNextResponse" type="tns:downloadNextResponse"/>
16+
<xs:complexType name="downloadNextResponseType">
17+
<xs:sequence>
18+
<xs:element name="dataContent" type="xs:base64Binary"
19+
xmime:expectedContentTypes="application/octet-stream"/>
20+
</xs:sequence>
21+
</xs:complexType>
22+
<xs:complexType name="downloadNext">
23+
<xs:sequence>
24+
<xs:element minOccurs="0" name="simulate" type="xs:boolean"/>
25+
</xs:sequence>
26+
</xs:complexType>
27+
<xs:complexType name="downloadNextResponse">
28+
<xs:sequence>
29+
<xs:element form="qualified" minOccurs="0" name="downloadNextResponse"
30+
type="tns:downloadNextResponseType"/>
31+
</xs:sequence>
32+
</xs:complexType>
33+
<xs:element name="DownloadFault" type="tns:DownloadFault"/>
34+
<xs:complexType name="DownloadFault">
35+
<xs:sequence/>
36+
</xs:complexType>
37+
</xs:schema>
38+
</wsdl:types>
39+
<wsdl:message name="DownloadFault">
40+
<wsdl:part name="DownloadFault" element="tns:DownloadFault">
41+
</wsdl:part>
42+
</wsdl:message>
43+
<wsdl:message name="downloadNextResponse">
44+
<wsdl:part name="parameters" element="tns:downloadNextResponse">
45+
</wsdl:part>
46+
</wsdl:message>
47+
<wsdl:message name="downloadNext">
48+
<wsdl:part name="parameters" element="tns:downloadNext">
49+
</wsdl:part>
50+
</wsdl:message>
51+
<wsdl:portType name="Download">
52+
<wsdl:operation name="downloadNext">
53+
<wsdl:input name="downloadNext" message="tns:downloadNext"
54+
wsam:Action="http://cxf.apache.org/downloadNext">
55+
</wsdl:input>
56+
<wsdl:output name="downloadNextResponse" message="tns:downloadNextResponse"
57+
wsam:Action="http://cxf.apache.org/Download/downloadNextResponse">
58+
</wsdl:output>
59+
<wsdl:fault name="DownloadFault" message="tns:DownloadFault"
60+
wsam:Action="http://cxf.apache.org/Download/downloadNext/Fault/DownloadFault">
61+
</wsdl:fault>
62+
</wsdl:operation>
63+
</wsdl:portType>
64+
<wsdl:binding name="DownloadServiceInterfaceServiceSoapBinding" type="tns:Download">
65+
<soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>
66+
<wsdl:operation name="downloadNext">
67+
<soap:operation soapAction="http://cxf.apache.org/downloadNext" style="document"/>
68+
<wsdl:input name="downloadNext">
69+
<soap:body use="literal"/>
70+
</wsdl:input>
71+
<wsdl:output name="downloadNextResponse">
72+
<soap:body use="literal"/>
73+
</wsdl:output>
74+
<wsdl:fault name="DownloadFault">
75+
<soap:fault name="DownloadFault" use="literal"/>
76+
</wsdl:fault>
77+
</wsdl:operation>
78+
</wsdl:binding>
79+
<wsdl:service name="DownloadServiceInterfaceService">
80+
<wsdl:port name="DownloadPort" binding="tns:DownloadServiceInterfaceServiceSoapBinding">
81+
<soap:address location="http://localhost:9090/DownloadPort"/>
82+
</wsdl:port>
83+
</wsdl:service>
84+
</wsdl:definitions>

0 commit comments

Comments
 (0)