
Commit ab5908a

Merge branch 'master' into feature_streaming_enhancments
2 parents: 51ba870 + d5ddb8e

File tree: 6 files changed (+183, -17 lines)

CHANGELOG.md

Lines changed: 5 additions & 0 deletions

@@ -3,6 +3,11 @@
 ## Change History
 All notable changes to the Databricks Labs Data Generator will be documented in this file.
 
+### Unreleased
+
+#### Changed
+* Fixed use of logger in _version.py and in spark_singleton.py
+
 ### Version 0.3.2
 
 #### Changed

README.md

Lines changed: 24 additions & 14 deletions

@@ -9,8 +9,10 @@
 <!-- Dont remove: end exclude package -->
 
 [![build](https://github.com/databrickslabs/dbldatagen/workflows/build/badge.svg?branch=master)](https://github.com/databrickslabs/dbldatagen/actions?query=workflow%3Abuild+branch%3Amaster)
+[![PyPi package](https://img.shields.io/pypi/v/dbldatagen?color=green)](https://pypi.org/project/dbldatagen/)
 [![codecov](https://codecov.io/gh/databrickslabs/dbldatagen/branch/master/graph/badge.svg)](https://codecov.io/gh/databrickslabs/dbldatagen)
-[![PyPi downloads](https://img.shields.io/pypi/dm/dbldatagen?label=PyPi%20Downloads)](https://pypi.org/project/dbldatagen/)
+[![PyPi downloads](https://img.shields.io/pypi/dm/dbldatagen?label=PyPi%20Downloads)](https://pypistats.org/packages/dbldatagen)
+
 <!--
 [![Language grade: Python](https://img.shields.io/lgtm/grade/python/g/databrickslabs/dbldatagen.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/databrickslabs/dbldatagen/context:python)
 [![downloads](https://img.shields.io/github/downloads/databrickslabs/dbldatagen/total.svg)](https://hanadigital.github.io/grev/?user=databrickslabs&repo=dbldatagen)

@@ -89,6 +91,14 @@ release notes for library compatibility
 
 - https://docs.databricks.com/release-notes/runtime/releases.html
 
+When using the Databricks Labs Data Generator on Unity Catalog enabled environments, the Data Generator requires
+the use of `Single User` or `No Isolation Shared` access modes as some needed features are not available in `Shared`
+mode (for example, use of 3rd party libraries). Depending on settings, `Custom` access mode may be supported.
+
+See the following documentation for more information:
+
+- https://docs.databricks.com/data-governance/unity-catalog/compute.html
+
 ## Using the Data Generator
 To use the data generator, install the library using the `%pip install` method or install the Python wheel directly
 in your environment.

@@ -104,19 +114,19 @@ column_count = 10
 data_rows = 1000 * 1000
 df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows,
                             partitions=4)
-                  .withIdOutput()
-                  .withColumn("r", FloatType(),
-                              expr="floor(rand() * 350) * (86400 + 3600)",
-                              numColumns=column_count)
-                  .withColumn("code1", IntegerType(), minValue=100, maxValue=200)
-                  .withColumn("code2", IntegerType(), minValue=0, maxValue=10)
-                  .withColumn("code3", StringType(), values=['a', 'b', 'c'])
-                  .withColumn("code4", StringType(), values=['a', 'b', 'c'],
-                              random=True)
-                  .withColumn("code5", StringType(), values=['a', 'b', 'c'],
-                              random=True, weights=[9, 1, 1])
-
-                  )
+           .withIdOutput()
+           .withColumn("r", FloatType(),
+                       expr="floor(rand() * 350) * (86400 + 3600)",
+                       numColumns=column_count)
+           .withColumn("code1", IntegerType(), minValue=100, maxValue=200)
+           .withColumn("code2", IntegerType(), minValue=0, maxValue=10)
+           .withColumn("code3", StringType(), values=['a', 'b', 'c'])
+           .withColumn("code4", StringType(), values=['a', 'b', 'c'],
+                       random=True)
+           .withColumn("code5", StringType(), values=['a', 'b', 'c'],
+                       random=True, weights=[9, 1, 1])
+
+           )
 
 df = df_spec.build()
 num_rows=df.count()
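
Note: the README example above assumes that `dbldatagen` has been imported as `dg`, that the Spark SQL type classes have been imported, and that a SparkSession named `spark` already exists (as it does in Databricks notebooks). A minimal self-contained sketch under those assumptions, using the library's own SparkSingleton helper (see the spark_singleton.py diff below) to create a local session:

    import dbldatagen as dg
    from pyspark.sql.types import IntegerType

    # Outside Databricks there is no predefined `spark` session, so create one locally.
    spark = dg.SparkSingleton.getLocalInstance("readme example")

    # Abbreviated spec; the full README example adds more columns in the same style.
    df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=1000, partitions=4)
               .withIdOutput()
               .withColumn("code1", IntegerType(), minValue=100, maxValue=200))

    df = df_spec.build()
    print(df.count())  # expect 1000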

dbldatagen/_version.py

Lines changed: 2 additions & 1 deletion

@@ -44,7 +44,8 @@ def _get_spark_version(sparkVersion):
         spark_version_info = VersionInfo(int(major), int(minor), int(patch), release, build="0")
     except (RuntimeError, AttributeError):
         spark_version_info = VersionInfo(major=3, minor=0, patch=1, release="unknown", build="0")
-        logging.warning("Could not parse spark version - using assumed Spark Version : %s", spark_version_info)
+        logger = logging.getLogger(__name__)
+        logger.warning("Could not parse spark version - using assumed Spark Version : %s", spark_version_info)
 
     return spark_version_info
 
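
The fix here replaces the module-level convenience call `logging.warning(...)` with a logger named after the module. This matters beyond style: the root-level convenience functions implicitly call `logging.basicConfig()` when the root logger has no handlers, silently mutating the host application's logging setup, and their output cannot be filtered per module. With a named logger, an application can tune this module's verbosity directly; a minimal sketch (the logger name follows from `__name__` in dbldatagen/_version.py):

    import logging

    # Suppress only this module's warnings, without touching global logging state.
    logging.getLogger("dbldatagen._version").setLevel(logging.ERROR)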

dbldatagen/data_generator.py

Lines changed: 1 addition & 1 deletion

@@ -89,7 +89,7 @@ class DataGenerator:
 
     # restrict spurious messages from java gateway
     logging.getLogger("py4j").setLevel(logging.WARNING)
-    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.NOTSET)
+    #logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.NOTSET)
 
     def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
                  rows=1000000, startingId=0, randomSeed=None, partitions=None, verbose=False,
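
Commenting out this `logging.basicConfig(...)` call removes root-logger configuration that previously executed at class-definition time, i.e. whenever the library was imported, clobbering whatever logging setup the importing application had established. Configuration is now left to the application; a sketch of an application opting into a similar format itself (the INFO level here is an arbitrary application-side choice):

    import logging

    # The application, not an imported library, should configure the root logger.
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

    import dbldatagen as dg  # importing the library no longer overrides this setup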

dbldatagen/spark_singleton.py

Lines changed: 2 additions & 1 deletion

@@ -42,7 +42,8 @@ def getLocalInstance(cls, appName="new Spark session", useAllCores=True):
         else:
             spark_core_count = cpu_count - 1
 
-        logging.info("Spark core count: %d", spark_core_count)
+        logger = logging.getLogger(__name__)
+        logger.info("Spark core count: %d", spark_core_count)
 
         sparkSession = SparkSession.builder \
             .master(f"local[{spark_core_count}]") \
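
The same named-logger change is applied here. For context, `getLocalInstance` builds a local SparkSession sized from the machine's CPU count, using either all cores or all but one depending on `useAllCores`; a usage sketch matching the call in the new tests below:

    import dbldatagen as dg

    # Creates a SparkSession with master "local[n]", where n is derived
    # from the CPU count as shown in the diff above.
    spark = dg.SparkSingleton.getLocalInstance("unit tests")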

tests/test_logging.py

Lines changed: 149 additions & 0 deletions

@@ -0,0 +1,149 @@
+import logging
+import pytest
+
+from pyspark.sql import functions as F
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
+
+# import dbldatagen as dg
+
+
+@pytest.fixture(scope="class")
+def setupSpark():
+    import dbldatagen as dg
+    sparkSession = dg.SparkSingleton.getLocalInstance("unit tests")
+    return sparkSession
+
+
+@pytest.fixture(scope="class")
+def setupLogging():
+    # FORMAT = '%(asctime)-15s %(message)s'
+    # logging.basicConfig(format=FORMAT)
+    pass
+
+
+class TestLoggingOperation:
+    testDataSpec = None
+    dfTestData = None
+    SMALL_ROW_COUNT = 100000
+    TINY_ROW_COUNT = 1000
+    column_count = 10
+    row_count = SMALL_ROW_COUNT
+
+    def setup_log_capture(self, caplog_object):
+        """ set up log capture fixture
+
+        Sets up log capture fixture to only capture messages after setup and only
+        capture warnings and errors
+
+        """
+        caplog_object.set_level(logging.INFO)
+
+        # clear messages from setup
+        caplog_object.clear()
+
+    def get_log_capture_warnings_and_errors(self, caplog_object, textFlag):
+        """
+        gets count of warnings and errors containing specified text
+
+        :param caplog_object: log capture object from fixture
+        :param textFlag: text to search for to include error or warning in count
+        :return: count of warnings and errors containing text specified in `textFlag`
+        """
+        flagged_text_warnings_and_errors = 0
+        for r in caplog_object.records:
+            if (r.levelname == "WARNING" or r.levelname == "ERROR") and textFlag in r.message:
+                flagged_text_warnings_and_errors += 1
+
+        return flagged_text_warnings_and_errors
+
+    def get_log_capture_info(self, caplog_object, textFlag):
+        """
+        gets count of info messages containing specified text
+
+        :param caplog_object: log capture object from fixture
+        :param textFlag: text to search for to include info message in count
+        :return: count of info messages containing text specified in `textFlag`
+        """
+        flagged_text_info = 0
+        for r in caplog_object.records:
+            if (r.levelname == "INFO") and textFlag in r.message:
+                flagged_text_info += 1
+
+        return flagged_text_info
+
+
+    def test_logging_operation(self, caplog):
+        # caplog fixture captures log content
+        self.setup_log_capture(caplog)
+
+        date_format = "%Y-%m-%d %H:%M:%S"
+        log_format = "[%(name)s]%(asctime)s %(levelname)-8s [%(module)s][%(funcName)s] TESTING1 %(message)s"
+        formatter = logging.Formatter(log_format, date_format)
+        handler = logging.StreamHandler()
+        handler.setFormatter(formatter)
+        logger = logging.getLogger(__name__)
+        logger.setLevel(level=logging.INFO)
+        logger.addHandler(handler)
+
+        logger.warning("Info message 1")
+
+        # Prints: 2023-03-08 14:18:59 WARNING Info message 1
+
+        from dbldatagen import DataGenerator
+
+        logger.info("Info message 2")
+
+        for h in logger.handlers:
+            h.flush()
+
+        message1_count = self.get_log_capture_warnings_and_errors(caplog, "Info message 1")
+        assert message1_count == 1, "Should only have 1 message 1"
+
+        message2_count = self.get_log_capture_info(caplog, "Info message 2")
+        assert message2_count == 1, "Should only have 1 message 2"
+
+
+    def test_logging_operation2(self, setupSpark, caplog):
+        self.setup_log_capture(caplog)
+
+        spark = setupSpark
+
+        date_format = "%Y-%m-%d %H:%M:%S"
+        log_format = "%(asctime)s %(levelname)-8s TESTING2 %(message)s"
+        formatter1 = logging.Formatter(log_format, date_format)
+        handler1 = logging.StreamHandler()
+        handler1.setFormatter(formatter1)
+        logger2 = logging.getLogger("test1")
+        logger2.setLevel(level=logging.INFO)
+        logger2.addHandler(handler1)
+
+        logger2.warning("Info message 1")
+        # Prints: 2023-03-08 14:18:59 WARNING Info message 1
+
+        from dbldatagen import DataGenerator
+
+        spec = (DataGenerator(sparkSession=spark, name="test_data_set1", rows=10000, seedMethod='hash_fieldname')
+                .withIdOutput()
+                .withColumn("r", "float", expr="floor(rand() * 350) * (86400 + 3600)",
+                            numColumns=10)
+                .withColumn("code1", "int", min=100, max=200)
+                .withColumn("code2", "int", min=0, max=10)
+                .withColumn("code3", "string", values=['a', 'b', 'c'])
+                .withColumn("code4", "string", values=['a', 'b', 'c'], random=True)
+                .withColumn("code5", "string", values=['a', 'b', 'c'], random=True, weights=[9, 1, 1])
+                )
+
+        df = spec.build()
+
+
+        # Prints: INFO: Version : VersionInfo(major='0', minor='3', patch='1', release='', build='')
+        logger2.info("Info message 2")
+
+        for h in logger2.handlers:
+            h.flush()
+
+        message1_count = self.get_log_capture_warnings_and_errors(caplog, "Info message 1")
+        assert message1_count == 1, "Should only have 1 message 1"
+
+        message2_count = self.get_log_capture_info(caplog, "Info message 2")
+        assert message2_count == 1, "Should only have 1 message 2"
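
These tests rely on pytest's caplog fixture, which captures log records as they propagate from named loggers up toward the root logger; the helper methods then count the captured WARNING/ERROR or INFO records containing a marker string. A minimal sketch of the propagation behavior the capture depends on:

    import logging

    # Records emitted through any named logger propagate to ancestor loggers
    # by default (logger.propagate is True), which is how caplog's handler
    # observes them even though it is not attached to "test1" directly.
    logger = logging.getLogger("test1")
    logger.warning("Info message 1")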
