Skip to content

Commit 34ffa81

Browse files
committed
CXF-9057: Chunked Stream is closed regularly when Exception is thrown (#2107)
(cherry picked from commit ff6cf7b) (cherry picked from commit 20e65dc)
1 parent c5886a8 commit 34ffa81

File tree

8 files changed

+586
-2
lines changed

8 files changed

+586
-2
lines changed

core/src/main/java/org/apache/cxf/interceptor/AttachmentOutInterceptor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,13 @@ public void handleMessage(Message message) {
119119
AttachmentSerializer ser = message.getContent(AttachmentSerializer.class);
120120
if (ser != null) {
121121
try {
122+
message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, true);
122123
String cte = (String)message.getContextualProperty(Message.CONTENT_TRANSFER_ENCODING);
123124
if (cte != null) {
124125
ser.setContentTransferEncoding(cte);
125126
}
126127
ser.writeAttachments();
128+
message.put(Message.PARTIAL_ATTACHMENTS_MESSAGE, false);
127129
} catch (IOException e) {
128130
throw new Fault(new org.apache.cxf.common.i18n.Message("WRITE_ATTACHMENTS", BUNDLE), e);
129131
}

core/src/main/java/org/apache/cxf/message/Message.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,13 @@ public interface Message extends StringMap {
8181
String EMPTY_PARTIAL_RESPONSE_MESSAGE = "org.apache.cxf.partial.response.empty";
8282
String ONE_WAY_REQUEST = "OnewayRequest";
8383

84+
/**
85+
* Boolean property specifying if the attachments have been partially written
86+
* (due to I/O error, fe).
87+
*/
88+
String PARTIAL_ATTACHMENTS_MESSAGE = "org.apache.cxf.partial.attachments";
89+
90+
8491
/**
8592
* Boolean property specifying if oneWay response must be processed.
8693
*/

rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap11FaultOutInterceptor.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.cxf.common.logging.LogUtils;
3636
import org.apache.cxf.common.util.StringUtils;
3737
import org.apache.cxf.interceptor.Fault;
38+
import org.apache.cxf.message.Message;
39+
import org.apache.cxf.message.MessageUtils;
3840
import org.apache.cxf.phase.Phase;
3941
import org.apache.cxf.staxutils.StaxUtils;
4042

@@ -61,9 +63,16 @@ static class Soap11FaultOutInterceptorInternal extends AbstractSoapInterceptor {
6163
super(Phase.MARSHAL);
6264
}
6365
public void handleMessage(SoapMessage message) throws Fault {
64-
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
6566
Fault f = (Fault) message.getContent(Exception.class);
67+
68+
// If only some attachments have been written (usually, using chunked transfer), we could
69+
// have been streaming some data already and may not be able to inject a fault in the middle
70+
// of the data transfer.
71+
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
72+
throw f;
73+
}
6674

75+
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
6776
SoapFault fault = SoapFault.createFault(f, message.getVersion());
6877

6978
try {

rt/bindings/soap/src/main/java/org/apache/cxf/binding/soap/interceptor/Soap12FaultOutInterceptor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.cxf.common.util.StringUtils;
3838
import org.apache.cxf.interceptor.Fault;
3939
import org.apache.cxf.message.Message;
40+
import org.apache.cxf.message.MessageUtils;
4041
import org.apache.cxf.phase.Phase;
4142
import org.apache.cxf.staxutils.StaxUtils;
4243

@@ -64,9 +65,16 @@ static class Soap12FaultOutInterceptorInternal extends AbstractSoapInterceptor {
6465
}
6566
public void handleMessage(SoapMessage message) throws Fault {
6667
LOG.info(getClass() + (String) message.get(Message.CONTENT_TYPE));
68+
Fault f = (Fault)message.getContent(Exception.class);
69+
70+
// If only some attachments have been written (usually, using chunked transfer), we could
71+
// have been streaming some data already and may not be able to inject a fault in the middle
72+
// of the data transfer.
73+
if (MessageUtils.getContextualBoolean(message, Message.PARTIAL_ATTACHMENTS_MESSAGE, false)) {
74+
throw f;
75+
}
6776

6877
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
69-
Fault f = (Fault)message.getContent(Exception.class);
7078
message.put(org.apache.cxf.message.Message.RESPONSE_CODE, f.getStatusCode());
7179

7280
SoapFault fault = SoapFault.createFault(f, message.getVersion());

systests/jaxws/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@
6363
<goal>wsdl2java</goal>
6464
</goals>
6565
</execution>
66+
<execution>
67+
<id>generate-attachments-test-sources</id>
68+
<phase>generate-test-sources</phase>
69+
<configuration>
70+
<fork>${cxf.codegenplugin.forkmode}</fork>
71+
<testSourceRoot>${basedir}/target/generated/src/test/java</testSourceRoot>
72+
<testWsdlRoot>${basedir}/src/test/resources/attachments</testWsdlRoot>
73+
</configuration>
74+
<goals>
75+
<goal>wsdl2java</goal>
76+
</goals>
77+
</execution>
6678
</executions>
6779
</plugin>
6880
<plugin>
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.systest.jaxws;
21+
22+
import java.io.ByteArrayInputStream;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.io.OutputStream;
26+
import java.util.Arrays;
27+
import java.util.concurrent.ThreadLocalRandom;
28+
import java.util.logging.Logger;
29+
30+
import javax.activation.DataHandler;
31+
import javax.activation.DataSource;
32+
import javax.xml.ws.Binding;
33+
import javax.xml.ws.BindingProvider;
34+
import javax.xml.ws.Endpoint;
35+
import javax.xml.ws.soap.SOAPBinding;
36+
import javax.xml.ws.soap.SOAPFaultException;
37+
38+
import org.apache.cxf.Download;
39+
import org.apache.cxf.DownloadFault_Exception;
40+
import org.apache.cxf.DownloadNextResponseType;
41+
import org.apache.cxf.common.logging.LogUtils;
42+
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
43+
import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
44+
import org.apache.cxf.testutil.common.AbstractBusTestServerBase;
45+
46+
import org.junit.BeforeClass;
47+
import org.junit.Test;
48+
49+
import static org.hamcrest.CoreMatchers.containsString;
50+
import static org.hamcrest.CoreMatchers.equalTo;
51+
import static org.hamcrest.MatcherAssert.assertThat;
52+
import static org.junit.Assert.assertThrows;
53+
import static org.junit.Assert.assertTrue;
54+
55+
public class AttachmentChunkingTest extends AbstractBusClientServerTestBase {
56+
private static final String PORT = allocatePort(DownloadServer.class);
57+
private static final Logger LOG = LogUtils.getLogger(AttachmentChunkingTest.class);
58+
59+
private static final class DownloadImpl implements Download {
60+
@Override
61+
public DownloadNextResponseType downloadNext(Boolean simulate) {
62+
final DownloadNextResponseType responseType = new DownloadNextResponseType();
63+
responseType.setDataContent(new DataHandler(new DataSource() {
64+
@Override
65+
public InputStream getInputStream() {
66+
if (simulate) {
67+
return simulate();
68+
} else {
69+
return generate(100000);
70+
}
71+
}
72+
73+
@Override
74+
public OutputStream getOutputStream() {
75+
return null;
76+
}
77+
78+
@Override
79+
public String getContentType() {
80+
return "";
81+
}
82+
83+
@Override
84+
public String getName() {
85+
return "";
86+
}
87+
}));
88+
89+
return responseType;
90+
}
91+
}
92+
93+
public static class DownloadServer extends AbstractBusTestServerBase {
94+
protected void run() {
95+
Object implementor = new DownloadImpl();
96+
String address = "http://localhost:" + PORT + "/SoapContext/SoapPort";
97+
Endpoint.publish(address, implementor);
98+
}
99+
100+
public static void main(String[] args) {
101+
try {
102+
DownloadServer s = new DownloadServer();
103+
s.start();
104+
} catch (Exception ex) {
105+
ex.printStackTrace();
106+
System.exit(-1);
107+
} finally {
108+
LOG.info("done!");
109+
}
110+
}
111+
}
112+
113+
@BeforeClass
114+
public static void startServers() throws Exception {
115+
assertTrue("server did not launch correctly", launchServer(DownloadServer.class, true));
116+
}
117+
118+
@Test
119+
public void testChunkingPartialFailure() {
120+
final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
121+
factory.setServiceClass(Download.class);
122+
123+
final Download client = (Download) factory.create();
124+
final BindingProvider bindingProvider = (BindingProvider) client;
125+
final Binding binding = bindingProvider.getBinding();
126+
127+
final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT);
128+
bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address);
129+
((SOAPBinding) binding).setMTOMEnabled(true);
130+
131+
// See please https://issues.apache.org/jira/browse/CXF-9057
132+
SOAPFaultException ex = assertThrows(SOAPFaultException.class, () -> client.downloadNext(true));
133+
assertThat(ex.getMessage(), containsString("simulated error during stream processing"));
134+
}
135+
136+
@Test
137+
public void testChunking() throws IOException, DownloadFault_Exception {
138+
final JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
139+
factory.setServiceClass(Download.class);
140+
141+
final Download client = (Download) factory.create();
142+
final BindingProvider bindingProvider = (BindingProvider) client;
143+
final Binding binding = bindingProvider.getBinding();
144+
145+
final String address = String.format("http://localhost:%s/SoapContext/SoapPort/DownloadPort", PORT);
146+
bindingProvider.getRequestContext().put("jakarta.xml.ws.service.endpoint.address", address);
147+
((SOAPBinding) binding).setMTOMEnabled(true);
148+
149+
final DownloadNextResponseType response = client.downloadNext(false);
150+
assertThat(response.getDataContent().getInputStream().readAllBytes().length, equalTo(100000));
151+
}
152+
153+
private static InputStream generate(int size) {
154+
final byte[] buf = new byte[size];
155+
Arrays.fill(buf, (byte) 'x');
156+
return new ByteArrayInputStream(buf);
157+
}
158+
159+
private static InputStream simulate() {
160+
return new InputStream() {
161+
@Override
162+
public int read() {
163+
return (byte) 'x';
164+
}
165+
166+
@Override
167+
public int read(byte[] b, int off, int len) {
168+
if (ThreadLocalRandom.current().nextBoolean()) {
169+
throw new IllegalArgumentException("simulated error during stream processing");
170+
}
171+
172+
for (int i = off; i < off + len; i++) {
173+
b[i] = (byte) 'x';
174+
}
175+
176+
return len;
177+
}
178+
};
179+
}
180+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<wsdl:definitions name="Download" targetNamespace="http://cxf.apache.org/"
3+
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
4+
xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:tns="http://cxf.apache.org/"
5+
xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
6+
xmlns:wsam="http://www.w3.org/2007/05/addressing/metadata">
7+
<wsdl:types>
8+
<xs:schema xmlns:tns="http://cxf.apache.org/"
9+
xmlns:xmime="http://www.w3.org/2005/05/xmlmime"
10+
xmlns:xs="http://www.w3.org/2001/XMLSchema" attributeFormDefault="unqualified"
11+
elementFormDefault="unqualified" targetNamespace="http://cxf.apache.org/">
12+
13+
<xs:import namespace="http://www.w3.org/2005/05/xmlmime"/>
14+
<xs:element name="downloadNext" type="tns:downloadNext"/>
15+
<xs:element name="downloadNextResponse" type="tns:downloadNextResponse"/>
16+
<xs:complexType name="downloadNextResponseType">
17+
<xs:sequence>
18+
<xs:element name="dataContent" type="xs:base64Binary"
19+
xmime:expectedContentTypes="application/octet-stream"/>
20+
</xs:sequence>
21+
</xs:complexType>
22+
<xs:complexType name="downloadNext">
23+
<xs:sequence>
24+
<xs:element minOccurs="0" name="simulate" type="xs:boolean"/>
25+
</xs:sequence>
26+
</xs:complexType>
27+
<xs:complexType name="downloadNextResponse">
28+
<xs:sequence>
29+
<xs:element form="qualified" minOccurs="0" name="downloadNextResponse"
30+
type="tns:downloadNextResponseType"/>
31+
</xs:sequence>
32+
</xs:complexType>
33+
<xs:element name="DownloadFault" type="tns:DownloadFault"/>
34+
<xs:complexType name="DownloadFault">
35+
<xs:sequence/>
36+
</xs:complexType>
37+
</xs:schema>
38+
</wsdl:types>
39+
<wsdl:message name="DownloadFault">
40+
<wsdl:part name="DownloadFault" element="tns:DownloadFault">
41+
</wsdl:part>
42+
</wsdl:message>
43+
<wsdl:message name="downloadNextResponse">
44+
<wsdl:part name="parameters" element="tns:downloadNextResponse">
45+
</wsdl:part>
46+
</wsdl:message>
47+
<wsdl:message name="downloadNext">
48+
<wsdl:part name="parameters" element="tns:downloadNext">
49+
</wsdl:part>
50+
</wsdl:message>
51+
<wsdl:portType name="Download">
52+
<wsdl:operation name="downloadNext">
53+
<wsdl:input name="downloadNext" message="tns:downloadNext"
54+
wsam:Action="http://cxf.apache.org/downloadNext">
55+
</wsdl:input>
56+
<wsdl:output name="downloadNextResponse" message="tns:downloadNextResponse"
57+
wsam:Action="http://cxf.apache.org/Download/downloadNextResponse">
58+
</wsdl:output>
59+
<wsdl:fault name="DownloadFault" message="tns:DownloadFault"
60+
wsam:Action="http://cxf.apache.org/Download/downloadNext/Fault/DownloadFault">
61+
</wsdl:fault>
62+
</wsdl:operation>
63+
</wsdl:portType>
64+
<wsdl:binding name="DownloadServiceInterfaceServiceSoapBinding" type="tns:Download">
65+
<soap:binding style="document" transport="http://schemas.xmlsoap.org/soap/http"/>
66+
<wsdl:operation name="downloadNext">
67+
<soap:operation soapAction="http://cxf.apache.org/downloadNext" style="document"/>
68+
<wsdl:input name="downloadNext">
69+
<soap:body use="literal"/>
70+
</wsdl:input>
71+
<wsdl:output name="downloadNextResponse">
72+
<soap:body use="literal"/>
73+
</wsdl:output>
74+
<wsdl:fault name="DownloadFault">
75+
<soap:fault name="DownloadFault" use="literal"/>
76+
</wsdl:fault>
77+
</wsdl:operation>
78+
</wsdl:binding>
79+
<wsdl:service name="DownloadServiceInterfaceService">
80+
<wsdl:port name="DownloadPort" binding="tns:DownloadServiceInterfaceServiceSoapBinding">
81+
<soap:address location="http://localhost:9090/DownloadPort"/>
82+
</wsdl:port>
83+
</wsdl:service>
84+
</wsdl:definitions>

0 commit comments

Comments
 (0)