From 2e0d667c5a8927564c744f7671589af167e31f8b Mon Sep 17 00:00:00 2001 From: Lingsong Zeng Date: Mon, 7 Jul 2025 23:29:37 +0800 Subject: [PATCH 1/4] Fix issue 3570 and increasing db connections in PG vector & Record manager --- .../PostgresRecordManager/PostgresRecordManager.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index cf239522f5b..dcf271785ef 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -222,10 +222,12 @@ class PostgresRecordManager implements RecordManagerInterface { } async createSchema(): Promise { + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + const tableName = this.sanitizeTableName(this.tableName) + try { - const dataSource = await this.getDataSource() - const queryRunner = dataSource.createQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) + await queryRunner.query('CREATE EXTENSION IF NOT EXISTS pgcrypto;') await queryRunner.manager.query(` CREATE TABLE IF NOT EXISTS "${tableName}" ( @@ -251,6 +253,8 @@ class PostgresRecordManager implements RecordManagerInterface { return } throw e + } finally { + await dataSource.destroy() } } From f370062c7961943f890c6d4122c2662e9e9ea1db Mon Sep 17 00:00:00 2001 From: Lingsong Zeng Date: Mon, 7 Jul 2025 23:39:35 +0800 Subject: [PATCH 2/4] Update Postgres.ts --- .../nodes/vectorstores/Postgres/Postgres.ts | 86 +++++-------------- 1 file changed, 21 insertions(+), 65 deletions(-) diff --git a/packages/components/nodes/vectorstores/Postgres/Postgres.ts b/packages/components/nodes/vectorstores/Postgres/Postgres.ts index ad0f82bb057..7c3dee67ba6 100644 --- a/packages/components/nodes/vectorstores/Postgres/Postgres.ts +++ b/packages/components/nodes/vectorstores/Postgres/Postgres.ts @@ -6,32 +6,11 @@ import { index } from '../../../src/indexing' import { howToUseFileUpload } from '../VectorStoreUtils' import { VectorStore } from '@langchain/core/vectorstores' import { VectorStoreDriver } from './driver/Base' -import { TypeORMDriver } from './driver/TypeORM' -// import { PGVectorDriver } from './driver/PGVector' +import { PGVectorDriver } from './driver/PGVector' import { getContentColumnName, getDatabase, getHost, getPort, getTableName } from './utils' const serverCredentialsExists = !!process.env.POSTGRES_VECTORSTORE_USER && !!process.env.POSTGRES_VECTORSTORE_PASSWORD -// added temporarily to fix the base class return for VectorStore when postgres node is using TypeORM -function getVectorStoreBaseClasses() { - // Try getting base classes through the utility function - const baseClasses = getBaseClasses(VectorStore) - - // If we got results, return them - if (baseClasses && baseClasses.length > 0) { - return baseClasses - } - - // If VectorStore is recognized as a class but getBaseClasses returned nothing, - // return the known inheritance chain - if (VectorStore instanceof Function) { - return ['VectorStore'] - } - - // Fallback to minimum required class - return ['VectorStore'] -} - class Postgres_VectorStores implements INode { label: string name: string @@ -119,25 +98,6 @@ class Postgres_VectorStores implements INode { additionalParams: true, optional: true }, - /*{ - label: 'Driver', - name: 'driver', - type: 'options', - default: 'typeorm', - description: 'Different option to connect to Postgres', - options: [ - { - label: 'TypeORM', - name: 'typeorm' - }, - { - label: 'PGVector', - name: 'pgvector' - } - ], - optional: true, - additionalParams: true - },*/ { label: 'Distance Strategy', name: 'distanceStrategy', @@ -215,11 +175,7 @@ class Postgres_VectorStores implements INode { { label: 'Postgres Vector Store', name: 'vectorStore', - baseClasses: [ - this.type, - // ...getBaseClasses(VectorStore), // disabled temporarily for using TypeORM - ...getVectorStoreBaseClasses() // added temporarily for using TypeORM - ] + baseClasses: [this.type, ...getBaseClasses(VectorStore)] } ] } @@ -247,20 +203,28 @@ class Postgres_VectorStores implements INode { try { if (recordManager) { - const vectorStore = await vectorStoreDriver.instanciate() + const vectorStore: any = await vectorStoreDriver.instanciate() await recordManager.createSchema() - const res = await index({ - docsSource: finalDocs, - recordManager, - vectorStore, - options: { - cleanup: recordManager?.cleanup, - sourceIdKey: recordManager?.sourceIdKey ?? 'source', - vectorStoreName: tableName + let res: Partial + try { + res = await index({ + docsSource: finalDocs, + recordManager, + vectorStore, + options: { + cleanup: recordManager?.cleanup, + sourceIdKey: recordManager?.sourceIdKey ?? 'source', + vectorStoreName: tableName + } + }) + } finally { + if (vectorStore?.client) { + vectorStore.client.release() + vectorStore.client = undefined } - }) + } return res } else { @@ -332,15 +296,7 @@ class Postgres_VectorStores implements INode { } static getDriverFromConfig(nodeData: INodeData, options: ICommonObject): VectorStoreDriver { - /*switch (nodeData.inputs?.driver) { - case 'typeorm': - return new TypeORMDriver(nodeData, options) - case 'pgvector': - return new PGVectorDriver(nodeData, options) - default: - return new TypeORMDriver(nodeData, options) - }*/ - return new TypeORMDriver(nodeData, options) + return new PGVectorDriver(nodeData, options) } } From cca41a24f910a62b3dfd17feea32a50c5b2bae93 Mon Sep 17 00:00:00 2001 From: Lingsong Zeng Date: Mon, 7 Jul 2025 23:40:31 +0800 Subject: [PATCH 3/4] Update PGVector.ts --- .../vectorstores/Postgres/driver/PGVector.ts | 65 ++++++++++--------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts b/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts index 608858a1923..34bddde2d21 100644 --- a/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts +++ b/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts @@ -1,7 +1,3 @@ -/* -* Temporary disabled due to increasing open connections without releasing them -* Use TypeORM instead - import { VectorStoreDriver } from './Base' import { FLOWISE_CHATID } from '../../../../src' import { DistanceStrategy, PGVectorStore, PGVectorStoreArgs } from '@langchain/community/vectorstores/pgvector' @@ -87,41 +83,50 @@ export class PGVectorDriver extends VectorStoreDriver { instance.client = await instance.pool.connect() } - const whereClauseRegex = /WHERE ([^\n]+)/ - let chatflowOr = '' - - // Match chatflow uploaded file and keep filtering on other files: - // https://github.com/FlowiseAI/Flowise/pull/3367#discussion_r1804229295 - if (chatId) { - parameters.push({ [FLOWISE_CHATID]: chatId }) + try { + const whereClauseRegex = /WHERE ([^\n]+)/ + let chatflowOr = '' - chatflowOr = `OR metadata @> $${parameters.length}` - } + // Match chatflow uploaded file and keep filtering on other files: + // https://github.com/FlowiseAI/Flowise/pull/3367#discussion_r1804229295 + if (chatId) { + parameters.push({ [FLOWISE_CHATID]: chatId }) + chatflowOr = `OR metadata @> $${parameters.length}` + } - if (queryString.match(whereClauseRegex)) { - queryString = queryString.replace(whereClauseRegex, `WHERE (($1) AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr}`) - } else { - const orderByClauseRegex = /ORDER BY (.*)/ - // Insert WHERE clause before ORDER BY - queryString = queryString.replace( - orderByClauseRegex, - `WHERE (metadata @> '{}' AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr} + if (queryString.match(whereClauseRegex)) { + queryString = queryString.replace( + whereClauseRegex, + `WHERE (($1) AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr}` + ) + } else { + const orderByClauseRegex = /ORDER BY (.*)/ + // Insert WHERE clause before ORDER BY + queryString = queryString.replace( + orderByClauseRegex, + `WHERE (metadata @> '{}' AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr} ORDER BY $1 ` - ) - } + ) + } - // Run base function - const queryResult = await basePoolQueryFn(queryString, parameters) + // Run base function + const queryResult = await basePoolQueryFn(queryString, parameters) - // ensure connection is released - instance.client.release() - instance.client = undefined + // ensure connection is released + instance.client.release() + instance.client = undefined - return queryResult + return queryResult + } catch (e) { + console.error(e) + // ensure connection is released + instance.client?.release() + instance.client = undefined + throw e + } } return instance } } -*/ From 97e6907f067b4db676f48bcddcb73b35dd3e3056 Mon Sep 17 00:00:00 2001 From: Lingsong Zeng Date: Mon, 7 Jul 2025 23:48:15 +0800 Subject: [PATCH 4/4] Update PostgresRecordManager.ts --- .../PostgresRecordManager/PostgresRecordManager.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index dcf271785ef..1693fd9f27d 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -227,8 +227,6 @@ class PostgresRecordManager implements RecordManagerInterface { const tableName = this.sanitizeTableName(this.tableName) try { - await queryRunner.query('CREATE EXTENSION IF NOT EXISTS pgcrypto;') - await queryRunner.manager.query(` CREATE TABLE IF NOT EXISTS "${tableName}" ( uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),