diff --git a/src/hnsw.c b/src/hnsw.c index 9276db57e..fcf3895d0 100644 --- a/src/hnsw.c +++ b/src/hnsw.c @@ -243,7 +243,7 @@ Datum hnsw_handler(PG_FUNCTION_ARGS __attribute__((unused))) amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; - amroutine->amcaninclude = false; + amroutine->amcaninclude = true; /* supports INCLUDE clauses, for index-only scans */ #if PG_VERSION_NUM >= 130000 amroutine->amusemaintenanceworkmem = false; /* not used during VACUUM */ amroutine->amparallelvacuumoptions = VACUUM_OPTION_PARALLEL_BULKDEL; @@ -255,7 +255,7 @@ Datum hnsw_handler(PG_FUNCTION_ARGS __attribute__((unused))) amroutine->aminsert = ldb_aminsert; amroutine->ambulkdelete = ldb_ambulkdelete; amroutine->amvacuumcleanup = ldb_amvacuumcleanup; - amroutine->amcanreturn = NULL; + amroutine->amcanreturn = ldb_canreturn; amroutine->amcostestimate = hnswcostestimate; amroutine->amoptions = ldb_amoptions; amroutine->amproperty = NULL; @@ -397,3 +397,16 @@ float4 *DatumGetSizedFloatArray(Datum datum, HnswColumnType type, int dimensions elog(ERROR, "Unsupported type"); } } + +/* +* Check whether we support index-only scans. +* +* We always do, so return true. +*/ +bool +ldb_canreturn(Relation index, int attno) +{ + LDB_UNUSED(index); + LDB_UNUSED(attno); + return true; +} \ No newline at end of file diff --git a/src/hnsw.h b/src/hnsw.h index 2f448687c..6ca176992 100644 --- a/src/hnsw.h +++ b/src/hnsw.h @@ -34,6 +34,7 @@ PGDLLEXPORT Datum cos_dist(PG_FUNCTION_ARGS); HnswColumnType GetIndexColumnType(Relation index); float4 *DatumGetSizedFloatArray(Datum datum, HnswColumnType type, int dimensions); +bool ldb_canreturn(Relation index, int attno); #define LDB_UNUSED(x) (void)(x) diff --git a/src/hnsw/external_index.c b/src/hnsw/external_index.c index e7868e76d..0163d48e5 100644 --- a/src/hnsw/external_index.c +++ b/src/hnsw/external_index.c @@ -160,6 +160,7 @@ void StoreExternalIndexBlockMapGroup(Relation index, // note: even if the condition is true, nodepage may be too large // as the condition does not take into account the flexible array component + // todo:: can we make this estimate for the nodepage more accurate by being conservative with the node_level? If we assume some level, we can compute the exact size of the nodepage here while(PageGetFreeSpace(page) > sizeof(HnswIndexTuple) + dimension * sizeof(float)) { if(node_id >= first_node_index + num_added_vectors) break; memset(bufferpage, 0, BLCKSZ); @@ -343,6 +344,7 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, usearch_metadata_t *metadata, uint32 new_tuple_id, uint32 new_tuple_level, + uint32 extra_columns_size, HnswInsertState *insertstate) { // if any data blocks exist, the last one's buffer will be read into this @@ -363,10 +365,11 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, // allocate buffer to construct the new node // note that we allocate more than sizeof(HnswIndexTuple) since the struct has a flexible array member // which depends on parameters passed into UsearchNodeBytes above - alloced_tuple = (HnswIndexTuple *)palloc0(sizeof(HnswIndexTuple) + new_tuple_size); + alloced_tuple = (HnswIndexTuple *)palloc0(sizeof(HnswIndexTuple) + new_tuple_size + extra_columns_size); alloced_tuple->id = new_tuple_id; alloced_tuple->level = new_tuple_level; alloced_tuple->size = new_tuple_size; + alloced_tuple->extra_columns_size = extra_columns_size; /*** Add a new tuple corresponding to the added vector to the list of tuples in the index * (create new page if necessary) ***/ @@ -385,7 +388,7 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, PageInit(page, BufferGetPageSize(new_dblock), sizeof(HnswIndexPageSpecialBlock)); extra_dirtied_add(insertstate->retriever_ctx->extra_dirted, new_vector_blockno, new_dblock, page); - new_tup_at = HnswIndexPageAddVector(page, alloced_tuple, alloced_tuple->size); + new_tup_at = HnswIndexPageAddVector(page, alloced_tuple, alloced_tuple->size + alloced_tuple->extra_columns_size); MarkBufferDirty(new_dblock); } else { @@ -402,11 +405,12 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, const uint32 blockmaps_are_enough = new_tuple_id / HNSW_BLOCKMAP_BLOCKS_PER_PAGE + 1 < ((uint32)1 << (hdr->blockmap_page_groups + 1)); - if(PageGetFreeSpace(page) > sizeof(HnswIndexTuple) + alloced_tuple->size && blockmaps_are_enough) { + if(PageGetFreeSpace(page) > sizeof(HnswIndexTuple) + alloced_tuple->size + alloced_tuple->extra_columns_size && blockmaps_are_enough) { + // there is enough space in the last page to fit the new vector // so we just append it to the page ldb_dlog("InsertBranching: we adding element to existing page"); - new_tup_at = HnswIndexPageAddVector(page, alloced_tuple, alloced_tuple->size); + new_tup_at = HnswIndexPageAddVector(page, alloced_tuple, alloced_tuple->size + alloced_tuple->extra_columns_size); new_vector_blockno = BufferGetBlockNumber(last_dblock); assert(new_vector_blockno == hdr->last_data_block); @@ -427,7 +431,7 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, // check the count of blockmaps, see if there's place to add the block id, if yes add, if no create a // new group check if already existing blockmaps are not enough new_tuple_id / // HNSW_BLOCKMAP_BLOCKS_PER_PAGE + 1 is kth blockmap we check if k is more than already created 2^groups - if(new_tuple_id / HNSW_BLOCKMAP_BLOCKS_PER_PAGE + 1 >= ((uint32)1 << (hdr->blockmap_page_groups + 1))) { + if(!blockmaps_are_enough) { CreateBlockMapGroup(hdr, index_rel, MAIN_FORKNUM, new_tuple_id, hdr->blockmap_page_groups + 1); } @@ -452,7 +456,7 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, PageInit(page, BufferGetPageSize(new_dblock), sizeof(HnswIndexPageSpecialBlock)); extra_dirtied_add(insertstate->retriever_ctx->extra_dirted, new_vector_blockno, new_dblock, page); - new_tup_at = HnswIndexPageAddVector(page, alloced_tuple, alloced_tuple->size); + new_tup_at = HnswIndexPageAddVector(page, alloced_tuple, alloced_tuple->size + alloced_tuple->extra_columns_size); MarkBufferDirty(new_dblock); } @@ -463,6 +467,7 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, assert(new_tup_ref->id == new_tuple_id); assert(new_tup_ref->level == new_tuple_level); assert(new_tup_ref->size == new_tuple_size); + assert(new_tup_ref->extra_columns_size == extra_columns_size); page = NULL; // to avoid its accidental use /*** Update pagemap with the information of the added page ***/ { diff --git a/src/hnsw/external_index.h b/src/hnsw/external_index.h index d2dba03ec..c0b9bad24 100644 --- a/src/hnsw/external_index.h +++ b/src/hnsw/external_index.h @@ -62,8 +62,15 @@ typedef struct HnswIndexTuple { uint32 id; uint32 level; - // stores size of the flexible array member + + // stores size of the vector data uint32 size; + + // stores size of the non-key column tuple data as well (written, sequentially, right after the vector data in the + // flexible array member) + uint32 extra_columns_size; + + // note that the total size of the flexible array member is size + extra_columns_size char node[ FLEXIBLE_ARRAY_MEMBER ]; } HnswIndexTuple; @@ -126,6 +133,7 @@ HnswIndexTuple *PrepareIndexTuple(Relation index_rel, usearch_metadata_t *metadata, uint32 new_tuple_id, uint32 new_tuple_level, + uint32 extra_columns_size, HnswInsertState *insertstate); #endif // LDB_HNSW_EXTERNAL_INDEX_H diff --git a/src/hnsw/insert.c b/src/hnsw/insert.c index 94cd19b47..2b016c783 100644 --- a/src/hnsw/insert.c +++ b/src/hnsw/insert.c @@ -3,6 +3,7 @@ #include "insert.h" #include +#include #include #if PG_VERSION_NUM >= 150000 #include @@ -70,6 +71,12 @@ bool ldb_aminsert(Relation index, uint32 new_tuple_id; HnswIndexTuple *new_tuple; usearch_init_options_t opts = {0}; + TupleDesc tupdesc = RelationGetDescr(index); + uint32 num_attributes = tupdesc->natts; + bool extra_columns_present + = num_attributes > 1; /* whether we have non-key columns to insert, for index-only scans*/ + IndexTuple itup = NULL; + uint32 extra_columns_size = 0; LDB_UNUSED(heap); LDB_UNUSED(indexInfo); #if PG_VERSION_NUM >= 140000 @@ -135,6 +142,7 @@ bool ldb_aminsert(Relation index, datum = PointerGetDatum(PG_DETOAST_DATUM(values[ 0 ])); float4 *vector = DatumGetSizedFloatArray(datum, insertstate->columnType, opts.dimensions); + #if LANTERNDB_COPYNODES // currently not fully ported to the latest changes assert(false); @@ -151,6 +159,26 @@ bool ldb_aminsert(Relation index, elog(ERROR, "usearch newnode error: %s", error); } + // create a postgres IndexTuple containing the extra non-key column data (hence why we ignore values[0]) + // the vector is the key column, which comes before non-key columns in this function-- we can also only have one key + // for now, so the vector must be the 0th entry + if(extra_columns_present) { + // ignore the first entry which is the vector, because we already store it + + // make a new copy of isnull in case something else references it (as opposed to setting first entry to true and then back) + // todo:: this could be too cautious though... naive thing might work here as well + bool* fakeisnull = (bool*)palloc(sizeof(bool) * num_attributes); + memcpy(fakeisnull, isnull, sizeof(bool) * num_attributes); + fakeisnull[0] = true; + + itup = index_form_tuple(tupdesc, values, fakeisnull); + itup->t_tid = *heap_tid; + + extra_columns_size = IndexTupleSize(itup); + + pfree(fakeisnull); + } + new_tuple_id = hdr->num_vectors; // we are adding the following pages to the Generic XLog // 1) the header page @@ -158,7 +186,14 @@ bool ldb_aminsert(Relation index, // 3) (sometimes) the page that used to be last page of the index // 4) The blockmap page for the block in which the vector was added // Generic XLog supports up to 4 pages in a single commit, so we are good. - new_tuple = PrepareIndexTuple(index, state, hdr, &meta, new_tuple_id, level, insertstate); + new_tuple = PrepareIndexTuple(index, state, hdr, &meta, new_tuple_id, level, extra_columns_size, insertstate); + + // copy the extra non-key column data so we can store it in our tuple + if(extra_columns_present) { + char *extra_columns_tape = new_tuple->node + new_tuple->size; + memcpy(extra_columns_tape, itup, new_tuple->extra_columns_size); + pfree(itup); + } usearch_add_external( uidx, *(unsigned long *)heap_tid, vector, new_tuple->node, usearch_scalar_f32_k, level, &error); diff --git a/src/hnsw/scan.c b/src/hnsw/scan.c index 1bf384f0f..ffc6d8310 100644 --- a/src/hnsw/scan.c +++ b/src/hnsw/scan.c @@ -6,6 +6,8 @@ #include #include +#include + #include "bench.h" #include "build.h" #include "external_index.h" @@ -87,6 +89,7 @@ IndexScanDesc ldb_ambeginscan(Relation index, int nkeys, int norderbys) UnlockReleaseBuffer(buf); scan->opaque = scanstate; + scan->xs_itup = NULL; return scan; } @@ -119,6 +122,10 @@ void ldb_amendscan(IndexScanDesc scan) if(scanstate->labels) pfree(scanstate->labels); + if(scanstate->tapes) pfree(scanstate->tapes); + + if(scan->xs_itup) pfree(scan->xs_itup); + pfree(scanstate); scan->opaque = NULL; } @@ -133,6 +140,7 @@ void ldb_amrescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, { HnswScanState *scanstate = (HnswScanState *)scan->opaque; scanstate->first = true; + scan->xs_itup = NULL; LDB_UNUSED(norderbys); LDB_UNUSED(nkeys); @@ -191,10 +199,13 @@ bool ldb_amgettuple(IndexScanDesc scan, ScanDirection dir) if(scanstate->labels == NULL) { scanstate->labels = palloc(k * sizeof(usearch_label_t)); } + if (scanstate->tapes == NULL) { + scanstate->tapes = palloc(k * sizeof(char*)); + } ldb_dlog("LANTERN querying index for %d elements", k); - num_returned = usearch_search( - scanstate->usearch_index, vec, usearch_scalar_f32_k, k, scanstate->labels, scanstate->distances, &error); + num_returned = usearch_search_with_tapes( + scanstate->usearch_index, vec, usearch_scalar_f32_k, k, scanstate->labels, scanstate->distances, scanstate->tapes, &error); ldb_wal_retriever_area_reset(scanstate->retriever_ctx, NULL); scanstate->count = num_returned; @@ -226,10 +237,11 @@ bool ldb_amgettuple(IndexScanDesc scan, ScanDirection dir) /* double k and reallocate arrays to account for increased size */ scanstate->distances = repalloc(scanstate->distances, k * sizeof(float)); scanstate->labels = repalloc(scanstate->labels, k * sizeof(usearch_label_t)); + scanstate->tapes = repalloc(scanstate->tapes, k * sizeof(char*)); ldb_dlog("LANTERN - querying index for %d elements", k); - num_returned = usearch_search( - scanstate->usearch_index, vec, usearch_scalar_f32_k, k, scanstate->labels, scanstate->distances, &error); + num_returned = usearch_search_with_tapes( + scanstate->usearch_index, vec, usearch_scalar_f32_k, k, scanstate->labels, scanstate->distances, scanstate->tapes, &error); ldb_wal_retriever_area_reset(scanstate->retriever_ctx, NULL); scanstate->count = num_returned; @@ -249,6 +261,72 @@ bool ldb_amgettuple(IndexScanDesc scan, ScanDirection dir) #else scan->xs_ctup.t_self = *tid; #endif + // TODO: check if this is also compatible with the old version of postgres + // if the scan (index-only scan) requests the actual tuple, we set that information here + if(scan->xs_want_itup) { + + scan->xs_itupdesc = RelationGetDescr(scan->indexRelation); + uint32 num_attributes = scan->xs_itupdesc->natts; + + char* tape = scanstate->tapes[ scanstate->current ]; + uint32 vector_size = *(uint32*)(tape - (offsetof(HnswIndexTuple, node) - offsetof(HnswIndexTuple, size))); + + // this is the IndexTuple we created when we inserted the row... it is missing the vector data. We need to add it here + IndexTuple olditup = (IndexTuple)(scanstate->tapes[ scanstate->current ] + vector_size); + + Datum* new_values = (Datum*) palloc(sizeof(Datum) * num_attributes); + bool* new_isnull = (bool*) palloc(sizeof(bool) * num_attributes); + + // copy the old values and isnulls into the rest of the array + if(num_attributes > 1) { + index_deform_tuple(olditup, scan->xs_itupdesc, new_values, new_isnull); + } + + // set/modify the first entries, corresponding to the vector + new_isnull[0] = false; + + // vector_size corresponds to the entire "usearch" schema of storing a vector, which includes metadata + // the last dim*sizeof(float) entries of this schema is the actual vector data + // we process the array to be float4s when inserting regardless of type, so we do the same here + float4* vector = (float4*)(scanstate->tapes[ scanstate->current ] + vector_size - (sizeof(float4) * scanstate->dimensions)); + + // we build a proper Datum from this vector + // can't just cast to Datum because postgres processes it internally, like using TOAST + uint32 array_length = scanstate->dimensions; + ArrayType *array; + Datum *elem_datums; + + elem_datums = (Datum *) palloc(array_length * sizeof(Datum)); + for (uint32 i = 0; i < array_length; i++) { + elem_datums[i] = Float4GetDatum(vector[i]); + } + + // we want a 1-D array + int dims[] = {array_length}; + // lower bounds for each dimension; usually 1 for PostgreSQL arrays + int lbs[] = {1}; + + array = construct_md_array(elem_datums, NULL, 1, dims, lbs, FLOAT4OID, sizeof(float4), true, 'i'); + Datum firstvalue = PointerGetDatum(array); + + new_values[0] = firstvalue; + + // todo:: are we leaking memory here? + IndexTuple newitup = index_form_tuple(scan->xs_itupdesc, new_values, new_isnull); + newitup->t_tid = olditup->t_tid; + + // clean up IndexTuple created from previous row + if(scan->xs_itup) { + pfree(scan->xs_itup); + } + + scan->xs_itup = newitup; + + pfree(elem_datums); + pfree(new_values); + pfree(new_isnull); + } + // todo:: there is a mid-sized designed issue with index storage // labels must be large enought to store relblockno+ indexblockno diff --git a/src/hnsw/scan.h b/src/hnsw/scan.h index fa31cb3db..b32f0d036 100644 --- a/src/hnsw/scan.h +++ b/src/hnsw/scan.h @@ -16,6 +16,7 @@ typedef struct HnswScanState ItemPointer iptr; float *distances; usearch_label_t *labels; + char **tapes; HnswColumnType columnType; int dimensions; // indicates whether we are retrieving the first tuple diff --git a/src/hooks/executor_start.c b/src/hooks/executor_start.c index 866cae499..34fb2974b 100644 --- a/src/hooks/executor_start.c +++ b/src/hooks/executor_start.c @@ -17,22 +17,22 @@ ExecutorStart_hook_type original_ExecutorStart_hook = NULL; typedef struct { List *oidList; - bool isIndexScan; + int indexScanCount; } OperatorUsedCorrectlyContext; static bool operator_used_incorrectly_walker(Node *node, void *context) { OperatorUsedCorrectlyContext *context_typed = (OperatorUsedCorrectlyContext *)context; if(node == NULL) return false; - if(IsA(node, IndexScan)) { - context_typed->isIndexScan = true; + if(IsA(node, IndexScan) || IsA(node, IndexOnlyScan)) { + context_typed->indexScanCount++; bool status = plan_tree_walker((Plan *)node, operator_used_incorrectly_walker, context); - context_typed->isIndexScan = false; + context_typed->indexScanCount--; return status; } if(IsA(node, OpExpr)) { OpExpr *opExpr = (OpExpr *)node; - if(list_member_oid(context_typed->oidList, opExpr->opno) && !context_typed->isIndexScan) { + if(list_member_oid(context_typed->oidList, opExpr->opno) && context_typed->indexScanCount == 0) { return true; } } @@ -57,7 +57,7 @@ static void validate_operator_usage(Plan *plan, List *oidList) { OperatorUsedCorrectlyContext context; context.oidList = oidList; - context.isIndexScan = false; + context.indexScanCount = 0; if(operator_used_incorrectly_walker((Node *)plan, (void *)&context)) { elog(ERROR, "Operator <-> has no standalone meaning and is reserved for use in vector index lookups only"); } diff --git a/third_party/usearch b/third_party/usearch index b617cf312..e1f92fe54 160000 --- a/third_party/usearch +++ b/third_party/usearch @@ -1 +1 @@ -Subproject commit b617cf312b7a67b292516ca274e77d85e96d3e32 +Subproject commit e1f92fe540b4ebff82852330ef8c69a0ce492ee3