Skip to content

Commit

Permalink
FEAT-#979: Enable reading from SQL server. (#4279)
Browse files Browse the repository at this point in the history
Co-authored-by: eavidan <eran.avidan@intel.com>
Co-authored-by: Devin Petersohn <devin-petersohn@users.noreply.github.com>
Signed-off-by: mvashishtha <mahesh@ponder.io>
  • Loading branch information
3 people authored Mar 7, 2022
1 parent acd0ff9 commit 2d40797
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 15 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,10 @@ jobs:
env:
MODIN_ENGINE: ${{matrix.engine}}
MODIN_MEMORY: 1000000000
# Only test reading from SQL server on ubuntu for now. Eventually, we
# should test on Windows, too, but we will have to set up the SQL
# server differently.
MODIN_TEST_READ_FROM_SQL_SERVER: true
name: test-ubuntu (engine ${{matrix.engine}}, python ${{matrix.python-version}})
steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -463,6 +467,8 @@ jobs:
- run: python -m pytest -n 2 modin/pandas/test/test_groupby.py
- run: python -m pytest -n 2 modin/pandas/test/test_reshape.py
- run: python -m pytest -n 2 modin/pandas/test/test_general.py
- run: chmod +x ./.github/workflows/sql_server/set_up_sql_server.sh
- run: ./.github/workflows/sql_server/set_up_sql_server.sh
- run: python -m pytest modin/pandas/test/test_io.py --verbose
- run: python -m pytest modin/experimental/pandas/test/test_io_exp.py
- run: pip install "dfsql>=0.4.2" "pyparsing<=2.4.7" && pytest modin/experimental/sql/test/test_sql.py
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ jobs:
env:
MODIN_ENGINE: ${{matrix.engine}}
MODIN_MEMORY: 1000000000
# Only test reading from SQL server on ubuntu for now. Eventually, we
# should test on Windows, too, but we will have to set up the SQL
# server differently.
MODIN_TEST_READ_FROM_SQL_SERVER: true
name: test-ubuntu (engine ${{matrix.engine}}, python ${{matrix.python-version}})
steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -166,6 +170,8 @@ jobs:
- run: python -m pytest -n 2 modin/pandas/test/test_groupby.py
- run: python -m pytest -n 2 modin/pandas/test/test_reshape.py
- run: python -m pytest -n 2 modin/pandas/test/test_general.py
- run: chmod +x ./.github/workflows/sql_server/set_up_sql_server.sh
- run: ./.github/workflows/sql_server/set_up_sql_server.sh
- run: python -m pytest modin/pandas/test/test_io.py
- run: python -m pytest modin/experimental/pandas/test/test_io_exp.py
- uses: codecov/codecov-action@v2
Expand Down
14 changes: 14 additions & 0 deletions .github/workflows/sql_server/set_up_sql_server.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This script sets up a SQL server listening at 0.0.0.0:1234.

# If any step fails, we can't set up a valid SQL server for unit tests.
set -e

# Pull the 2019 SQL server docker container image by following:
# https://docs.microsoft.com/en-us/sql/linux/quickstart-install-connect-docker?view=sql-server-ver15&pivots=cs1-powershell#pullandrun2019
sudo docker pull mcr.microsoft.com/mssql/server:2019-latest
sudo docker run -d --name example_sql_server -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=Strong.Pwd-123' -p 1433:1433 mcr.microsoft.com/mssql/server:2019-latest


# Wait 10 seconds because if we don't the server typically will not be ready
# to accept connections by the time we want to make them.
sleep 10
2 changes: 1 addition & 1 deletion docs/release_notes/release_notes-0.14.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Key Features and Updates
* REFACTOR-#4093: Refactor base to be smaller (#4220)
* REFACTOR-#4047: Rename `cluster` directory to `cloud` in examples (#4212)
* Pandas API implementations and improvements
*
* FEAT-#979: Enable reading from SQL server (#4279)
* OmniSci enhancements
*
* XGBoost enhancements
Expand Down
1 change: 1 addition & 0 deletions environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies:
- cloudpickle
- boto3
- scikit-learn
- pymssql
- pip:
- xgboost>=1.3
- modin-spreadsheet>=0.1.1
Expand Down
7 changes: 7 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,13 @@ def get(cls):
return min_partition_size


class TestReadFromSqlServer(EnvironmentVariable, type=bool):
"""Set to true to test reading from SQL server."""

varname = "MODIN_TEST_READ_FROM_SQL_SERVER"
default = False


def _check_vars():
"""
Check validity of environment variables.
Expand Down
20 changes: 9 additions & 11 deletions modin/core/io/sql/sql_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ def _read(cls, sql, con, index_col=None, **kwargs):
BaseQueryCompiler
Query compiler with imported data for further processing.
"""
is_modin_db_connection = isinstance(con, ModinDatabaseConnection)
if not (is_modin_db_connection or isinstance(con, str)):
if isinstance(con, str):
con = ModinDatabaseConnection("sqlalchemy", con)
if not isinstance(con, ModinDatabaseConnection):
warnings.warn(
"To use parallel implementation of `read_sql`, pass either "
+ "the SQL connection string or a ModinDatabaseConnection "
Expand All @@ -63,13 +64,12 @@ def _read(cls, sql, con, index_col=None, **kwargs):
+ "https://modin.readthedocs.io/en/latest/supported_apis/io_supported.html#connecting-to-a-database-for-read-sql"
)
return cls.single_worker_read(sql, con=con, index_col=index_col, **kwargs)
row_cnt_query = "SELECT COUNT(*) FROM ({}) as foo".format(sql)
connection_for_pandas = con.get_connection() if is_modin_db_connection else con
row_cnt = pandas.read_sql(row_cnt_query, connection_for_pandas).squeeze()
row_count_query = con.row_count_query(sql)
connection_for_pandas = con.get_connection()
colum_names_query = con.column_names_query(sql)
row_cnt = pandas.read_sql(row_count_query, connection_for_pandas).squeeze()
cols_names_df = pandas.read_sql(
"SELECT * FROM ({}) as foo LIMIT 0".format(sql),
connection_for_pandas,
index_col=index_col,
colum_names_query, connection_for_pandas, index_col=index_col
)
cols_names = cols_names_df.columns
num_partitions = NPartitions.get()
Expand All @@ -79,9 +79,7 @@ def _read(cls, sql, con, index_col=None, **kwargs):
limit = math.ceil(row_cnt / num_partitions)
for part in range(num_partitions):
offset = part * limit
query = "SELECT * FROM ({}) as foo LIMIT {} OFFSET {}".format(
sql, limit, offset
)
query = con.partition_query(sql, limit, offset)
partition_id = cls.deploy(
cls.parse,
num_partitions + 2,
Expand Down
86 changes: 85 additions & 1 deletion modin/db_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@


class UnsupportedDatabaseException(Exception):
"""Modin can't create a particular kind of database connection."""

pass


Expand All @@ -52,6 +54,30 @@ def __init__(self, lib, *args, **kwargs):
self.lib = lib
self.args = args
self.kwargs = kwargs
self._dialect_is_microsoft_sql_cache = None

def _dialect_is_microsoft_sql(self):
"""
Tell whether this connection requires Microsoft SQL dialect.
If this is a sqlalchemy connection, create an engine from args and
kwargs. If that engine's driver is pymssql or pyodbc, this
connection requires Microsoft SQL. Otherwise, it doesn't.
Returns
-------
Boolean
"""
if self._dialect_is_microsoft_sql_cache is None:
self._dialect_is_microsoft_sql_cache = False
if self.lib == _SQLALCHEMY_LIB_NAME:
from sqlalchemy import create_engine

self._dialect_is_microsoft_sql_cache = create_engine(
*self.args, **self.kwargs
).driver in ("pymssql", "pyodbc")

return self._dialect_is_microsoft_sql_cache

def get_connection(self):
"""
Expand All @@ -76,4 +102,62 @@ def get_connection(self):

return create_engine(*self.args, **self.kwargs).connect()

raise Exception("Unsupported database library")
raise UnsupportedDatabaseException("Unsupported database library")

def column_names_query(self, query):
"""
Get a query that gives the names of columns that `query` would produce.
Parameters
----------
query : str
The SQL query to check.
Returns
-------
str
"""
# This query looks odd, but it works in both PostgreSQL and Microsoft
# SQL, which doesn't let you use a "limit" clause to select 0 rows.
return f"SELECT * FROM ({query}) AS _ WHERE 1 = 0"

def row_count_query(self, query):
"""
Get a query that gives the names of rows that `query` would produce.
Parameters
----------
query : str
The SQL query to check.
Returns
-------
str
"""
return f"SELECT COUNT(*) FROM ({query}) AS _"

def partition_query(self, query, limit, offset):
"""
Get a query that partitions the original `query`.
Parameters
----------
query : str
The SQL query to get a partition.
limit : int
The size of the partition.
offset : int
Where the partition begins.
Returns
-------
str
"""
return (
(
f"SELECT * FROM ({query}) AS _ ORDER BY(SELECT NULL)"
+ f" OFFSET {offset} ROWS FETCH NEXT {limit} ROWS ONLY"
)
if self._dialect_is_microsoft_sql()
else f"SELECT * FROM ({query}) LIMIT {limit} OFFSET {offset}"
)
39 changes: 37 additions & 2 deletions modin/pandas/test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@
from pandas.core.dtypes.common import is_list_like
from pathlib import Path
from collections import OrderedDict
from modin.db_conn import ModinDatabaseConnection, UnsupportedDatabaseException
from modin.config import TestDatasetSize, Engine, StorageFormat, IsExperimental
from modin.db_conn import (
ModinDatabaseConnection,
UnsupportedDatabaseException,
)
from modin.config import (
TestDatasetSize,
Engine,
StorageFormat,
IsExperimental,
TestReadFromSqlServer,
)
from modin.utils import to_pandas
from modin.pandas.utils import from_arrow
from modin.test.test_utils import warns_that_defaulting_to_pandas
Expand Down Expand Up @@ -1834,6 +1843,32 @@ def test_read_sql(self, make_sql_connection):
pandas_df = pandas.read_sql(sql=query, con=sqlalchemy_connection)
df_equals(modin_df, pandas_df)

@pytest.mark.skipif(
not TestReadFromSqlServer.get(),
reason="Skip the test when the test SQL server is not set up.",
)
def test_read_sql_from_sql_server(self):
table_name = "test_1000x256"
query = f"SELECT * FROM {table_name}"
sqlalchemy_connection_string = (
"mssql+pymssql://sa:Strong.Pwd-123@0.0.0.0:1433/master"
)
pandas_df_to_read = pandas.DataFrame(
np.arange(
1000 * 256,
).reshape(1000, 256)
).add_prefix("col")
pandas_df_to_read.to_sql(
table_name, sqlalchemy_connection_string, if_exists="replace"
)
modin_df = pd.read_sql(
query,
ModinDatabaseConnection("sqlalchemy", sqlalchemy_connection_string),
)
pandas_df = pandas.read_sql(query, sqlalchemy_connection_string)
df_equals(modin_df, pandas_df)

def test_invalid_modin_database_connections(self):
with pytest.raises(UnsupportedDatabaseException):
ModinDatabaseConnection("unsupported_database")

Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ git+https://github.com/airspeed-velocity/asv.git@ef016e233cb9a0b19d517135104f49e
xgboost>=1.3
tqdm
modin-spreadsheet>=0.1.1
pymssql

0 comments on commit 2d40797

Please sign in to comment.