Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/java/org/polypheny/db/ddl/DdlManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,20 @@ public static DdlManager getInstance() {
*/
public abstract void addForeignKey( CatalogTable catalogTable, CatalogTable refTable, List<String> columnNames, List<String> refColumnNames, String constraintName, ForeignKeyOption onUpdate, ForeignKeyOption onDelete ) throws UnknownColumnException, GenericCatalogException;

/**
* Merge multiple columns into one new column
*
* @param catalogTable the table
* @param sourceColumnNames name of the columns to be merged
* @param newColumnName name of the new column to be added
* @param joinString the string to place between the values
* @param type the SQL data type specification of the merged column
* @param nullable if the merged column should be nullable
* @param defaultValue the new default value of the merged column
* @param statement the initial query statement
*/
public abstract void mergeColumns( CatalogTable catalogTable, List<String> sourceColumnNames, String newColumnName, String joinString, ColumnTypeInformation type, boolean nullable, String defaultValue, Statement statement ) throws UnknownColumnException, ColumnAlreadyExistsException, ColumnNotExistsException;

/**
* Adds an index to a table
*
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/java/org/polypheny/db/processing/DataMigrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,26 @@ void copyPartitionData(
List<Long> sourcePartitionIds,
List<Long> targetPartitionIds );

/**
* Used to merge columns in a relational table. The values of the source columns will be selected,
* concatenated and inserted into the target column.
*
* @param transaction Transactional scope
* @param store Target Store where data should be migrated to
* @param sourceColumns Columns to be merged
* @param targetColumn New column to be added
* @param joinString String delimiter between the values to be merged
*/
void mergeColumns( Transaction transaction, CatalogAdapter store, List<CatalogColumn> sourceColumns, CatalogColumn targetColumn, String joinString );

AlgRoot buildInsertStatement( Statement statement, List<CatalogColumnPlacement> to, long partitionId );

//is used within copyData
// is used within copyData
void executeQuery( List<CatalogColumn> columns, AlgRoot sourceRel, Statement sourceStatement, Statement targetStatement, AlgRoot targetRel, boolean isMaterializedView, boolean doesSubstituteOrderBy );

// is used within mergeColumns
void executeMergeQuery( List<CatalogColumn> primaryKeyColumns, List<CatalogColumn> sourceColumns, CatalogColumn targetColumn, String joinString, AlgRoot sourceRel, Statement sourceStatement, Statement targetStatement, AlgRoot targetRel, boolean isMaterializedView, boolean doesSubstituteOrderBy );

AlgRoot buildDeleteStatement( Statement statement, List<CatalogColumnPlacement> to, long partitionId );

AlgRoot getSourceIterator( Statement statement, Map<Long, List<CatalogColumnPlacement>> placementDistribution );
Expand Down
80 changes: 80 additions & 0 deletions dbms/src/main/java/org/polypheny/db/ddl/DdlManagerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -598,6 +599,85 @@ public void addForeignKey( CatalogTable catalogTable, CatalogTable refTable, Lis
}


public void mergeColumns( CatalogTable catalogTable, List<String> sourceColumnNames, String newColumnName, String joinString, ColumnTypeInformation type, boolean nullable, String defaultValue, Statement statement ) throws UnknownColumnException, ColumnAlreadyExistsException, ColumnNotExistsException {
if ( catalog.checkIfExistsColumn( catalogTable.id, newColumnName ) ) {
throw new ColumnAlreadyExistsException( newColumnName, catalogTable.name );
}

CatalogColumn afterColumn = getCatalogColumn( catalogTable.id, sourceColumnNames.get( sourceColumnNames.size() - 1 ) );
int position = updateAdjacentPositions( catalogTable, null, afterColumn );

long columnId = catalog.addColumn(
newColumnName,
catalogTable.id,
position,
type.type,
type.collectionType,
type.precision,
type.scale,
type.dimension,
type.cardinality,
nullable,
Collation.getDefaultCollation()
);

// Add default value
addDefaultValue( defaultValue, columnId );
CatalogColumn addedColumn = catalog.getColumn( columnId );

// Remove quotes from joinString
if ( joinString.startsWith( "'" ) ) {
joinString = joinString.substring( 1, joinString.length() - 1 );
}

// Ask router on which stores this column shall be placed
List<DataStore> stores = RoutingManager.getInstance().getCreatePlacementStrategy().getDataStoresForNewColumn( addedColumn );
DataMigrator dataMigrator = statement.getTransaction().getDataMigrator();

// Build catalog columns
List<CatalogColumn> sourceCatalogColumns = new LinkedList<>();
for ( String columnName : sourceColumnNames ) {
sourceCatalogColumns.add( catalog.getColumn( catalogTable.id, columnName ) );
}
CatalogColumn targetCatalogColumn = catalog.getColumn( catalogTable.id, newColumnName );

// Add column on underlying data stores and insert default value
for ( DataStore store : stores ) {
catalog.addColumnPlacement(
store.getAdapterId(),
addedColumn.id,
PlacementType.AUTOMATIC,
null,
null,
null
);
AdapterManager.getInstance().getStore( store.getAdapterId() ).addColumn( statement.getPrepareContext(), catalogTable, addedColumn );
// Call migrator
dataMigrator.mergeColumns( statement.getTransaction(), catalog.getAdapter( store.getAdapterId() ), sourceCatalogColumns, targetCatalogColumn, joinString );

for ( CatalogColumn sourceCatalogColumn : sourceCatalogColumns ) {
// Delete column from underlying data stores
for ( CatalogColumnPlacement dp : catalog.getColumnPlacementsByColumn( sourceCatalogColumn.id ) ) {
if ( catalogTable.entityType == EntityType.ENTITY ) {
AdapterManager.getInstance().getStore( dp.adapterId ).dropColumn( statement.getPrepareContext(), dp );
}
catalog.deleteColumnPlacement( dp.adapterId, dp.columnId, true );
}

// Delete from catalog
List<CatalogColumn> columns = catalog.getColumns( catalogTable.id );
catalog.deleteColumn( sourceCatalogColumn.id );
if ( sourceCatalogColumn.position != columns.size() ) {
// Update position of the other columns
IntStream.range( sourceCatalogColumn.position, columns.size() ).forEach( i -> catalog.setColumnPosition( columns.get( i ).id, i ) );
}
}
}

// Reset plan cache implementation cache & routing cache
statement.getQueryProcessor().resetCaches();
}

@Override
public void addIndex( CatalogTable catalogTable, String indexMethodName, List<String> columnNames, String indexName, boolean isUnique, DataStore location, Statement statement ) throws UnknownColumnException, UnknownIndexMethodException, GenericCatalogException, UnknownTableException, UnknownUserException, UnknownSchemaException, UnknownKeyException, UnknownDatabaseException, TransactionException, AlterSourceException, IndexExistsException, MissingColumnPlacementException {
List<Long> columnIds = new LinkedList<>();
Expand Down
164 changes: 150 additions & 14 deletions dbms/src/main/java/org/polypheny/db/processing/DataMigratorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,9 @@
package org.polypheny.db.processing;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import java.util.*;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.avatica.MetaImpl;
Expand All @@ -43,11 +36,7 @@
import org.polypheny.db.algebra.logical.lpg.LogicalLpgScan;
import org.polypheny.db.algebra.logical.lpg.LogicalLpgValues;
import org.polypheny.db.algebra.logical.relational.LogicalValues;
import org.polypheny.db.algebra.type.AlgDataTypeFactory;
import org.polypheny.db.algebra.type.AlgDataTypeField;
import org.polypheny.db.algebra.type.AlgDataTypeFieldImpl;
import org.polypheny.db.algebra.type.AlgDataTypeSystem;
import org.polypheny.db.algebra.type.AlgRecordType;
import org.polypheny.db.algebra.type.*;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.CatalogAdapter;
import org.polypheny.db.catalog.entity.CatalogColumn;
Expand Down Expand Up @@ -314,6 +303,115 @@ public void executeQuery( List<CatalogColumn> selectColumnList, AlgRoot sourceAl
}


@Override
public void executeMergeQuery( List<CatalogColumn> primaryKeyColumns, List<CatalogColumn> sourceColumns, CatalogColumn targetColumn, String joinString, AlgRoot sourceAlg, Statement sourceStatement, Statement targetStatement, AlgRoot targetAlg, boolean isMaterializedView, boolean doesSubstituteOrderBy ) {
try {
PolyImplementation result;
if ( isMaterializedView ) {
result = sourceStatement.getQueryProcessor().prepareQuery(
sourceAlg,
sourceAlg.alg.getCluster().getTypeFactory().builder().build(),
false,
false,
doesSubstituteOrderBy );
} else {
result = sourceStatement.getQueryProcessor().prepareQuery(
sourceAlg,
sourceAlg.alg.getCluster().getTypeFactory().builder().build(),
true,
false,
false );
}
final Enumerable<Object> enumerable = result.enumerable( sourceStatement.getDataContext() );
//noinspection unchecked
Iterator<Object> sourceIterator = enumerable.iterator();

// Get the mappings of the source columns from the Catalog
Map<Long, Integer> sourceColMapping = new LinkedHashMap<>();
for ( CatalogColumn catalogColumn : sourceColumns ) {
int i = 0;
for ( AlgDataTypeField metaData : result.getRowType().getFieldList() ) {
if ( metaData.getName().equalsIgnoreCase( catalogColumn.name ) ) {
sourceColMapping.put( catalogColumn.id, i );
}
i++;
}
}

if ( isMaterializedView ) {
for ( CatalogColumn catalogColumn : sourceColumns ) {
if ( !sourceColMapping.containsKey( catalogColumn.id ) ) {
int i = sourceColMapping.values().stream().mapToInt( v -> v ).max().orElseThrow( NoSuchElementException::new );
sourceColMapping.put( catalogColumn.id, i + 1 );
}
}
}

int batchSize = RuntimeConfig.DATA_MIGRATOR_BATCH_SIZE.getInteger();
int i = 0;
while ( sourceIterator.hasNext() ) {
List<List<Object>> rows = MetaImpl.collect( result.getCursorFactory(), LimitIterator.of( sourceIterator, batchSize ), new ArrayList<>() );
Map<Long, List<Object>> values = new LinkedHashMap<>();

// Read the values of the source columns from all rows
for ( List<Object> list : rows ) {
for ( Map.Entry<Long, Integer> entry : sourceColMapping.entrySet() ) {
if ( !values.containsKey( entry.getKey() ) ) {
values.put( entry.getKey(), new LinkedList<>() );
}
if ( isMaterializedView ) {
if ( entry.getValue() > list.size() - 1 ) {
values.get( entry.getKey() ).add( i );
i++;
} else {
values.get( entry.getKey() ).add( list.get( entry.getValue() ) );
}
} else {
values.get( entry.getKey() ).add( list.get( entry.getValue() ) );
}
}
}

// Combine the source values into a single string
final AlgDataTypeFactory typeFactory = new PolyTypeFactoryImpl( AlgDataTypeSystem.DEFAULT );
List<Object> mergedValueList = null;
for ( Map.Entry<Long, List<Object>> v : values.entrySet() ) {
if ( !primaryKeyColumns.stream().map( c -> c.id ).collect( Collectors.toList() ).contains( v.getKey() ) ) {
if ( mergedValueList == null ) {
mergedValueList = v.getValue();
} else {
int j = 0;
for ( Object value : mergedValueList ) {
mergedValueList.set( j, ((String) value).concat( joinString + v.getValue().get( j++ ) ) );
}
}
}
}
targetStatement.getDataContext().addParameterValues( targetColumn.id, targetColumn.getAlgDataType( typeFactory ), mergedValueList );

// Select the PK columns for the target statement
for ( CatalogColumn primaryKey : primaryKeyColumns ) {
AlgDataType primaryKeyAlgDataType = primaryKey.getAlgDataType( typeFactory );
List<Object> primaryKeyValues = values.get( primaryKey.id );
targetStatement.getDataContext().addParameterValues( primaryKey.id, primaryKeyAlgDataType, primaryKeyValues );
}

Iterator<?> iterator = targetStatement.getQueryProcessor()
.prepareQuery( targetAlg, sourceAlg.validatedRowType, true, false, false )
.enumerable( targetStatement.getDataContext() )
.iterator();
//noinspection WhileLoopReplaceableByForEach
while ( iterator.hasNext() ) {
iterator.next();
}
targetStatement.getDataContext().resetParameterValues();
}
} catch ( Throwable t ) {
throw new RuntimeException( t );
}
}


@Override
public AlgRoot buildDeleteStatement( Statement statement, List<CatalogColumnPlacement> to, long partitionId ) {
List<String> qualifiedTableName = ImmutableList.of(
Expand Down Expand Up @@ -774,4 +872,42 @@ public void copyPartitionData( Transaction transaction, CatalogAdapter store, Ca
}
}


@Override
public void mergeColumns( Transaction transaction, CatalogAdapter store, List<CatalogColumn> sourceColumns, CatalogColumn targetColumn, String joinString ) {
CatalogTable table = Catalog.getInstance().getTable( sourceColumns.get( 0 ).tableId );
CatalogPrimaryKey primaryKey = Catalog.getInstance().getPrimaryKey( table.primaryKey );

List<CatalogColumn> selectColumnList = new LinkedList<>( sourceColumns );
List<CatalogColumn> primaryKeyList = new LinkedList<>();

// Add primary keys to select column list
for ( long cid : primaryKey.columnIds ) {
CatalogColumn catalogColumn = Catalog.getInstance().getColumn( cid );
if ( !selectColumnList.contains( catalogColumn ) ) {
selectColumnList.add( catalogColumn );
}
primaryKeyList.add( catalogColumn );
}

// Get the placements of the source columns
Map<Long, List<CatalogColumnPlacement>> sourceColumnPlacements = new HashMap<>();
sourceColumnPlacements.put(
table.partitionProperty.partitionIds.get( 0 ),
selectSourcePlacements( table, selectColumnList, -1 ) );

// Get the placement of the newly added target column
CatalogColumnPlacement targetColumnPlacement = Catalog.getInstance().getColumnPlacement( store.id, targetColumn.id );
Map<Long, List<CatalogColumnPlacement>> subDistribution = new HashMap<>( sourceColumnPlacements );
subDistribution.keySet().retainAll( Arrays.asList( table.partitionProperty.partitionIds.get( 0 ) ) );

// Initialize statements for the reading and inserting
Statement sourceStatement = transaction.createStatement();
Statement targetStatement = transaction.createStatement();
AlgRoot sourceAlg = getSourceIterator( sourceStatement, subDistribution );
AlgRoot targetAlg = buildUpdateStatement( targetStatement, Collections.singletonList( targetColumnPlacement ), table.partitionProperty.partitionIds.get( 0 ) );

executeMergeQuery( primaryKeyList, selectColumnList, targetColumn, joinString, sourceAlg, sourceStatement, targetStatement, targetAlg, false, false );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ private String getPhysicalColumnName( CatalogColumnPlacement columnPlacement ) {
public void dropColumn( Context context, CatalogColumnPlacement columnPlacement ) {
commitAll();
for ( CatalogPartitionPlacement partitionPlacement : catalog.getPartitionPlacementsByTableOnAdapter( columnPlacement.adapterId, columnPlacement.tableId ) ) {
Document field = new Document().append( partitionPlacement.physicalTableName, 1 );
Document field = new Document().append( getPhysicalColumnName(columnPlacement.physicalColumnName, columnPlacement.columnId), 1 );
Document filter = new Document().append( "$unset", field );

context.getStatement().getTransaction().registerInvolvedAdapter( this );
Expand Down
2 changes: 2 additions & 0 deletions sql-language/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ data: {
"org.polypheny.db.sql.language.ddl.alterschema.SqlAlterSchemaRename"
"org.polypheny.db.sql.language.ddl.altertable.SqlAlterSourceTableAddColumn"
"org.polypheny.db.sql.language.ddl.altertable.SqlAlterTableAddColumn"
"org.polypheny.db.sql.language.ddl.altertable.SqlAlterTableMergeColumns"
"org.polypheny.db.sql.language.ddl.altertable.SqlAlterTableAddForeignKey"
"org.polypheny.db.sql.language.ddl.altertable.SqlAlterTableAddIndex"
"org.polypheny.db.sql.language.ddl.altertable.SqlAlterTableAddPartitions"
Expand Down Expand Up @@ -161,6 +162,7 @@ data: {
"CATALOG_NAME"
"CENTURY"
"CONFIG"
"COLUMNS"
"CHAIN"
"CHARACTER_SET_CATALOG"
"CHARACTER_SET_NAME"
Expand Down
Loading