Skip to content

Fix issue 3570 and increasing db connections for pg vector and pg record manager #4808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,11 @@ class PostgresRecordManager implements RecordManagerInterface {
}

async createSchema(): Promise<void> {
try {
const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)
const dataSource = await this.getDataSource()
const queryRunner = dataSource.createQueryRunner()
const tableName = this.sanitizeTableName(this.tableName)

try {
await queryRunner.manager.query(`
CREATE TABLE IF NOT EXISTS "${tableName}" (
uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
Expand All @@ -251,6 +251,8 @@ class PostgresRecordManager implements RecordManagerInterface {
return
}
throw e
} finally {
await dataSource.destroy()
}
}

Expand Down
86 changes: 21 additions & 65 deletions packages/components/nodes/vectorstores/Postgres/Postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,11 @@ import { index } from '../../../src/indexing'
import { howToUseFileUpload } from '../VectorStoreUtils'
import { VectorStore } from '@langchain/core/vectorstores'
import { VectorStoreDriver } from './driver/Base'
import { TypeORMDriver } from './driver/TypeORM'
// import { PGVectorDriver } from './driver/PGVector'
import { PGVectorDriver } from './driver/PGVector'
import { getContentColumnName, getDatabase, getHost, getPort, getTableName } from './utils'

const serverCredentialsExists = !!process.env.POSTGRES_VECTORSTORE_USER && !!process.env.POSTGRES_VECTORSTORE_PASSWORD

// added temporarily to fix the base class return for VectorStore when postgres node is using TypeORM
/**
 * Resolves the base-class names advertised for the vector store output.
 *
 * Asks `getBaseClasses` for the inheritance chain of `VectorStore` and returns
 * it when it is non-empty. When the lookup yields nothing, falls back to the
 * minimum required class name so the output always lists at least
 * `'VectorStore'`.
 *
 * @returns The names from `getBaseClasses(VectorStore)`, or `['VectorStore']`
 * when that lookup is empty or falsy.
 */
function getVectorStoreBaseClasses() {
    const baseClasses = getBaseClasses(VectorStore)

    // Prefer the resolved chain whenever the utility produced one.
    if (baseClasses && baseClasses.length > 0) {
        return baseClasses
    }

    // Every remaining path yields the same single-entry list, so no further
    // check on VectorStore is needed before falling back.
    return ['VectorStore']
}

class Postgres_VectorStores implements INode {
label: string
name: string
Expand Down Expand Up @@ -119,25 +98,6 @@ class Postgres_VectorStores implements INode {
additionalParams: true,
optional: true
},
/*{
label: 'Driver',
name: 'driver',
type: 'options',
default: 'typeorm',
description: 'Different option to connect to Postgres',
options: [
{
label: 'TypeORM',
name: 'typeorm'
},
{
label: 'PGVector',
name: 'pgvector'
}
],
optional: true,
additionalParams: true
},*/
{
label: 'Distance Strategy',
name: 'distanceStrategy',
Expand Down Expand Up @@ -215,11 +175,7 @@ class Postgres_VectorStores implements INode {
{
label: 'Postgres Vector Store',
name: 'vectorStore',
baseClasses: [
this.type,
// ...getBaseClasses(VectorStore), // disabled temporarily for using TypeORM
...getVectorStoreBaseClasses() // added temporarily for using TypeORM
]
baseClasses: [this.type, ...getBaseClasses(VectorStore)]
}
]
}
Expand Down Expand Up @@ -247,20 +203,28 @@ class Postgres_VectorStores implements INode {

try {
if (recordManager) {
const vectorStore = await vectorStoreDriver.instanciate()
const vectorStore: any = await vectorStoreDriver.instanciate()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call creates a new connection every time.

Maybe we could cache it by tableName?


await recordManager.createSchema()

const res = await index({
docsSource: finalDocs,
recordManager,
vectorStore,
options: {
cleanup: recordManager?.cleanup,
sourceIdKey: recordManager?.sourceIdKey ?? 'source',
vectorStoreName: tableName
let res: Partial<IndexingResult>
try {
res = await index({
docsSource: finalDocs,
recordManager,
vectorStore,
options: {
cleanup: recordManager?.cleanup,
sourceIdKey: recordManager?.sourceIdKey ?? 'source',
vectorStoreName: tableName
}
})
} finally {
if (vectorStore?.client) {
vectorStore.client.release()
vectorStore.client = undefined
}
})
}

return res
} else {
Expand Down Expand Up @@ -332,15 +296,7 @@ class Postgres_VectorStores implements INode {
}

static getDriverFromConfig(nodeData: INodeData, options: ICommonObject): VectorStoreDriver {
/*switch (nodeData.inputs?.driver) {
case 'typeorm':
return new TypeORMDriver(nodeData, options)
case 'pgvector':
return new PGVectorDriver(nodeData, options)
default:
return new TypeORMDriver(nodeData, options)
}*/
return new TypeORMDriver(nodeData, options)
return new PGVectorDriver(nodeData, options)
}
}

Expand Down
65 changes: 35 additions & 30 deletions packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/*
* Temporary disabled due to increasing open connections without releasing them
* Use TypeORM instead

import { VectorStoreDriver } from './Base'
import { FLOWISE_CHATID } from '../../../../src'
import { DistanceStrategy, PGVectorStore, PGVectorStoreArgs } from '@langchain/community/vectorstores/pgvector'
Expand Down Expand Up @@ -87,41 +83,50 @@ export class PGVectorDriver extends VectorStoreDriver {
instance.client = await instance.pool.connect()
}

const whereClauseRegex = /WHERE ([^\n]+)/
let chatflowOr = ''

// Match chatflow uploaded file and keep filtering on other files:
// https://github.com/FlowiseAI/Flowise/pull/3367#discussion_r1804229295
if (chatId) {
parameters.push({ [FLOWISE_CHATID]: chatId })
try {
const whereClauseRegex = /WHERE ([^\n]+)/
let chatflowOr = ''

chatflowOr = `OR metadata @> $${parameters.length}`
}
// Match chatflow uploaded file and keep filtering on other files:
// https://github.com/FlowiseAI/Flowise/pull/3367#discussion_r1804229295
if (chatId) {
parameters.push({ [FLOWISE_CHATID]: chatId })
chatflowOr = `OR metadata @> $${parameters.length}`
}

if (queryString.match(whereClauseRegex)) {
queryString = queryString.replace(whereClauseRegex, `WHERE (($1) AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr}`)
} else {
const orderByClauseRegex = /ORDER BY (.*)/
// Insert WHERE clause before ORDER BY
queryString = queryString.replace(
orderByClauseRegex,
`WHERE (metadata @> '{}' AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr}
if (queryString.match(whereClauseRegex)) {
queryString = queryString.replace(
whereClauseRegex,
`WHERE (($1) AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr}`
)
} else {
const orderByClauseRegex = /ORDER BY (.*)/
// Insert WHERE clause before ORDER BY
queryString = queryString.replace(
orderByClauseRegex,
`WHERE (metadata @> '{}' AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr}
ORDER BY $1
`
)
}
)
}

// Run base function
const queryResult = await basePoolQueryFn(queryString, parameters)
// Run base function
const queryResult = await basePoolQueryFn(queryString, parameters)

// ensure connection is released
instance.client.release()
instance.client = undefined
// ensure connection is released
instance.client.release()
instance.client = undefined

return queryResult
return queryResult
} catch (e) {
console.error(e)
// ensure connection is released
instance.client?.release()
instance.client = undefined
throw e
}
}

return instance
}
}
*/