Updates for 1.3.1 release
- fixed UDF name lookups to do an exact regex match against the CSV list in the meta-data region,
  i.e. match ",<name>," or "<name>," at the head of the list instead of just searching for <name>;
  this fixes failures in UserDefinedFunctionsDUnitTest when run in the suite, where a previous
  test's superset name "intudf_embed" subsumed "intudf" and made the code think that "intudf"
  had been dropped from the meta-data region (see the sketch after this list)
- updated log4j2 to 2.17.2
- changed jetty version to 9.4.43.v20210629
- updated sub-module versions
- roll over the log file on startup in the log4j2.properties.template
- fixed custom log4j2.properties getting overridden by log4j2-defaults.properties at startup
- fixed the log-file output shown on startup
- changed the standard output file "start_" to use the actual configured log-file name as its suffix
- updated cluster-util.sh and other utilities to use log4j2.properties instead of
  log4j.properties
- updated docs and remaining places for log4j2.properties instead of log4j.properties
- updated dependency versions in LICENSE and NOTICE
- fixed sporadic failures in JDBCPreparedStatementDUnitTest, SnappyStorageEvictorSuite
- changed all instances of Process.exitValue() to instead use a timed waitFor(), because the
  former is broken and may return values like "-1" before the process has exited (see the
  second sketch after this list)
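
A minimal sketch of the exact-match lookup described in the first bullet. The object and method
names here are hypothetical and only illustrate the idea; the actual meta-data region code differs.

object UdfNameLookupSketch {

  // True only if `name` appears as a complete entry in the comma-separated list,
  // i.e. as ",<name>," in the middle or "<name>," at the head (plus the trailing
  // and single-entry cases), never as a substring of a longer entry.
  def containsExact(csvList: String, name: String): Boolean = {
    csvList == name ||
      csvList.startsWith(name + ",") ||
      csvList.endsWith("," + name) ||
      csvList.contains("," + name + ",")
  }

  def main(args: Array[String]): Unit = {
    val registered = "intudf_embed,strudf"
    // A plain substring search wrongly finds "intudf" because "intudf_embed" subsumes it.
    assert(registered.contains("intudf"))
    // The exact match does not, so "intudf" is correctly reported as absent.
    assert(!containsExact(registered, "intudf"))
    assert(containsExact(registered, "strudf"))
    assert(containsExact(registered, "intudf_embed"))
  }
}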
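
A minimal sketch of the timed waitFor() pattern from the last bullet, using java.lang.Process;
the command and the 30-second timeout are illustrative, not taken from the actual test code.

import java.util.concurrent.TimeUnit

object TimedWaitForSketch {
  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("sleep", "1").start()

    // Instead of reading proc.exitValue() immediately (which can observe the process
    // before it has actually exited), wait with a timeout and only then read the code.
    if (proc.waitFor(30, TimeUnit.SECONDS)) {
      println(s"process exited with code ${proc.exitValue()}")
    } else {
      proc.destroyForcibly()
      throw new RuntimeException("process did not exit within 30 seconds")
    }
  }
}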
sumwale committed Apr 12, 2022
1 parent 3594353 commit a466d26
Showing 28 changed files with 306 additions and 262 deletions.
28 changes: 14 additions & 14 deletions LICENSE

Large diffs are not rendered by default.

178 changes: 91 additions & 87 deletions NOTICE

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions build.gradle
@@ -108,9 +108,9 @@ allprojects {
sparkDistName = "spark-${sparkVersion}-bin-hadoop2.7"
sparkCurrentVersion = '2.4.8'
sparkCurrentDistName = "spark-${sparkCurrentVersion}-bin-hadoop2.7"
sparkJobServerVersion = '0.6.2.12'
sparkJobServerVersion = '0.6.2.13'
snappySparkMetricsLibVersion = '2.0.0.1'
log4j2Version = '2.17.1'
log4j2Version = '2.17.2'
slf4jVersion = '1.7.32'
junitVersion = '4.12'
mockitoVersion = '1.10.19'
@@ -121,7 +121,7 @@ allprojects {
sparkXmlVersion = '0.4.1'
scalatestVersion = '2.2.6'
py4jVersion = '0.10.7'
jettyVersion = '9.4.44.v20210927'
jettyVersion = '9.4.43.v20210629'
guavaVersion = '14.0.1'
fastutilVersion = '8.5.6'
kryoVersion = '4.0.1'
@@ -159,7 +159,7 @@ allprojects {
eclipseCollectionsVersion = '10.4.0'

pegdownVersion = '1.6.0'
snappyStoreVersion = '1.6.6'
snappyStoreVersion = '1.6.7'
snappydataVersion = version
zeppelinInterpreterVersion = '0.8.2.1'

2 changes: 1 addition & 1 deletion cluster/bin/snappy
@@ -121,7 +121,7 @@ elif [ -z "$SNAPPY_NO_QUICK_LAUNCH" -a $# -ge 2 \
fi
fi

JARS="`echo "${SPARK_HOME}"/jars/snappydata-launcher* "${SPARK_HOME}"/jars/gemfire-shared* "${SPARK_HOME}"/jars/log4j-* "${SPARK_HOME}"/jars/jna-4.* | sed 's/ /:/g'`"
JARS="${SPARK_HOME}/conf:`echo "${SPARK_HOME}"/jars/snappydata-launcher* "${SPARK_HOME}"/jars/gemfire-shared* "${SPARK_HOME}"/jars/log4j-* "${SPARK_HOME}"/jars/jna-4.* | sed 's/ /:/g'`"
exec $RUNNER $JAVA_ARGS -Xverify:none -cp "$JARS" io.snappydata.tools.QuickLauncher "$@" $HOSTNAME_FOR_CLIENTS $IMPLICIT_AWS_CLIENT_BIND_ADDRESS
IMPLICIT_CLIENT_BIND_ADDRESS=
EXPLICIT_CLIENT_BIND_ADDRESS=
2 changes: 2 additions & 0 deletions cluster/conf/log4j2.properties.template
@@ -45,6 +45,7 @@ appender.rolling.fileName = snappydata.log
appender.rolling.filePattern = snappydata.%d{yy-MM-dd}.%i.log.gz
appender.rolling.append = true
appender.rolling.policies.type = Policies
appender.rolling.policies.startup.type = OnStartupTriggeringPolicy
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
@@ -59,6 +60,7 @@ appender.code.fileName = generatedcode.log
appender.code.filePattern = generatedcode.%d{yy-MM-dd}.%i.log.gz
appender.code.append = true
appender.code.policies.type = Policies
appender.code.policies.startup.type = OnStartupTriggeringPolicy
appender.code.policies.size.type = SizeBasedTriggeringPolicy
appender.code.policies.size.size = 100MB
appender.code.strategy.type = DefaultRolloverStrategy
8 changes: 4 additions & 4 deletions cluster/sbin/cluster-util.sh
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
#
# Copyright (c) 2017-2019 TIBCO Software Inc. All rights reserved.
#
@@ -61,7 +61,7 @@ usage() {
echo -e ' \t ''\t'"If specified, the script doesn't ask for confirmation for execution of the command on each member node."
echo
echo -e ' \t '"--copy-conf"
echo -e ' \t ''\t'"This is a shortcut command which when specified copies log4j.properties, snappy-env.sh and "
echo -e ' \t ''\t'"This is a shortcut command which when specified copies log4j2.properties, snappy-env.sh and "
echo -e ' \t ''\t'"spark-env.sh configuration files from local machine to all the members."
echo -e ' \t ''\t'"These files are copied only if a) these are absent in the destination member or b) their content is different. In "
echo -e ' \t ''\t'"latter case, a backup of the file is taken in conf/backup directory on destination member, before copy."
@@ -165,9 +165,9 @@ START_ALL_TIMESTAMP="$(date +"%Y_%m_%d_%H_%M_%S")"

function copyConf() {
for entry in "${SPARK_CONF_DIR}"/*; do
if [ -f "$entry" ];then
if [ -f "$entry" ]; then
fileName=$(basename $entry)
if [[ $fileName == "log4j.properties" || $fileName == "snappy-env.sh" || $fileName == "spark-env.sh" ]]; then
if [[ $fileName == "log4j2.properties" || $fileName == "snappy-env.sh" || $fileName == "spark-env.sh" ]]; then
if ! ssh $node "test -e $entry"; then #"File does not exist."
scp ${SPARK_CONF_DIR}/$fileName $node:${SPARK_CONF_DIR}
else
@@ -83,7 +83,7 @@ class SnappyMetricsSystemDUnitTest(s: String)
def collectJsonStats(): mutable.Map[String, AnyRef] = {
val url = "http://localhost:9090/metrics/json/"
// val json = scala.io.Source.fromURL(url).mkString
val json = s"curl $url".!!
val json = s"curl -s $url".!!
val data = jsonStrToMap(json)
val rs = data.-("counters", "meters", "histograms", "timers", "version")
val map = scala.collection.mutable.LinkedHashMap[String, AnyRef]()
@@ -18,7 +18,7 @@ package io.snappydata.externalstore

import java.sql.{PreparedStatement, SQLException}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, CyclicBarrier, Executors, TimeoutException}
import java.util.concurrent.{CyclicBarrier, Executors, TimeoutException}

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.Try
@@ -207,11 +207,11 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
ps.setString(2, "str" + i)
ps.addBatch()
if (i % 10 == 0) {
var records = ps.executeBatch()
val records = ps.executeBatch()
records.foreach(r => numRows += r)
}
}
var records = ps.executeBatch()
val records = ps.executeBatch()
records.foreach(r => numRows += r)
(1, numRows)
}
@@ -222,17 +222,16 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
val conn = getANetConnection(netPort1)
val query1 = "update t3 set fs = ? where fs = ?"
ps = conn.prepareStatement(query1)
var fs1 = 1
for (i <- val1 to val2) {
ps.setString(1, "temp" + i)
ps.setString(2, "str" + i)
ps.addBatch()
if (i % 10 == 0) {
var records = ps.executeBatch()
val records = ps.executeBatch()
records.foreach(r => numRows += r)
}
}
var records = ps.executeBatch()
val records = ps.executeBatch()
records.foreach(r => numRows += r)
(1, numRows)
}
@@ -247,16 +246,16 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
ps.setString(1, "temp" + i2)
ps.addBatch()
if (i2 % 10 == 0) {
var records = ps.executeBatch()
val records = ps.executeBatch()
records.foreach(r => numRows += r)
}
}
var records = ps.executeBatch()
val records = ps.executeBatch()
records.foreach(r => numRows += r)
(1, numRows)
}

def testComplexDataTypes() : Unit = {
def testComplexDataTypes(): Unit = {
vm2.invoke(classOf[ClusterManagerTestBase], "startNetServer", netPort1)
val conn = getANetConnection(netPort1)
val stmt = conn.createStatement()
@@ -302,42 +301,40 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
val stmt = conn.createStatement()
stmt.execute("drop table if exists t3")
stmt.execute("create table t3(id integer, fs string) using column options" +
"(key_columns 'id', COLUMN_MAX_DELTA_ROWS '7', BUCKETS '2')")

var thrCount1: Integer = 0
var insertedRecords = 0
val colThread1 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
var result = insertRecords(1, 10)
thrCount1 += result._1
insertedRecords += result._2
})
}
"(COLUMN_MAX_DELTA_ROWS '7', BUCKETS '2')")

val insertedRecords = new AtomicInteger(0)
val colThread1 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = insertRecords(1, 10)
insertedRecords.getAndAdd(result._2)
}
}
})
colThread1.start()

var thrCount2: Integer = 0
val colThread2 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
var result = insertRecords(11, 20)
thrCount2 += result._1
insertedRecords += result._2
})
}
val colThread2 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = insertRecords(11, 20)
insertedRecords.getAndAdd(result._2)
}
}
})
colThread2.start()

colThread1.join()
colThread2.join()

conn.commit()
var rscnt = stmt.executeQuery("select count(*) from t3")
rscnt.next()
assertEquals(100, rscnt.getInt(1))
assertEquals(100, insertedRecords)
assertEquals(100, insertedRecords.get())

val rs = stmt.executeQuery("select * from t3 order by id")


var i = 1
var cnt = 0

@@ -351,51 +348,47 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
cnt = cnt + 1
}

var thrCount3: Integer = 0
var updatedRecords = 0
val colThread3 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
var result = updateRecords(1, 20)
thrCount3 += result._1
updatedRecords += result._2
})
}
val updatedRecords = new AtomicInteger(0)
val colThread3 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = updateRecords(1, 20)
updatedRecords.getAndAdd(result._2)
}
}
})
colThread3.start()

var thrCount4: Integer = 0
val colThread4 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
var result = updateRecords(11, 20)
thrCount4 += result._1
updatedRecords += result._2
})
}
val colThread4 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = updateRecords(11, 20)
updatedRecords.getAndAdd(result._2)
}
}
})
colThread4.start()

var thrCount5: Integer = 0
val colThread5 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
var result = updateRecords(21, 30)
thrCount5 += result._1
updatedRecords += result._2
})
}
val colThread5 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = updateRecords(21, 30)
updatedRecords.getAndAdd(result._2)
}
}
})
colThread5.start()

colThread3.join()
colThread4.join()
colThread5.join()


rscnt = stmt.executeQuery("select count(*) from t3")
rscnt.next()
assertEquals(100, rscnt.getInt(1))
assertEquals(100, updatedRecords)
assertEquals(100, updatedRecords.get())

var rs1 = stmt.executeQuery("select * from t3 order by id")
val rs1 = stmt.executeQuery("select * from t3 order by id")
var i2 = 1
cnt = 0
while (rs1.next()) {
Expand All @@ -408,37 +401,34 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
cnt = cnt + 1
}

var thrCount6: Integer = 0
val deletedRecords = new AtomicInteger(0)
val colThread6 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
val result = deleteRecords(1, 20)
thrCount6 += result._1
deletedRecords.getAndAdd(result._2)
})
}
val colThread6 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = deleteRecords(1, 20)
deletedRecords.getAndAdd(result._2)
}
}
})
colThread6.start()

var thrCount7: Integer = 0
val colThread7 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
val result = deleteRecords(11, 20)
thrCount7 += result._1
deletedRecords.getAndAdd(result._2)
})
}
val colThread7 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = deleteRecords(11, 20)
deletedRecords.getAndAdd(result._2)
}
}
})
colThread7.start()

var thrCount8: Integer = 0
val colThread8 = new Thread(new Runnable {def run() {
(1 to 5) foreach (i => {
val result = deleteRecords(21, 30)
thrCount8 += result._1
deletedRecords.getAndAdd(result._2)
})
}
val colThread8 = new Thread(new Runnable {
def run(): Unit = {
(1 to 5) foreach { _ =>
val result = deleteRecords(21, 30)
deletedRecords.getAndAdd(result._2)
}
}
})
colThread8.start()

@@ -449,7 +439,7 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
rscnt = stmt.executeQuery("select count(*) from t3")
rscnt.next()
assertEquals(0, rscnt.getInt(1))
assertEquals(100, deletedRecords.get)
assertEquals(100, deletedRecords.get())
}

def testQueryCancellation(): Unit = {
@@ -462,7 +452,7 @@ class JDBCPreparedStatementDUnitTest(s: String) extends ClusterManagerTestBase(s
// significantly long duration.
stmt.execute(
s"""create table $table (col1 int, col2 int) using column as
|select id as col1, id as col2 from range(10000000)""".stripMargin)
|select id as col1, id as col2 from range(10000000)""".stripMargin)
val barrier = new CyclicBarrier(2)
try {
implicit val context: ExecutionContextExecutor =
@@ -25,6 +25,7 @@ import scala.util.{Failure, Success, Try}

import io.snappydata.cluster.ClusterManagerTestBase
import io.snappydata.test.dunit.{AvailablePortHelper, DistributedTestBase}
import org.apache.commons.io.FileUtils

import org.apache.spark.{SparkUtilsAccess, TestUtils}
import org.apache.spark.TestUtils.JavaSourceFromString
@@ -36,6 +37,11 @@ case class OrderData(ref: Int, description: String, amount: Long)
class UserDefinedFunctionsDUnitTest(val s: String)
extends ClusterManagerTestBase(s) {

override def afterClass(): Unit = {
super.afterClass()
FileUtils.deleteQuietly(SparkUtilsAccess.destDir)
}

def testDriverHA(): Unit = {
// Stop the lead node
ClusterManagerTestBase.stopAny()