From ea9dedc204196605ebffbe6bf2f021455c09ea49 Mon Sep 17 00:00:00 2001 From: David Sykes Date: Wed, 8 Jun 2016 17:38:55 +1000 Subject: [PATCH] Update to Java 8 and add streaming result Added JapperStreamingResult to allow for streaming of large result sets Added option to configure fetch size --- build.xml | 6 +- src/org/dt/japper/Japper.java | 272 +++++++++++++++++-- src/org/dt/japper/JapperStreamingResult.java | 218 +++++++++++++++ test-src/SimpleStreamingTest.java | 164 +++++++++++ 4 files changed, 629 insertions(+), 31 deletions(-) create mode 100644 src/org/dt/japper/JapperStreamingResult.java create mode 100644 test-src/SimpleStreamingTest.java diff --git a/build.xml b/build.xml index 2e6ad45..5b0b2ac 100644 --- a/build.xml +++ b/build.xml @@ -10,7 +10,7 @@ - + @@ -38,8 +38,8 @@ diff --git a/src/org/dt/japper/Japper.java b/src/org/dt/japper/Japper.java index 6b85973..258f98c 100644 --- a/src/org/dt/japper/Japper.java +++ b/src/org/dt/japper/Japper.java @@ -12,7 +12,7 @@ import java.util.*; /* - * Copyright (c) 2012-2015, David Sykes and Tomasz Orzechowski + * Copyright (c) 2012-2016, David Sykes and Tomasz Orzechowski * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,9 +72,139 @@ public class Japper { private static final Log log = LogFactory.getLog(Japper.class); + public static class Config { + private int fetchSize = 500; + + public void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + public int getFetchSize() { + return fetchSize; + } + } + + public static final Config DEFAULT_CONFIG = new Config(); + + + + /** + * Execute the given SQL query mapping the results to instances of resultType, returning the results + * in a form that they can be streamed. + *

+ * + * @param conn the connection to execute the query on + * @param resultType the result to map the query results to + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @param + * @return a {@link JapperStreamingResult} which allows the caller to treat the results as an {@link Iterable} or as a {@link java.util.stream.Stream}. + */ + public static JapperStreamingResult streamableOf(Connection conn, Class resultType, String sql, Object...params) { + return streamableOf(DEFAULT_CONFIG, conn, resultType, null, sql, params); + } + + + /** + * Execute the given SQL query mapping the results to instances of resultType, returning the results + * in a form that they can be streamed. + *

+ * {@link #DEFAULT_CONFIG} will used for the configuration. + *

+ * + * @param config the {@link Config} to use when executing this query + * @param conn the connection to execute the query on + * @param resultType the result to map the query results to + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @param + * @return a {@link JapperStreamingResult} which allows the caller to treat the results as an {@link Iterable} or as a {@link java.util.stream.Stream}. + */ + public static JapperStreamingResult streamableOf(Config config, Connection conn, Class resultType, String sql, Object...params) { + return streamableOf(config, conn, resultType, null, sql, params); + } + + /** + * Execute the given SQL query mapping the results to instances of resultType, returning the results + * in a form that they can be streamed. + *

+ * + * @param config the {@link Config} to use when executing this query + * @param conn the connection to execute the query on + * @param resultType the result to map the query results to + * @param rowProcessor an (optional) {@link org.dt.japper.RowProcessor} to perform additional per-row processing on the result + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @param + * @return a {@link JapperStreamingResult} which allows the caller to treat the results as an {@link Iterable} or as a {@link java.util.stream.Stream}. + */ + public static JapperStreamingResult streamableOf(Config config, Connection conn, Class resultType, RowProcessor rowProcessor, String sql, Object...params) { + Profile profile = new Profile(resultType, sql); + + boolean needsCleanup = true; + + PreparedStatement ps = null; + try { + ps = prepareSql(profile, config, conn, sql, params); + + profile.startQuery(); + ResultSetMetaData metaData = ps.getMetaData(); + if (rowProcessor != null) { + rowProcessor.prepare(metaData); + } + logMetaData(metaData); + + ResultSet rs = ps.executeQuery(); + profile.stopQuery(); + + profile.startMap(); + Mapper mapper = getMapper(resultType, sql, metaData); + profile.stopMapperCreation(); + + needsCleanup = false; + return new JapperStreamingResult(ps, rs, mapper, rowProcessor, profile); + } + catch (SQLException sqlEx) { + try { + profile.end(); + profile.log(); + } + catch (Throwable ignoredExceptionDuringProfileLog) { } + + throw new JapperException(sqlEx); + } + finally { + if (needsCleanup) { + Throwable exceptionDuringDispose = null; + if (rowProcessor != null) { + try { + rowProcessor.dispose(); + } + catch (Throwable t) { + exceptionDuringDispose = t; + } + } + + try { if (ps != null) ps.close(); } catch (SQLException ignored) {} + + if (exceptionDuringDispose != null) { + if (exceptionDuringDispose instanceof JapperException) { + throw (JapperException) exceptionDuringDispose; + } + throw new JapperException(exceptionDuringDispose); + } + } + } + } + + + /** * Execute the given SQL query on the given connection, mapping the result to the given - * resultType + * resultType. + *

+ * {@link #DEFAULT_CONFIG} will used for the configuration. + *

* * @param conn the connection to execute the query on * @param resultType the result to map the query results to @@ -84,13 +214,29 @@ public class Japper { * @return the list of resultType instances containing the results of the query, or an empty list of the query returns no results */ public static List query(Connection conn, Class resultType, RowProcessor rowProcessor, String sql, Object...params) { + return query(DEFAULT_CONFIG, conn, resultType, rowProcessor, sql, params); + } + + /** + * Execute the given SQL query on the given connection, mapping the result to the given + * resultType + * + * @param config the {@link Config} to use when executing this query + * @param conn the connection to execute the query on + * @param resultType the result to map the query results to + * @param rowProcessor an (optional) {@link org.dt.japper.RowProcessor} to perform additional per-row processing on the result + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @return the list of resultType instances containing the results of the query, or an empty list of the query returns no results + */ + public static List query(Config config, Connection conn, Class resultType, RowProcessor rowProcessor, String sql, Object...params) { Profile profile = new Profile(resultType, sql); List result = new ArrayList(); PreparedStatement ps = null; try { - ps = prepareSql(profile, conn, sql, params); + ps = prepareSql(profile, config, conn, sql, params); profile.startQuery(); ResultSetMetaData metaData = ps.getMetaData(); @@ -153,6 +299,9 @@ public static List query(Connection conn, Class resultType, RowProcess /** * Execute the given SQL query on the given connection, mapping the result to the given * resultType + *

+ * {@link #DEFAULT_CONFIG} will used for the configuration. + *

* * @param conn the connection to execute the query on * @param resultType the result to map the query results to @@ -161,7 +310,22 @@ public static List query(Connection conn, Class resultType, RowProcess * @return the list of resultType instances containing the results of the query, or an empty list of the query returns no results */ public static List query(Connection conn, Class resultType, String sql, Object...params) { - return query(conn, resultType, null, sql, params); + return query(DEFAULT_CONFIG, conn, resultType, null, sql, params); + } + + /** + * Execute the given SQL query on the given connection, mapping the result to the given + * resultType + * + * @param config the {@link Config} to use when executing this query + * @param conn the connection to execute the query on + * @param resultType the result to map the query results to + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @return the list of resultType instances containing the results of the query, or an empty list of the query returns no results + */ + public static List query(Config config, Connection conn, Class resultType, String sql, Object...params) { + return query(config, conn, resultType, null, sql, params); } @@ -173,6 +337,9 @@ public static List query(Connection conn, Class resultType, String sql * and then returns the first element of the returned list. It is assumed the caller * is not issuing a query that returns thousands of rows and then only wants the first one *

+ * {@link #DEFAULT_CONFIG} will used for the configuration. + *

+ * * @param conn the connection to execute the query on * @param resultType the result to map the query results to * @param rowProcessor an (optional) {@link org.dt.japper.RowProcessor} to perform additional per-row processing on the result @@ -181,6 +348,26 @@ public static List query(Connection conn, Class resultType, String sql * @return the first result of the query mapped to a resultType instances, or null if the query returns no results */ public static T queryOne(Connection conn, Class resultType, RowProcessor rowProcessor, String sql, Object...params) { + return queryOne(DEFAULT_CONFIG, conn, resultType, rowProcessor, sql, params); + } + + /** + * Execute the given SQL query on the given connection, mapping the result to the given + * resultType. Return only the first result returned. + *

+ * NOTE: at present this implementation of this is very naive. It simply calls query() + * and then returns the first element of the returned list. It is assumed the caller + * is not issuing a query that returns thousands of rows and then only wants the first one + *

+ * @param config the {@link Config} to use when executing this query + * @param conn the connection to execute the query on + * @param resultType the result to map the query results to + * @param rowProcessor an (optional) {@link org.dt.japper.RowProcessor} to perform additional per-row processing on the result + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @return the first result of the query mapped to a resultType instances, or null if the query returns no results + */ + public static T queryOne(Config config, Connection conn, Class resultType, RowProcessor rowProcessor, String sql, Object...params) { List results = query(conn, resultType, rowProcessor, sql, params); if (results.size() > 0) { return results.get(0); @@ -196,6 +383,9 @@ public static T queryOne(Connection conn, Class resultType, RowProcessor< * and then returns the first element of the returned list. It is assumed the caller * is not issuing a query that returns thousands of rows and then only wants the first one *

+ * {@link #DEFAULT_CONFIG} will used for the configuration. + *

+ * * @param conn the connection to execute the query on * @param resultType the result to map the query results to * @param sql the SQL statement to execute @@ -203,30 +393,43 @@ public static T queryOne(Connection conn, Class resultType, RowProcessor< * @return the first result of the query mapped to a resultType instances, or null if the query returns no results */ public static T queryOne(Connection conn, Class resultType, String sql, Object...params) { - List results = query(conn, resultType, null, sql, params); - if (results.size() > 0) { - return results.get(0); - } - return null; + return queryOne(DEFAULT_CONFIG, conn, resultType, (RowProcessor)null, sql, params); } - - + + /** * Execute the given SQL query on the given connection, returning the result as a * {@link QueryResult}. - * + *

+ * {@link #DEFAULT_CONFIG} will used for the configuration. + *

+ * * @param conn the connection to execute the query on * @param sql the SQL statement to execute * @param params the parameters to the query * @return the result set as a {@link QueryResult} */ public static QueryResult query(Connection conn, String sql, Object...params) { + return query(DEFAULT_CONFIG, conn, sql, params); + } + + /** + * Execute the given SQL query on the given connection, returning the result as a + * {@link QueryResult}. + * + * @param config the {@link Config} to use when executing this query + * @param conn the connection to execute the query on + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @return the result set as a {@link QueryResult} + */ + public static QueryResult query(Config config, Connection conn, String sql, Object...params) { Profile profile = new Profile(ResultSet.class, sql); PreparedStatement ps = null; try { - ps = prepareSql(profile, conn, sql, params); + ps = prepareSql(profile, config, conn, sql, params); profile.startQuery(); ResultSetMetaData metaData = ps.getMetaData(); @@ -252,8 +455,8 @@ public static QueryResult query(Connection conn, String sql, Object...params) { throw new JapperException(sqlEx); } } - - + + /** * Execute the given SQL statement on the given {@link Connection}, returning the * number of rows affected. @@ -261,18 +464,39 @@ public static QueryResult query(Connection conn, String sql, Object...params) { * This method is designed for issuing UPDATE/DELETE or other non-query SQL statements * on the database, but taking advantage of all of the parameter parsing, setting and * conversions offered by Japper. - * + *

+ * {@link #DEFAULT_CONFIG} will used for the configuration. + *

+ * * @param conn the connection to execute the query on * @param sql the SQL statement to execute * @param params the parameters to the query * @return the number of rows affected by the given statement */ public static int execute(Connection conn, String sql, Object...params) { + return execute(DEFAULT_CONFIG, conn, sql, params); + } + + /** + * Execute the given SQL statement on the given {@link Connection}, returning the + * number of rows affected. + *

+ * This method is designed for issuing UPDATE/DELETE or other non-query SQL statements + * on the database, but taking advantage of all of the parameter parsing, setting and + * conversions offered by Japper. + * + * @param config the {@link Config} to use when executing this query + * @param conn the connection to execute the query on + * @param sql the SQL statement to execute + * @param params the parameters to the query + * @return the number of rows affected by the given statement + */ + public static int execute(Config config, Connection conn, String sql, Object...params) { Profile profile = new Profile("statement", int.class, sql); PreparedStatement ps = null; try { - ps = prepareSql(profile, conn, sql, params); + ps = prepareSql(profile, config, conn, sql, params); profile.startQuery(); int rowsAffected = ps.executeUpdate(); @@ -459,22 +683,14 @@ private static CallableStatement prepareCallSql(Profile profile, Connection conn return cs; } - private static PreparedStatement prepareSql(Profile profile, Connection conn, String sql, Object...params) throws SQLException { + private static PreparedStatement prepareSql(Profile profile, Config config, Connection conn, String sql, Object...params) throws SQLException { profile.startPrep(); ParameterParser parser = new ParameterParser(sql, params).parse(); profile.setSql(parser.getSql()); PreparedStatement ps = conn.prepareStatement(parser.getSql()); - /* - * For some testing over a remote connection, we want to make sure the query results - * get loaded in a single round-trip - * 500 is a good size for these tests - * - * TODO think a bit more about whether this is a good default, or whether this - * needs to be a parameter - */ - ps.setFetchSize(500); + ps.setFetchSize(config.getFetchSize()); if (params != null && params.length > 0) { for (ParameterParser.ParameterValue paramValue : parser.getParameterValues()) { @@ -627,7 +843,7 @@ private static void logMetaData(ResultSetMetaData metaData) throws SQLException } - private static class Profile { + static class Profile { private String dmlType; private Class type; diff --git a/src/org/dt/japper/JapperStreamingResult.java b/src/org/dt/japper/JapperStreamingResult.java new file mode 100644 index 0000000..e84288f --- /dev/null +++ b/src/org/dt/japper/JapperStreamingResult.java @@ -0,0 +1,218 @@ +package org.dt.japper; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + + +/* + * Copyright (c) 2012-2016, David Sykes and Tomasz Orzechowski + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * - Neither the name David Sykes nor Tomasz Orzechowski may be used to endorse + * or promote products derived from this software without specific prior written + * permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. * @author Administrator + * + * + */ + + +/** + * A class wrapping the {@link ResultSet} of a {@link Japper} query allowing the caller + * to stream over the results, rather than first build a list of all of the results. + *

+ * This object can be iterated over directly using the {@link #hasNext()}/{@link #next()} methods. + *

+ *

+ * Optionally, the caller may wish to iterate using the for (T result : japperStreamingResult) pattern. + *

+ *

+ * Finally, this object is also capable of returning a {@link Stream}. + *

+ *

+ * This {@link #close()} method must be called on this to ensure that the results are cleaned up correctly. This + * class implements {@link AutoCloseable} to enable use of the try-with-resources pattern to make this easier + * to ensure. + *

+ *
+ *   try (JapperStreamingResult<MyType> result = Japper.streamableOf(conn, MyType.class, SQL_GET_MY_TYPES, "PARAM1", param1)) {
+ *     result.stream()
+ *        .filter(type -> type.isThisTheDroneWeAreLookingFor())
+ *        .forEach(type -> System.out.println("We found drone " + type.getName()))
+ *        ;
+ *   }
+ * 
+ * @param + */ +public class JapperStreamingResult implements AutoCloseable, Iterator, Iterable { + + /** + * The SQL {@link PreparedStatement} from which we originally got our result set + */ + private final PreparedStatement ps; + + /** + * The {@link ResultSet} from which we are streaming the results + */ + private final ResultSet resultSet; + + /** + * The {@link Mapper} to be used for mapping each record into + * the wanted result type. + */ + private final Mapper mapper; + + /** + * A {@link RowProcessor} to call for each row of the result + */ + private final RowProcessor rowProcessor; + + /** + * Profile data about this execution + */ + private final Japper.Profile profile; + + /** + * Have we actually started streaming the result? + */ + private boolean streamStarted; + + private boolean nextKnown; + private T nextResult; + + JapperStreamingResult(PreparedStatement ps, ResultSet resultSet, Mapper mapper, RowProcessor rowProcessor, Japper.Profile profile) { + this.ps = ps; + this.resultSet = resultSet; + this.mapper = mapper; + this.rowProcessor = rowProcessor; + this.profile = profile; + } + + public Stream stream() { + return StreamSupport.stream(spliterator(), false); + } + + @Override + public Spliterator spliterator() { + return Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED | Spliterator.NONNULL); + } + + @Override + public Iterator iterator() { + if (streamStarted) { + throw new IllegalStateException("Streaming of these results has already started. It cannot be re-started!"); + } + + return this; + } + + + @Override + public boolean hasNext() { + if (!nextKnown) { + try { + knowNext(); + } + catch (SQLException sqlEx) { + throw new JapperException(sqlEx); + } + } + + return nextResult != null; + } + + @Override + public T next() { + if (!nextKnown) { + try { + knowNext(); + } + catch (SQLException sqlEx) { + throw new JapperException(sqlEx); + } + } + + if (nextResult == null) { + throw new NoSuchElementException(); + } + + nextKnown = false; + return nextResult; + } + + private void knowNext() throws SQLException { + streamStarted = true; + + nextResult = null; + + if (resultSet.next()) { + profile.startMapRow(); + nextResult = mapper.map(resultSet, rowProcessor); + profile.stopMapRow(); + } + else { + // we are at the end of the results + profile.stopMap(); + } + + nextKnown = true; + } + + @Override + public void close() { + try { + profile.end(); + profile.log(); + } + catch (Throwable ignoredExceptionDuringProfileLog) { } + + Throwable exceptionDuringDispose = null; + if (rowProcessor != null) { + try { + rowProcessor.dispose(); + } + catch (Throwable t) { + exceptionDuringDispose = t; + } + } + + try { resultSet.close(); } catch (SQLException ignored) {} + try { ps.close(); } catch (SQLException ignored) {} + + if (exceptionDuringDispose != null) { + if (exceptionDuringDispose instanceof JapperException) { + throw (JapperException) exceptionDuringDispose; + } + throw new JapperException(exceptionDuringDispose); + } + } + +} diff --git a/test-src/SimpleStreamingTest.java b/test-src/SimpleStreamingTest.java new file mode 100644 index 0000000..5130efa --- /dev/null +++ b/test-src/SimpleStreamingTest.java @@ -0,0 +1,164 @@ +import org.dt.japper.Japper; +import org.dt.japper.JapperStreamingResult; +import org.dt.japper.TestData; +import org.dt.japper.testmodel.PartModel; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.sql.Connection; +import java.util.NoSuchElementException; + +import static org.junit.Assert.*; + + +/* + * Copyright (c) 2012-2016, David Sykes and Tomasz Orzechowski + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * - Neither the name David Sykes nor Tomasz Orzechowski may be used to endorse + * or promote products derived from this software without specific prior written + * permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. * @author Administrator + * + * + */ + +public class SimpleStreamingTest { + + private static TestData testData; + + @BeforeClass + public static void setupDB() throws Exception { + testData = new TestData(); + testData.create(); + } + + private static final String SQL_PARTS = + " SELECT *" + + " FROM part" + + " ORDER BY partno" + ; + + @Test + public void mapParts() throws Exception { + Connection conn = testData.connect(); + + try (JapperStreamingResult parts = Japper.streamableOf(conn, PartModel.class, SQL_PARTS)) { + assertTrue(parts.hasNext()); + + PartModel part = parts.next(); + assertEquals("123456", part.getPartno()); + assertEquals("FAB", part.getPartType()); + } + + conn.close(); + } + + @Test + public void testMultipleHasNext() throws Exception { + Connection conn = testData.connect(); + + try (JapperStreamingResult parts = Japper.streamableOf(conn, PartModel.class, SQL_PARTS)) { + assertTrue(parts.hasNext()); + assertTrue(parts.hasNext()); + assertTrue(parts.hasNext()); + + PartModel part = parts.next(); + assertEquals("123456", part.getPartno()); + assertEquals("FAB", part.getPartType()); + } + + conn.close(); + } + + @Test + public void testNoSuchElementException() throws Exception { + Connection conn = testData.connect(); + + try (JapperStreamingResult parts = Japper.streamableOf(conn, PartModel.class, SQL_PARTS)) { + parts.next(); + parts.next(); + parts.next(); + + try { + parts.next(); + fail("We should have thrown a NoSuchElementException"); + } + catch (NoSuchElementException nseEx) { + // this is success! + } + } + + conn.close(); + } + + @Test + public void testAsIterable() throws Exception { + Connection conn = testData.connect(); + + try (JapperStreamingResult parts = Japper.streamableOf(conn, PartModel.class, SQL_PARTS)) { + for (PartModel part : parts) { + assertNotNull(part); + } + } + + conn.close(); + } + + @Test + public void testAsStream() throws Exception { + Connection conn = testData.connect(); + + try (JapperStreamingResult parts = Japper.streamableOf(conn, PartModel.class, SQL_PARTS)) { + long count = parts.stream() + .map(part -> part != null) + .count() + ; + assertEquals(3, count); + } + + conn.close(); + } + + + @Test + public void testRestartException() throws Exception { + Connection conn = testData.connect(); + + try (JapperStreamingResult parts = Japper.streamableOf(conn, PartModel.class, SQL_PARTS)) { + parts.next(); + + try { + parts.forEach(part -> assertNotNull(part)); + fail("We were able to 'restart'!"); + } + catch (IllegalStateException isEx) { + // this is expected + } + } + + conn.close(); + } + +}