diff --git a/Makefile b/Makefile deleted file mode 100644 index 82bf10c..0000000 --- a/Makefile +++ /dev/null @@ -1,35 +0,0 @@ -# mongo_fdw/Makefile -# -# Copyright (c) 2012-2014 Citus Data, Inc. -# - -MODULE_big = mongo_fdw - -# -# We assume we are running on a POSIX compliant system (Linux, OSX). If you are -# on another platform, change env_posix.os in MONGO_OBJS with the appropriate -# environment object file. -# - -MONGO_DRIVER = mongo-c-driver-v0.6 -MONGO_PATH = $(MONGO_DRIVER)/src -MONGO_OBJS = $(MONGO_PATH)/bson.os $(MONGO_PATH)/encoding.os $(MONGO_PATH)/md5.os \ - $(MONGO_PATH)/mongo.os $(MONGO_PATH)/numbers.os $(MONGO_PATH)/env_posix.os - -PG_CPPFLAGS = --std=c99 -I$(MONGO_PATH) -OBJS = connection.o option.o mongo_fdw.o mongo_query.o $(MONGO_OBJS) - -EXTENSION = mongo_fdw -DATA = mongo_fdw--1.0.sql - -$(MONGO_DRIVER)/%.os: - $(MAKE) -C $(MONGO_DRIVER) $*.os - -# -# Users need to specify their Postgres installation path through pg_config. For -# example: /usr/local/pgsql/bin/pg_config or /usr/lib/postgresql/9.1/bin/pg_config -# - -PG_CONFIG = pg_config -PGXS := $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) diff --git a/mongo_fdw.c b/mongo_fdw.c index 75593eb..2bc67b1 100644 --- a/mongo_fdw.c +++ b/mongo_fdw.c @@ -10,12 +10,9 @@ *------------------------------------------------------------------------- */ -#include "postgres.h" -#include "mongo_fdw.h" -#include "mongo_query.h" - #include "postgres.h" #include "bson.h" +#include "mongo_wrapper.h" #include "mongo_fdw.h" #include "mongo_query.h" @@ -68,14 +65,14 @@ /* Local functions forward declarations */ static void MongoGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, - Oid foreignTableId); + Oid foreignTableId); static void MongoGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, - Oid foreignTableId); + Oid foreignTableId); static ForeignScan * MongoGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, - Oid foreignTableId, ForeignPath *bestPath, - List *targetList, List *restrictionClauses); + Oid foreignTableId, ForeignPath *bestPath, + List *targetList, List *restrictionClauses); static void MongoExplainForeignScan(ForeignScanState *scanState, - ExplainState *explainState); + ExplainState *explainState); static void MongoBeginForeignScan(ForeignScanState *scanState, int executorFlags); static TupleTableSlot * MongoIterateForeignScan(ForeignScanState *scanState); static void MongoEndForeignScan(ForeignScanState *scanState); @@ -90,7 +87,7 @@ static TupleTableSlot *MongoExecForeignDelete(EState *estate, TupleTableSlot *slot, TupleTableSlot *planSlot); static void MongoEndForeignModify(EState *estate, - ResultRelInfo *resultRelInfo); + ResultRelInfo *resultRelInfo); static void MongoAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, @@ -103,35 +100,33 @@ static void MongoBeginForeignModify(ModifyTableState *mtstate, int eflags); static TupleTableSlot *MongoExecForeignInsert(EState *estate, - ResultRelInfo *resultRelInfo, - TupleTableSlot *slot, - TupleTableSlot *planSlot); + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); static List *MongoPlanForeignModify(PlannerInfo *root, - ModifyTable *plan, - Index resultRelation, - int subplan_index); + ModifyTable *plan, + Index resultRelation, + int subplan_index); static void MongoExplainForeignModify(ModifyTableState *mtstate, - ResultRelInfo *rinfo, - List *fdw_private, - int subplan_index, - ExplainState *es); + ResultRelInfo *rinfo, List *fdw_private, + int subplan_index, ExplainState *es); /* local functions */ static double ForeignTableDocumentCount(Oid foreignTableId); static HTAB * ColumnMappingHash(Oid foreignTableId, List *columnList); -static void FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, - HTAB *columnMappingHash, Datum *columnValues, - bool *columnNulls); -static bool ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId); -static Datum ColumnValueArray(bson_iterator *bsonIterator, Oid valueTypeId); -static Datum ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, +static void FillTupleSlot(const BSON *bsonDocument, const char *bsonDocumentKey, + HTAB *columnMappingHash, Datum *columnValues, + bool *columnNulls); +static bool ColumnTypesCompatible(BSON_TYPE bsonType, Oid columnTypeId); +static Datum ColumnValueArray(BSON_ITERATOR *bsonIterator, Oid valueTypeId); +static Datum ColumnValue(BSON_ITERATOR *bsonIterator, Oid columnTypeId, int32 columnTypeMod); static void MongoFreeScanState(MongoFdwModifyState *fmstate); static bool MongoAnalyzeForeignTable(Relation relation, - AcquireSampleRowsFunc *acquireSampleRowsFunc, + AcquireSampleRowsFunc *acquireSampleRowsFunc, BlockNumber *totalPageCount); static int MongoAcquireSampleRows(Relation relation, int errorLevel, HeapTuple *sampleRows, int targetRowCount, @@ -151,10 +146,9 @@ extern PGDLLEXPORT void _PG_init(void); memset(y, 'A', sizeof(y));\ y[24] = 0; \ strcpy(y, x); \ - bson_oid_from_string(&z, y);\ + BsonOidFromString(&z, y);\ } while(0); - /* declarations for dynamic loading */ PG_MODULE_MAGIC; @@ -198,7 +192,6 @@ mongo_fdw_handler(PG_FUNCTION_ARGS) /* support for EXPLAIN */ fdwRoutine->ExplainForeignScan = MongoExplainForeignScan; - fdwRoutine->ExplainForeignModify = NULL; fdwRoutine->ExplainForeignModify = MongoExplainForeignModify; /* support for ANALYSE */ @@ -339,7 +332,7 @@ MongoGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, ForeignScan *foreignScan = NULL; List *foreignPrivateList = NIL; List *opExpressionList = NIL; - bson *queryDocument = NULL; + BSON *queryDocument = NULL; List *columnList = NIL; /* @@ -364,13 +357,13 @@ MongoGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreignTableId, columnList = ColumnList(baserel); /* construct foreign plan with query document and column list */ - foreignPrivateList = list_make2(columnList, restrictionClauses); + foreignPrivateList = list_make2(columnList, opExpressionList); - /* only clean up the query struct, but not its data */ - bson_dispose(queryDocument); + /* only clean up the query struct */ + BsonDestroy(queryDocument); /* create the foreign scan node */ - foreignScan = make_foreignscan(targetList, restrictionClauses, + foreignScan = make_foreignscan(targetList, restrictionClauses, scanRangeTableIndex, NIL, /* no expressions to evaluate */ foreignPrivateList); @@ -435,25 +428,23 @@ MongoExplainForeignModify(ModifyTableState *mtstate, static void MongoBeginForeignScan(ForeignScanState *scanState, int executorFlags) { - mongo *mongoConnection = NULL; - mongo_cursor *mongoCursor = NULL; + MONGO_CONN *mongoConnection = NULL; + MONGO_CURSOR *mongoCursor = NULL; Oid foreignTableId = InvalidOid; List *columnList = NIL; HTAB *columnMappingHash = NULL; char *addressName = NULL; int32 portNumber = 0; - StringInfo namespaceName = NULL; ForeignScan *foreignScan = NULL; List *foreignPrivateList = NIL; - bson *queryDocument = NULL; + BSON *queryDocument = NULL; MongoFdwOptions *mongoFdwOptions = NULL; MongoFdwModifyState *fmstate = NULL; + List *opExpressionList = NIL; /* if Explain with no Analyze, do nothing */ if (executorFlags & EXEC_FLAG_EXPLAIN_ONLY) - { return; - } foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation); mongoFdwOptions = MongoGetOptions(foreignTableId); @@ -467,24 +458,20 @@ MongoBeginForeignScan(ForeignScanState *scanState, int executorFlags) * establish new connection if necessary. */ mongoConnection = GetConnection(addressName, portNumber); + foreignScan = (ForeignScan *) scanState->ss.ps.plan; foreignPrivateList = foreignScan->fdw_private; - Assert(list_length(foreignPrivateList) == 1); + Assert(list_length(foreignPrivateList) == 2); columnList = list_nth(foreignPrivateList, 0); + opExpressionList = list_nth(foreignPrivateList, 1); - queryDocument = QueryDocument(foreignTableId, NIL); + queryDocument = QueryDocument(foreignTableId, opExpressionList); columnMappingHash = ColumnMappingHash(foreignTableId, columnList); - namespaceName = makeStringInfo(); - appendStringInfo(namespaceName, "%s.%s", mongoFdwOptions->databaseName, - mongoFdwOptions->collectionName); - /* create cursor for collection name and set query */ - mongoCursor = mongo_cursor_create(); - mongo_cursor_init(mongoCursor, mongoConnection, namespaceName->data); - mongo_cursor_set_query(mongoCursor, queryDocument); + mongoCursor = MongoCursorCreate(mongoConnection, mongoFdwOptions->databaseName, mongoFdwOptions->collectionName, queryDocument); /* create and set foreign execution state */ fmstate = (MongoFdwModifyState *) palloc0(sizeof(MongoFdwModifyState)); @@ -508,9 +495,8 @@ MongoIterateForeignScan(ForeignScanState *scanState) { MongoFdwModifyState *fmstate = (MongoFdwModifyState *) scanState->fdw_state; TupleTableSlot *tupleSlot = scanState->ss.ss_ScanTupleSlot; - mongo_cursor *mongoCursor = fmstate->mongoCursor; + MONGO_CURSOR *mongoCursor = fmstate->mongoCursor; HTAB *columnMappingHash = fmstate->columnMappingHash; - int32 cursorStatus = MONGO_ERROR; TupleDesc tupleDescriptor = tupleSlot->tts_tupleDescriptor; Datum *columnValues = tupleSlot->tts_values; @@ -529,32 +515,33 @@ MongoIterateForeignScan(ForeignScanState *scanState) memset(columnValues, 0, columnCount * sizeof(Datum)); memset(columnNulls, true, columnCount * sizeof(bool)); - cursorStatus = mongo_cursor_next(mongoCursor); - if (cursorStatus == MONGO_OK) + if (MongoCursorNext(mongoCursor, NULL)) { - const bson *bsonDocument = mongo_cursor_bson(mongoCursor); + const BSON *bsonDocument = MongoCursorBson(mongoCursor); const char *bsonDocumentKey = NULL; /* top level document */ FillTupleSlot(bsonDocument, bsonDocumentKey, - columnMappingHash, columnValues, columnNulls); + columnMappingHash, columnValues, columnNulls); ExecStoreVirtualTuple(tupleSlot); } else { + #ifndef META_DRIVER /* * The following is a courtesy check. In practice when Mongo shuts down, * mongo_cursor_next() could possibly crash. This function first frees - * cursor->reply, and then references reply in mongo_cursor_destroy(). + * cursor->reply, and then references reply in mongo_cursor__destroy(). */ + mongo_cursor_error_t errorCode = mongoCursor->err; if (errorCode != MONGO_CURSOR_EXHAUSTED) { MongoFreeScanState(fmstate); - ereport(ERROR, (errmsg("could not iterate over mongo collection"), - errhint("Mongo driver cursor error code: %d", errorCode))); + errhint("Mongo driver cursor error code: %d", errorCode))); } + #endif } return tupleSlot; @@ -573,6 +560,11 @@ MongoEndForeignScan(ForeignScanState *scanState) /* if we executed a query, reclaim mongo related resources */ if (fmstate != NULL) { + if (fmstate->mongoFdwOptions) + { + MongoFreeOptions(fmstate->mongoFdwOptions); + fmstate->mongoFdwOptions = NULL; + } MongoFreeScanState(fmstate); } } @@ -587,31 +579,23 @@ static void MongoReScanForeignScan(ForeignScanState *scanState) { MongoFdwModifyState *fmstate = (MongoFdwModifyState *) scanState->fdw_state; - mongo *mongoConnection = fmstate->mongoConnection; + MONGO_CONN *mongoConnection = fmstate->mongoConnection; MongoFdwOptions *mongoFdwOptions = NULL; - mongo_cursor *mongoCursor = NULL; - StringInfo namespaceName = NULL; Oid foreignTableId = InvalidOid; /* close down the old cursor */ - mongo_cursor_destroy(fmstate->mongoCursor); - mongo_cursor_dispose(fmstate->mongoCursor); + MongoCursorDestroy(fmstate->mongoCursor); /* reconstruct full collection name */ foreignTableId = RelationGetRelid(scanState->ss.ss_currentRelation); mongoFdwOptions = MongoGetOptions(foreignTableId); - namespaceName = makeStringInfo(); - appendStringInfo(namespaceName, "%s.%s", mongoFdwOptions->databaseName, - mongoFdwOptions->collectionName); - - MongoFreeOptions(mongoFdwOptions); - /* reconstruct cursor for collection name and set query */ - mongoCursor = mongo_cursor_create(); - mongo_cursor_init(mongoCursor, mongoConnection, namespaceName->data); - mongo_cursor_set_query(mongoCursor, fmstate->queryDocument); - fmstate->mongoCursor = mongoCursor; + fmstate->mongoCursor = MongoCursorCreate(mongoConnection, + fmstate->mongoFdwOptions->databaseName, + fmstate->mongoFdwOptions->collectionName, + fmstate->queryDocument); + MongoFreeOptions(mongoFdwOptions); } static List * @@ -623,7 +607,7 @@ MongoPlanForeignModify(PlannerInfo *root, CmdType operation = plan->operation; RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); Relation rel; - List *targetAttrs = NIL; + List* targetAttrs = NIL; /* * Core code already has some lock on each rel being planned, so we can @@ -646,7 +630,7 @@ MongoPlanForeignModify(PlannerInfo *root, } else if (operation == CMD_UPDATE) { - Bitmapset *tmpset = bms_copy(rte->modifiedCols); + Bitmapset *tmpset = bms_copy(rte->modifiedCols); AttrNumber col; while ((col = bms_first_member(tmpset)) >= 0) @@ -654,7 +638,6 @@ MongoPlanForeignModify(PlannerInfo *root, col += FirstLowInvalidHeapAttributeNumber; if (col <= InvalidAttrNumber) /* shouldn't happen */ elog(ERROR, "system-column update is not supported"); - /* * We also disallow updates to the first column which * happens to be the row identifier in MongoDb (_id) @@ -684,8 +667,7 @@ MongoPlanForeignModify(PlannerInfo *root, /* - * MongoBeginForeignModify - * Begin an insert/update/delete operation on a foreign table + * Begin an insert/update/delete operation on a foreign table */ static void MongoBeginForeignModify(ModifyTableState *mtstate, @@ -742,8 +724,7 @@ MongoBeginForeignModify(ModifyTableState *mtstate, /* - * MongoExecForeignInsert - * Insert one row into a foreign table + * Insert one row into a foreign table. */ static TupleTableSlot * MongoExecForeignInsert(EState *estate, @@ -752,9 +733,9 @@ MongoExecForeignInsert(EState *estate, TupleTableSlot *planSlot) { MongoFdwOptions *options = NULL; - mongo *mongoConnection = NULL; + MONGO_CONN *mongoConnection = NULL; Oid foreignTableId = InvalidOid; - bson *b = NULL; + BSON *b = NULL; bson_oid_t bsonObjectId; char *outputString; Oid outputFunctionId = InvalidOid; @@ -762,7 +743,7 @@ MongoExecForeignInsert(EState *estate, Oid typoid = InvalidOid; Datum value; bool isnull = false; - char qualname[255]; + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; @@ -773,8 +754,7 @@ MongoExecForeignInsert(EState *estate, mongoConnection = GetConnection(options->addressName, options->portNumber); - b = bson_create(); - bson_init(b); + b = BsonCreate(); typoid = get_atttype(foreignTableId, 1); @@ -787,12 +767,13 @@ MongoExecForeignInsert(EState *estate, { int attnum = lfirst_int(lc); value = slot_getattr(slot, attnum, &isnull); - /* - * We also disallow null values to the first column which - * happens to be the row identifier in MongoDb (_id). - */ if (attnum == 1) { + /* + * We also disallow null values to the first column which + * happens to be the row identifier in MongoDb (_id). + */ + if (isnull) elog(ERROR, "null value for first column (row identifier column) is not supported"); @@ -805,46 +786,40 @@ MongoExecForeignInsert(EState *estate, UNIQUE_OID(outputString, bsonObjectId); - if(bson_append_oid(b, "_id", &bsonObjectId) == MONGO_ERROR) + /* Append rowid field which is "_id" in MongoDB */ + if(!BsonAppendOid(b, "_id", &bsonObjectId)) ereport(ERROR, (errcode(ERRCODE_FDW_ERROR), errmsg("insert failed, invalid value"))); } - if(AppenMongoValue(b, slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data, value, - isnull, slot->tts_tupleDescriptor->attrs[attnum -1]->atttypid) == MONGO_ERROR) - ereport(ERROR, (errmsg("failed to update row %d", slot->tts_tupleDescriptor->attrs[attnum -1]->atttypid), - errhint("Mongo driver insert error: %d", mongoConnection->err))); + AppenMongoValue(b, slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data, value, + isnull, slot->tts_tupleDescriptor->attrs[attnum -1]->atttypid); } } - bson_finish(b); - - sprintf(qualname,"%s.%s", options->databaseName, options->collectionName); + BsonFinish(b); - if (mongo_insert(mongoConnection, qualname, b , NULL) != MONGO_OK) - ereport(ERROR, - (errcode(ERRCODE_FDW_ERROR), - errmsg("insert failed"), - errhint("Mongo driver insert error: %d", mongoConnection->err))); + /* Now we are ready to insert tuple / document into MongoDB */ + MongoInsert(mongoConnection, options->databaseName, options->collectionName, b); - bson_destroy(b); - bson_dispose(b); + BsonDestroy(b); return slot; } /* - * MongoAddForeignUpdateTargets - * Add column(s) needed for update/delete on a foreign table + * Add column(s) needed for update/delete on a foreign table, we are using + * first column as row identification column, so we are adding that into target + * list. */ static void MongoAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation) { - Var *var; - const char *attrname; - TargetEntry *tle; + Var *var = NULL; + const char *attrname = NULL; + TargetEntry *tle = NULL; /* * What we need is the rowid which is the first column @@ -873,10 +848,6 @@ MongoAddForeignUpdateTargets(Query *parsetree, } -/* - * postgresExecForeignUpdate - * Update one row in a foreign table - */ static TupleTableSlot * MongoExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, @@ -884,15 +855,15 @@ MongoExecForeignUpdate(EState *estate, TupleTableSlot *planSlot) { MongoFdwOptions *options = NULL; - mongo *mongoConnection = NULL; + MONGO_CONN *mongoConnection = NULL; Datum datum = 0; bool isNull = false; Oid foreignTableId = InvalidOid; char *columnName = NULL; Oid typoid = InvalidOid; - bson *b = NULL; - bson *op = NULL; - char qualname[255]; + BSON *b = NULL; + BSON *op = NULL; + BSON set; MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; @@ -910,12 +881,10 @@ MongoExecForeignUpdate(EState *estate, typoid = get_atttype(foreignTableId, 1); - b = bson_create(); - bson_init(b); - - bson_append_start_object(b, "$set"); + b = BsonCreate(); + BsonAppendStartObject(b, "$set", &set); - /* Get following parameters from slot, and append to the bson object */ + /* get following parameters from slot */ if (slot != NULL && fmstate->target_attrs != NIL) { ListCell *lc; @@ -927,50 +896,36 @@ MongoExecForeignUpdate(EState *estate, bool isnull; value = slot_getattr(slot, attnum, &isnull); +#ifdef META_DRIVER + AppenMongoValue(&set, slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data, value, + isnull ? true : false, slot->tts_tupleDescriptor->attrs[attnum - 1]->atttypid); +#else AppenMongoValue(b, slot->tts_tupleDescriptor->attrs[attnum - 1]->attname.data, value, isnull ? true : false, slot->tts_tupleDescriptor->attrs[attnum - 1]->atttypid); +#endif } } - bson_append_finish_object(b); - bson_finish(b); + BsonAppendFinishObject(b, &set); + BsonFinish(b); - op = bson_create(); - bson_init(op); - - /* Append where clause in bson object for particular rowid */ - if (AppenMongoValue(op, columnName, datum, false, typoid) == MONGO_ERROR) + op = BsonCreate(); + if (!AppenMongoValue(op, columnName, datum, false, typoid)) { - bson_destroy(b); - bson_dispose(b); - - bson_destroy(op); - bson_dispose(op); - - ereport(ERROR, (errmsg("failed to update row"), - errhint("Mongo driver update error: %d", mongoConnection->err))); - + BsonDestroy(b); return NULL; } - bson_finish(op); - - sprintf(qualname,"%s.%s", options->databaseName, options->collectionName); + BsonFinish(op); /* We are ready to update the row into MongoDB */ - if (mongo_update(mongoConnection, qualname, op, b, MONGO_UPDATE_BASIC, 0) == MONGO_ERROR) - ereport(ERROR, (errmsg("failed to update row"), - errhint("Mongo driver update error: %d", mongoConnection->err))); + MongoUpdate(mongoConnection, options->databaseName, options->collectionName, op, b); - bson_destroy(b); - bson_dispose(b); - - bson_destroy(op); - bson_dispose(op); + BsonDestroy(op); + BsonDestroy(b); /* Return NULL if nothing was updated on the remote end */ return slot; } - /* * MongoExecForeignDelete * Delete one row from a foreign table @@ -982,14 +937,13 @@ MongoExecForeignDelete(EState *estate, TupleTableSlot *planSlot) { MongoFdwOptions *options = NULL; - mongo *mongoConnection = NULL; + MONGO_CONN *mongoConnection = NULL; Datum datum = 0; bool isNull = false; Oid foreignTableId = InvalidOid; char *columnName = NULL; Oid typoid = InvalidOid; - bson *b = NULL; - char qualname[255]; + BSON *b = NULL; MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; @@ -1007,35 +961,23 @@ MongoExecForeignDelete(EState *estate, typoid = get_atttype(foreignTableId, 1); - b = bson_create(); - bson_init(b); - - if (AppenMongoValue(b, columnName, datum, false, typoid) == MONGO_ERROR) + b = BsonCreate(); + if (!AppenMongoValue(b,columnName, datum, false, typoid)) { - bson_destroy(b); - bson_dispose(b); - ereport(ERROR, (errmsg("failed to delete row"), - errhint("Mongo driver delete error: %d", mongoConnection->err))); - + BsonDestroy(b); return NULL; } - bson_finish(b); - - sprintf(qualname,"%s.%s", options->databaseName, options->collectionName); + BsonFinish(b); /* Now we are ready to delete a single document from MongoDB */ - if (mongo_remove(mongoConnection, qualname, b , NULL) != MONGO_OK) - ereport(ERROR, (errmsg("failed to delete row"), - errhint("Mongo driver delete error: %d", mongoConnection->err))); + MongoDelete(mongoConnection, options->databaseName, options->collectionName, b); - bson_destroy(b); - bson_dispose(b); + BsonDestroy(b); /* Return NULL if nothing was updated on the remote end */ return slot; } - /* * MongoEndForeignModify * Finish an insert/update/delete operation on a foreign table @@ -1043,10 +985,19 @@ MongoExecForeignDelete(EState *estate, static void MongoEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo) { - + MongoFdwModifyState *fmstate = (MongoFdwModifyState *) resultRelInfo->ri_FdwState; + if (fmstate) + { + if (fmstate->mongoFdwOptions) + { + MongoFreeOptions(fmstate->mongoFdwOptions); + fmstate->mongoFdwOptions = NULL; + } + MongoFreeScanState(fmstate); + pfree(fmstate); + } } - /* * ForeignTableDocumentCount connects to the MongoDB server, and queries it for * the number of documents in the foreign collection. On success, the function @@ -1056,16 +1007,16 @@ static double ForeignTableDocumentCount(Oid foreignTableId) { MongoFdwOptions *options = NULL; - mongo *mongoConnection = NULL; - const bson *emptyQuery = NULL; + MONGO_CONN *mongoConnection = NULL; + const BSON *emptyQuery = NULL; double documentCount = 0.0; /* resolve foreign table options; and connect to mongo server */ options = MongoGetOptions(foreignTableId); mongoConnection = GetConnection(options->addressName, options->portNumber); - documentCount = mongo_count(mongoConnection, options->databaseName, - options->collectionName, emptyQuery); + + MongoAggregateCount(mongoConnection, options->databaseName, options->collectionName, emptyQuery); MongoFreeOptions(options); @@ -1133,16 +1084,16 @@ ColumnMappingHash(Oid foreignTableId, List *columnList) * passed as NULL. */ static void -FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, +FillTupleSlot(const BSON *bsonDocument, const char *bsonDocumentKey, HTAB *columnMappingHash, Datum *columnValues, bool *columnNulls) { - bson_iterator bsonIterator = { NULL, 0 }; - bson_iterator_init(&bsonIterator, bsonDocument); + BSON_ITERATOR bsonIterator = { NULL, 0 }; + BsonIterInit(&bsonIterator, (BSON*)bsonDocument); - while (bson_iterator_next(&bsonIterator)) + while (BsonIterNext(&bsonIterator)) { - const char *bsonKey = bson_iterator_key(&bsonIterator); - bson_type bsonType = bson_iterator_type(&bsonIterator); + const char *bsonKey = BsonIterKey(&bsonIterator); + BSON_TYPE bsonType = BsonIterType(&bsonIterator); ColumnMapping *columnMapping = NULL; Oid columnTypeId = InvalidOid; @@ -1168,22 +1119,22 @@ FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, } /* recurse into nested objects */ - if (bsonType == BSON_OBJECT) + if (bsonType == BSON_TYPE_DOCUMENT) { - bson subObject; - bson_iterator_subobject(&bsonIterator, &subObject); + BSON subObject; + BsonIterSubObject(&bsonIterator, &subObject); FillTupleSlot(&subObject, bsonFullKey, - columnMappingHash, columnValues, columnNulls); + columnMappingHash, columnValues, columnNulls); continue; } - /* look up the corresponding column for this bson key */ + /* look up the corresponding column for this BSON key */ hashKey = (void *) bsonFullKey; columnMapping = (ColumnMapping *) hash_search(columnMappingHash, hashKey, HASH_FIND, &handleFound); - /* if no corresponding column or null bson value, continue */ - if (columnMapping == NULL || bsonType == BSON_NULL) + /* if no corresponding column or null BSON value, continue */ + if (columnMapping == NULL || bsonType == BSON_TYPE_NULL) { continue; } @@ -1192,7 +1143,7 @@ FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, columnTypeId = columnMapping->columnTypeId; columnArrayTypeId = columnMapping->columnArrayTypeId; - if (OidIsValid(columnArrayTypeId) && bsonType == BSON_ARRAY) + if (OidIsValid(columnArrayTypeId) && bsonType == BSON_TYPE_ARRAY) { compatibleTypes = true; } @@ -1235,7 +1186,7 @@ FillTupleSlot(const bson *bsonDocument, const char *bsonDocumentKey, * internal conversions applied by BSON APIs. */ static bool -ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) +ColumnTypesCompatible(BSON_TYPE bsonType, Oid columnTypeId) { bool compatibleTypes = false; @@ -1246,8 +1197,8 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) case INT8OID: case FLOAT4OID: case FLOAT8OID: case NUMERICOID: { - if (bsonType == BSON_INT || bsonType == BSON_LONG || - bsonType == BSON_DOUBLE) + if (bsonType == BSON_TYPE_INT32 || bsonType == BSON_TYPE_INT64 || + bsonType == BSON_TYPE_DOUBLE) { compatibleTypes = true; } @@ -1255,8 +1206,8 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) } case BOOLOID: { - if (bsonType == BSON_INT || bsonType == BSON_LONG || - bsonType == BSON_DOUBLE || bsonType == BSON_BOOL) + if (bsonType == BSON_TYPE_INT32 || bsonType == BSON_TYPE_INT64 || + bsonType == BSON_TYPE_DOUBLE || bsonType == BSON_TYPE_BOOL) { compatibleTypes = true; } @@ -1266,7 +1217,7 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) case VARCHAROID: case TEXTOID: { - if (bsonType == BSON_STRING) + if (bsonType == BSON_TYPE_UTF8) { compatibleTypes = true; } @@ -1279,7 +1230,7 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) * object identifier. We can safely overload this 64-byte data type * since it's reserved for internal use in PostgreSQL. */ - if (bsonType == BSON_OID) + if (bsonType == BSON_TYPE_OID) { compatibleTypes = true; } @@ -1289,7 +1240,7 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) case TIMESTAMPOID: case TIMESTAMPTZOID: { - if (bsonType == BSON_DATE) + if (bsonType == BSON_TYPE_DATE_TIME) { compatibleTypes = true; } @@ -1303,7 +1254,7 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) * such as money or inet, do not have equivalents in MongoDB. */ ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), - errmsg("cannot convert bson type to column type"), + errmsg("cannot convert BSON type to column type"), errhint("Column type: %u", (uint32) columnTypeId))); break; } @@ -1320,7 +1271,7 @@ ColumnTypesCompatible(bson_type bsonType, Oid columnTypeId) * datum from element datums, and returns the array datum. */ static Datum -ColumnValueArray(bson_iterator *bsonIterator, Oid valueTypeId) +ColumnValueArray(BSON_ITERATOR *bsonIterator, Oid valueTypeId) { Datum *columnValueArray = palloc0(INITIAL_ARRAY_CAPACITY * sizeof(Datum)); uint32 arrayCapacity = INITIAL_ARRAY_CAPACITY; @@ -1333,16 +1284,15 @@ ColumnValueArray(bson_iterator *bsonIterator, Oid valueTypeId) char typeAlignment = 0; int16 typeLength = 0; - bson_iterator bsonSubIterator = { NULL, 0 }; - bson_iterator_subiterator(bsonIterator, &bsonSubIterator); - - while (bson_iterator_next(&bsonSubIterator)) + BSON_ITERATOR bsonSubIterator = { NULL, 0 }; + BsonIterSubIter(bsonIterator, &bsonSubIterator); + while (BsonIterNext(&bsonSubIterator)) { - bson_type bsonType = bson_iterator_type(&bsonSubIterator); + BSON_TYPE bsonType = BsonIterType(&bsonSubIterator); bool compatibleTypes = false; compatibleTypes = ColumnTypesCompatible(bsonType, valueTypeId); - if (bsonType == BSON_NULL || !compatibleTypes) + if (bsonType == BSON_TYPE_NULL || !compatibleTypes) { continue; } @@ -1373,7 +1323,7 @@ ColumnValueArray(bson_iterator *bsonIterator, Oid valueTypeId) * datum. The function then returns this datum. */ static Datum -ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) +ColumnValue(BSON_ITERATOR *bsonIterator, Oid columnTypeId, int32 columnTypeMod) { Datum columnValue = 0; @@ -1381,37 +1331,37 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) { case INT2OID: { - int16 value = (int16) bson_iterator_int(bsonIterator); + int16 value = (int16) BsonIterInt32(bsonIterator); columnValue = Int16GetDatum(value); break; } case INT4OID: { - int32 value = bson_iterator_int(bsonIterator); + int32 value = BsonIterInt32(bsonIterator); columnValue = Int32GetDatum(value); break; } case INT8OID: { - int64 value = bson_iterator_long(bsonIterator); + int64 value = BsonIterInt64(bsonIterator); columnValue = Int64GetDatum(value); break; } case FLOAT4OID: { - float4 value = (float4) bson_iterator_double(bsonIterator); + float4 value = (float4) BsonIterDouble(bsonIterator); columnValue = Float4GetDatum(value); break; } case FLOAT8OID: { - float8 value = bson_iterator_double(bsonIterator); + float8 value = BsonIterDouble(bsonIterator); columnValue = Float8GetDatum(value); break; } case NUMERICOID: { - float8 value = bson_iterator_double(bsonIterator); + float8 value = BsonIterDouble(bsonIterator); Datum valueDatum = Float8GetDatum(value); /* overlook type modifiers for numeric */ @@ -1420,13 +1370,13 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) } case BOOLOID: { - bool value = bson_iterator_bool(bsonIterator); + bool value = BsonIterBool(bsonIterator); columnValue = BoolGetDatum(value); break; } case BPCHAROID: { - const char *value = bson_iterator_string(bsonIterator); + const char *value = BsonIterString(bsonIterator); Datum valueDatum = CStringGetDatum(value); columnValue = DirectFunctionCall3(bpcharin, valueDatum, @@ -1436,7 +1386,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) } case VARCHAROID: { - const char *value = bson_iterator_string(bsonIterator); + const char *value = BsonIterString(bsonIterator); Datum valueDatum = CStringGetDatum(value); columnValue = DirectFunctionCall3(varcharin, valueDatum, @@ -1446,16 +1396,16 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) } case TEXTOID: { - const char *value = bson_iterator_string(bsonIterator); + const char *value = BsonIterString(bsonIterator); columnValue = CStringGetTextDatum(value); break; } - case NAMEOID: + case NAMEOID: { char value[NAMEDATALEN]; Datum valueDatum = 0; - bson_oid_t *bsonObjectId = bson_iterator_oid(bsonIterator); + bson_oid_t *bsonObjectId = (bson_oid_t*) BsonIterOid(bsonIterator); bson_oid_to_string(bsonObjectId, value); valueDatum = CStringGetDatum(value); @@ -1466,7 +1416,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) } case DATEOID: { - int64 valueMillis = bson_iterator_date(bsonIterator); + int64 valueMillis = BsonIterDate(bsonIterator); int64 timestamp = (valueMillis * 1000L) - POSTGRES_TO_UNIX_EPOCH_USECS; Datum timestampDatum = TimestampGetDatum(timestamp); @@ -1476,7 +1426,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) case TIMESTAMPOID: case TIMESTAMPTZOID: { - int64 valueMillis = bson_iterator_date(bsonIterator); + int64 valueMillis = BsonIterDate(bsonIterator); int64 timestamp = (valueMillis * 1000L) - POSTGRES_TO_UNIX_EPOCH_USECS; /* overlook type modifiers for timestamp */ @@ -1486,7 +1436,7 @@ ColumnValue(bson_iterator *bsonIterator, Oid columnTypeId, int32 columnTypeMod) default: { ereport(ERROR, (errcode(ERRCODE_FDW_INVALID_DATA_TYPE), - errmsg("cannot convert bson type to column type"), + errmsg("cannot convert BSON type to column type"), errhint("Column type: %u", (uint32) columnTypeId))); break; } @@ -1504,15 +1454,20 @@ static void MongoFreeScanState(MongoFdwModifyState *fmstate) { if (fmstate == NULL) - { return; + + if (fmstate->queryDocument) + { + BsonDestroy(fmstate->queryDocument); + fmstate->queryDocument = NULL; } - bson_destroy(fmstate->queryDocument); - bson_dispose(fmstate->queryDocument); + if (fmstate->mongoCursor) + { + MongoCursorDestroy(fmstate->mongoCursor); + fmstate->mongoCursor = NULL; + } - mongo_cursor_destroy(fmstate->mongoCursor); - mongo_cursor_dispose(fmstate->mongoCursor); /* Release remote connection */ ReleaseConnection(fmstate->mongoConnection); } @@ -1535,6 +1490,7 @@ MongoAnalyzeForeignTable(Relation relation, double foreignTableSize = 0; foreignTableId = RelationGetRelid(relation); + documentCount = ForeignTableDocumentCount(foreignTableId); if (documentCount > 0.0) @@ -1597,8 +1553,8 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, AttrNumber columnCount = 0; AttrNumber columnId = 0; HTAB *columnMappingHash = NULL; - mongo_cursor *mongoCursor = NULL; - bson *queryDocument = NULL; + MONGO_CURSOR *mongoCursor = NULL; + BSON *queryDocument = NULL; List *columnList = NIL; ForeignScanState *scanState = NULL; List *foreignPrivateList = NIL; @@ -1635,7 +1591,7 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, foreignPrivateList = list_make1(columnList); /* only clean up the query struct, but not its data */ - bson_dispose(queryDocument); + BsonDestroy(queryDocument); foreignScan = makeNode(ForeignScan); foreignScan->fdw_private = foreignPrivateList; @@ -1666,8 +1622,6 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, for (;;) { - int32 cursorStatus = MONGO_ERROR; - /* check for user-requested abort or sleep */ vacuum_delay_point(); @@ -1675,10 +1629,9 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, memset(columnValues, 0, columnCount * sizeof(Datum)); memset(columnNulls, true, columnCount * sizeof(bool)); - cursorStatus = mongo_cursor_next(mongoCursor); - if (cursorStatus == MONGO_OK) + if(MongoCursorNext(mongoCursor, NULL)) { - const bson *bsonDocument = mongo_cursor_bson(mongoCursor); + const BSON *bsonDocument = MongoCursorBson(mongoCursor); const char *bsonDocumentKey = NULL; /* top level document */ /* fetch next tuple */ @@ -1686,25 +1639,27 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, MemoryContextSwitchTo(tupleContext); FillTupleSlot(bsonDocument, bsonDocumentKey, - columnMappingHash, columnValues, columnNulls); + columnMappingHash, columnValues, columnNulls); MemoryContextSwitchTo(oldContext); } else { + #ifndef META_DRIVER /* * The following is a courtesy check. In practice when Mongo shuts down, - * mongo_cursor_next() could possibly crash. + * mongo_cursor__next() could possibly crash. */ mongo_cursor_error_t errorCode = mongoCursor->err; + if (errorCode != MONGO_CURSOR_EXHAUSTED) { MongoFreeScanState(fmstate); - ereport(ERROR, (errmsg("could not iterate over mongo collection"), + ereport(ERROR, (errmsg("could not iterate over mongo 11collection"), errhint("Mongo driver cursor error code: %d", errorCode))); } - + #endif break; } @@ -1745,8 +1700,8 @@ MongoAcquireSampleRows(Relation relation, int errorLevel, heap_freetuple(sampleRows[rowIndex]); sampleRows[rowIndex] = heap_form_tuple(tupleDescriptor, - columnValues, - columnNulls); + columnValues, + columnNulls); } rowCountToSkip -= 1; diff --git a/mongo_fdw.h b/mongo_fdw.h index 8be4d10..f72af94 100644 --- a/mongo_fdw.h +++ b/mongo_fdw.h @@ -12,11 +12,16 @@ #ifndef MONGO_FDW_H #define MONGO_FDW_H +#include "config.h" +#include "mongo_wrapper.h" #include "bson.h" -#include "mongo.h" -#include "postgres.h" -#include "utils/hsearch.h" +#ifdef META_DRIVER + #include "mongoc.h" +#else + #include "mongo.h" +#endif + #include "fmgr.h" #include "catalog/pg_foreign_server.h" #include "catalog/pg_foreign_table.h" @@ -45,7 +50,29 @@ #include "utils/rel.h" #include "utils/memutils.h" - +#ifdef META_DRIVER + #define BSON bson_t + #define BSON_TYPE bson_type_t + #define BSON_ITERATOR bson_iter_t + #define MONGO_CONN mongoc_client_t + #define MONGO_CURSOR mongoc_cursor_t +#else + #define BSON bson + #define BSON_TYPE bson_type + #define BSON_ITERATOR bson_iterator + #define MONGO_CONN mongo + #define MONGO_CURSOR mongo_cursor + #define BSON_TYPE_DOCUMENT BSON_OBJECT + #define BSON_TYPE_NULL BSON_NULL + #define BSON_TYPE_ARRAY BSON_ARRAY + #define BSON_TYPE_INT32 BSON_INT + #define BSON_TYPE_INT64 BSON_LONG + #define BSON_TYPE_DOUBLE BSON_DOUBLE + #define BSON_TYPE_BOOL BSON_BOOL + #define BSON_TYPE_UTF8 BSON_STRING + #define BSON_TYPE_OID BSON_OID + #define BSON_TYPE_DATE_TIME BSON_DATE +#endif /* Defines for valid option names */ #define OPTION_NAME_ADDRESS "address" @@ -105,37 +132,35 @@ typedef struct MongoFdwOptions int32 portNumber; char *databaseName; char *collectionName; - } MongoFdwOptions; /* -* MongoFdwExecState keeps foreign data wrapper specific execution state that we -* create and hold onto when executing the query. -*/ + * MongoFdwExecState keeps foreign data wrapper specific execution state that we + * create and hold onto when executing the query. + */ /* -* Execution state of a foreign insert/update/delete operation. -*/ + * Execution state of a foreign insert/update/delete operation. + */ typedef struct MongoFdwModifyState { - Relation rel; /* relcache entry for the foreign table */ - - List *target_attrs; /* list of target attribute numbers */ + Relation rel; /* relcache entry for the foreign table */ + List *target_attrs; /* list of target attribute numbers */ /* info about parameters for prepared statement */ - int p_nums; /* number of parameters to transmit */ - FmgrInfo *p_flinfo; /* output conversion functions for them */ + int p_nums; /* number of parameters to transmit */ + FmgrInfo *p_flinfo; /* output conversion functions for them */ - struct HTAB *columnMappingHash; + struct HTAB *columnMappingHash; - mongo *mongoConnection; /* MongoDB connection */ - mongo_cursor *mongoCursor; /* MongoDB cursor */ - bson *queryDocument; /* Bson Document */ + MONGO_CONN *mongoConnection; /* MongoDB connection */ + MONGO_CURSOR *mongoCursor; /* MongoDB cursor */ + BSON *queryDocument; /* Bson Document */ - MongoFdwOptions *mongoFdwOptions; + MongoFdwOptions *mongoFdwOptions; /* working memory context */ - MemoryContext temp_cxt; /* context for per-tuple temporary data */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ } MongoFdwModifyState; @@ -155,19 +180,23 @@ typedef struct ColumnMapping } ColumnMapping; +extern MONGO_CONN *GetConnection(char *host, int32 port); +extern void cleanup_connection(void); +extern void ReleaseConnection(MONGO_CONN* conn); + +extern StringInfo OptionNamesString(Oid currentContextId); + /* Function declarations related to creating the mongo query */ extern List * ApplicableOpExpressionList(RelOptInfo *baserel); -extern bson * QueryDocument(Oid relationId, List *opExpressionList); +extern BSON * QueryDocument(Oid relationId, List *opExpressionList); extern List * ColumnList(RelOptInfo *baserel); +extern MongoFdwOptions * MongoGetOptions(Oid foreignTableId); +extern void MongoFreeOptions(MongoFdwOptions *mongoFdwOptions); + /* Function declarations for foreign data wrapper */ extern Datum mongo_fdw_handler(PG_FUNCTION_ARGS); extern Datum mongo_fdw_validator(PG_FUNCTION_ARGS); -extern MongoFdwOptions * MongoGetOptions(Oid foreignTableId); -extern void MongoFreeOptions(MongoFdwOptions *mongoFdwOptions); -mongo* GetConnection(char *host, int32 port); -void cleanup_connection(void); -void ReleaseConnection(mongo *conn); #endif /* MONGO_FDW_H */ diff --git a/mongo_query.c b/mongo_query.c index 426e801..a559a0b 100644 --- a/mongo_query.c +++ b/mongo_query.c @@ -12,6 +12,13 @@ */ #include "postgres.h" +#include "mongo_wrapper.h" + +#ifdef META_DRIVER + #include "mongoc.h" +#else + #include "mongo.h" +#endif #include "mongo_fdw.h" #include "mongo_query.h" @@ -33,7 +40,7 @@ static char * MongoOperatorName(const char *operatorName); static List * EqualityOperatorList(List *operatorList); static List * UniqueColumnList(List *operatorList); static List * ColumnOperatorList(Var *column, List *operatorList); -static void AppendConstantValue(bson *queryDocument, const char *keyName, +static void AppendConstantValue(BSON *queryDocument, const char *keyName, Const *constant); @@ -154,7 +161,7 @@ FindArgumentOfType(List *argumentList, NodeTag argumentType) * "l_shipdate >= date '1994-01-01' AND l_shipdate < date '1995-01-01'" become * "l_shipdate: { $gte: new Date(757382400000), $lt: new Date(788918400000) }". */ -bson * +BSON * QueryDocument(Oid relationId, List *opExpressionList) { List *equalityOperatorList = NIL; @@ -162,12 +169,9 @@ QueryDocument(Oid relationId, List *opExpressionList) List *columnList = NIL; ListCell *equalityOperatorCell = NULL; ListCell *columnCell = NULL; - bson *queryDocument = NULL; - int documentStatus = BSON_OK; - - queryDocument = bson_create(); - bson_init(queryDocument); + BSON *queryDocument = NULL; + queryDocument = BsonCreate(); /* * We distinguish between equality expressions and others since we need to * insert the latter (<, >, <=, >=, <>) as separate sub-documents into the @@ -210,6 +214,7 @@ QueryDocument(Oid relationId, List *opExpressionList) char *columnName = NULL; List *columnOperatorList = NIL; ListCell *columnOperatorCell = NULL; + BSON r; columnId = column->varattno; columnName = get_relid_attribute_name(relationId, columnId); @@ -218,7 +223,7 @@ QueryDocument(Oid relationId, List *opExpressionList) columnOperatorList = ColumnOperatorList(column, comparisonOperatorList); /* for comparison expressions, start a sub-document */ - bson_append_start_object(queryDocument, columnName); + BsonAppendStartObject(queryDocument, columnName, &r); foreach(columnOperatorCell, columnOperatorList) { @@ -234,15 +239,12 @@ QueryDocument(Oid relationId, List *opExpressionList) AppendConstantValue(queryDocument, mongoOperatorName, constant); } - - bson_append_finish_object(queryDocument); + BsonAppendFinishObject(queryDocument, &r); } - - documentStatus = bson_finish(queryDocument); - if (documentStatus != BSON_OK) + if (!BsonFinish(queryDocument)) { ereport(ERROR, (errmsg("could not create document for query"), - errhint("BSON error: %s", queryDocument->errstr))); + errhint("BSON error"))); } return queryDocument; @@ -362,24 +364,23 @@ ColumnOperatorList(Var *column, List *operatorList) * its MongoDB equivalent. */ static void -AppendConstantValue(bson *queryDocument, const char *keyName, Const *constant) +AppendConstantValue(BSON *queryDocument, const char *keyName, Const *constant) { if (constant->constisnull) { - bson_append_null(queryDocument, keyName); + BsonAppendNull(queryDocument, keyName); return; } - AppenMongoValue(queryDocument, keyName, constant->constvalue, constant->consttype, false); + AppenMongoValue(queryDocument, keyName, constant->constvalue, false, constant->consttype); } -int32 -AppenMongoValue(bson *queryDocument, const char *keyName, Datum value, bool isnull, Oid id) +bool +AppenMongoValue(BSON *queryDocument, const char *keyName, Datum value, bool isnull, Oid id) { - int32 status = MONGO_ERROR; - + bool status; if (isnull) { - status = bson_append_null(queryDocument, keyName); + status = BsonAppendNull(queryDocument, keyName); return status; } @@ -388,44 +389,44 @@ AppenMongoValue(bson *queryDocument, const char *keyName, Datum value, bool isnu case INT2OID: { int16 valueInt = DatumGetInt16(value); - status = bson_append_int(queryDocument, keyName, (int) valueInt); + status = BsonAppendInt32(queryDocument, keyName, (int) valueInt); break; } case INT4OID: { int32 valueInt = DatumGetInt32(value); - status = bson_append_int(queryDocument, keyName, valueInt); + status = BsonAppendInt32(queryDocument, keyName, valueInt); break; } case INT8OID: { int64 valueLong = DatumGetInt64(value); - status = bson_append_long(queryDocument, keyName, valueLong); + status = BsonAppendInt64(queryDocument, keyName, valueLong); break; } case FLOAT4OID: { float4 valueFloat = DatumGetFloat4(value); - status = bson_append_double(queryDocument, keyName, (double) valueFloat); + status = BsonAppendDouble(queryDocument, keyName, (double) valueFloat); break; } case FLOAT8OID: { float8 valueFloat = DatumGetFloat8(value); - status = bson_append_double(queryDocument, keyName, valueFloat); + status = BsonAppendDouble(queryDocument, keyName, valueFloat); break; } case NUMERICOID: { Datum valueDatum = DirectFunctionCall1(numeric_float8, value); float8 valueFloat = DatumGetFloat8(valueDatum); - status = bson_append_double(queryDocument, keyName, valueFloat); + status = BsonAppendDouble(queryDocument, keyName, valueFloat); break; } case BOOLOID: { bool valueBool = DatumGetBool(value); - status = bson_append_int(queryDocument, keyName, (int) valueBool); + status = BsonAppendBool(queryDocument, keyName, (int) valueBool); break; } case BPCHAROID: @@ -437,7 +438,7 @@ AppenMongoValue(bson *queryDocument, const char *keyName, Datum value, bool isnu bool typeVarLength = false; getTypeOutputInfo(id, &outputFunctionId, &typeVarLength); outputString = OidOutputFunctionCall(outputFunctionId, value); - status = bson_append_string(queryDocument, keyName, outputString); + status = BsonAppendUTF8(queryDocument, keyName, outputString); break; } case NAMEOID: @@ -449,8 +450,8 @@ AppenMongoValue(bson *queryDocument, const char *keyName, Datum value, bool isnu memset(bsonObjectId.bytes, 0, sizeof(bsonObjectId.bytes)); getTypeOutputInfo(id, &outputFunctionId, &typeVarLength); outputString = OidOutputFunctionCall(outputFunctionId, value); - bson_oid_from_string(&bsonObjectId, outputString); - status = bson_append_oid(queryDocument, keyName, &bsonObjectId); + BsonOidFromString(&bsonObjectId, outputString); + status = BsonAppendOid(queryDocument, keyName, &bsonObjectId); break; } case DATEOID: @@ -460,7 +461,7 @@ AppenMongoValue(bson *queryDocument, const char *keyName, Datum value, bool isnu int64 valueMicroSecs = valueTimestamp + POSTGRES_TO_UNIX_EPOCH_USECS; int64 valueMilliSecs = valueMicroSecs / 1000; - status = bson_append_date(queryDocument, keyName, valueMilliSecs); + status = BsonAppendDate(queryDocument, keyName, valueMilliSecs); break; } case TIMESTAMPOID: @@ -470,7 +471,7 @@ AppenMongoValue(bson *queryDocument, const char *keyName, Datum value, bool isnu int64 valueMicroSecs = valueTimestamp + POSTGRES_TO_UNIX_EPOCH_USECS; int64 valueMilliSecs = valueMicroSecs / 1000; - status = bson_append_date(queryDocument, keyName, valueMilliSecs); + status = BsonAppendDate(queryDocument, keyName, valueMilliSecs); break; } default: diff --git a/mongo_query.h b/mongo_query.h index 87b95e5..607bb2d 100644 --- a/mongo_query.h +++ b/mongo_query.h @@ -12,7 +12,6 @@ #ifndef MONGO_QUERY_H #define MONGO_QUERY_H -int32 AppenMongoValue(bson *queryDocument, const char *keyName, Datum value, bool isnull, Oid id); +bool AppenMongoValue(BSON *queryDocument, const char *keyName, Datum value, bool isnull, Oid id); - -#endif /* MONGO_QUERY_H */ +#endif /* MONGO_QUERY_H */