11using FlowSynx . PluginCore ;
22using FlowSynx . PluginCore . Extensions ;
3- using FlowSynx . PluginCore . Helpers ;
43using FlowSynx . Plugins . PostgreSql . Models ;
54using FlowSynx . Plugins . PostgreSql . Services ;
65using Npgsql ;
76using NpgsqlTypes ;
87
98namespace FlowSynx . Plugins . PostgreSql ;
109
11- public class PostgreSqlPlugin : IPlugin
10+ public class PostgreSqlPlugin : IPlugin
1211{
13- private IPluginLogger ? _logger ;
12+ private readonly IGuidProvider _guidProvider ;
1413 private readonly IReflectionGuard _reflectionGuard ;
15- private PostgreSqlPluginSpecifications _postgreSqlSpecifications = null ! ;
14+ private IPluginLogger ? _logger ;
15+ private PostgreSqlPluginSpecifications _specifications = null ! ;
1616 private bool _isInitialized ;
1717
1818 public PostgreSqlPlugin ( )
19- : this ( new DefaultReflectionGuard ( ) ) { }
19+ : this ( new GuidProvider ( ) , new DefaultReflectionGuard ( ) ) { }
2020
21- internal PostgreSqlPlugin ( IReflectionGuard reflectionGuard )
21+ internal PostgreSqlPlugin ( IGuidProvider guidProvider , IReflectionGuard reflectionGuard )
2222 {
23+ _guidProvider = guidProvider ?? throw new ArgumentNullException ( nameof ( guidProvider ) ) ;
2324 _reflectionGuard = reflectionGuard ?? throw new ArgumentNullException ( nameof ( reflectionGuard ) ) ;
2425 }
2526
26- public PluginMetadata Metadata => new PluginMetadata
27+ public PluginMetadata Metadata => new ( )
2728 {
2829 Id = Guid . Parse ( "e2c349bc-6bfc-4e1e-acce-8dbda585abcf" ) ,
2930 Name = "PostgreSql" ,
3031 CompanyName = "FlowSynx" ,
3132 Description = Resources . PluginDescription ,
32- Version = new Version ( 1 , 1 , 1 ) ,
33+ Version = new Version ( 1 , 2 , 0 ) ,
3334 Category = PluginCategory . Database ,
3435 Authors = new List < string > { "FlowSynx" } ,
3536 Copyright = "© FlowSynx. All rights reserved." ,
3637 Icon = "flowsynx.png" ,
3738 ReadMe = "README.md" ,
3839 RepositoryUrl = "https://github.com/flowsynx/plugin-postgresql" ,
3940 ProjectUrl = "https://flowsynx.io" ,
40- Tags = new List < string > ( ) { "flowSynx" , "sql" , "database" , "data" , "postgresql" } ,
41+ Tags = new List < string > { "flowSynx" , "sql" , "database" , "data" , "postgresql" } ,
4142 MinimumFlowSynxVersion = new Version ( 1 , 1 , 1 )
4243 } ;
4344
4445 public PluginSpecifications ? Specifications { get ; set ; }
45-
4646 public Type SpecificationsType => typeof ( PostgreSqlPluginSpecifications ) ;
4747
48- private Dictionary < string , Func < InputParameter , CancellationToken , Task < object ? > > > OperationMap => new ( StringComparer . OrdinalIgnoreCase )
49- {
50- [ "query" ] = async ( parameters , cancellationToken ) => await ExecuteQueryAsync ( parameters , cancellationToken ) ,
51- [ "execute" ] = async ( parameters , cancellationToken ) => { await ExecuteNonQueryAsync ( parameters , cancellationToken ) ; return null ; }
52- } ;
48+ private Dictionary < string , Func < InputParameter , CancellationToken , Task < object ? > > > OperationMap =>
49+ new Dictionary < string , Func < InputParameter , CancellationToken , Task < object ? > > > ( StringComparer . OrdinalIgnoreCase )
50+ {
51+ [ "query" ] = async ( p , t ) => await ExecuteQueryAsync ( p , t ) ,
52+ [ "execute" ] = async ( p , t ) => { await ExecuteNonQueryAsync ( p , t ) ; return null ; }
53+ } ;
5354
5455 public IReadOnlyCollection < string > SupportedOperations => OperationMap . Keys ;
5556
5657 public Task Initialize ( IPluginLogger logger )
5758 {
5859 ThrowIfReflection ( ) ;
5960 ArgumentNullException . ThrowIfNull ( logger ) ;
60- _postgreSqlSpecifications = Specifications . ToObject < PostgreSqlPluginSpecifications > ( ) ;
61+
62+ _specifications = Specifications . ToObject < PostgreSqlPluginSpecifications > ( ) ;
6163 _logger = logger ;
6264 _isInitialized = true ;
65+
6366 return Task . CompletedTask ;
6467 }
6568
@@ -69,185 +72,182 @@ public Task Initialize(IPluginLogger logger)
6972 ThrowIfReflection ( ) ;
7073 ThrowIfNotInitialized ( ) ;
7174
72- var inputParameter = parameters . ToObject < InputParameter > ( ) ;
73- var operation = inputParameter . Operation ;
75+ var input = parameters . ToObject < InputParameter > ( ) ;
76+ if ( ! OperationMap . TryGetValue ( input . Operation , out var handler ) )
77+ throw new NotSupportedException ( $ "PostgreSQL plugin: Operation '{ input . Operation } ' is not supported.") ;
7478
75- if ( OperationMap . TryGetValue ( operation , out var handler ) )
76- {
77- return await handler ( inputParameter , cancellationToken ) ;
78- }
79-
80- throw new NotSupportedException ( $ "PostgreSQL plugin: Operation '{ operation } ' is not supported.") ;
79+ return await handler ( input , cancellationToken ) ;
8180 }
8281
83- private void ThrowIfReflection ( )
84- {
85- if ( _reflectionGuard . IsCalledViaReflection ( ) )
86- throw new InvalidOperationException ( Resources . ReflectionBasedAccessIsNotAllowed ) ;
87- }
88-
89- private void ThrowIfNotInitialized ( )
90- {
91- if ( ! _isInitialized )
92- throw new InvalidOperationException ( $ "Plugin '{ Metadata . Name } ' v{ Metadata . Version } is not initialized.") ;
93- }
82+ #region private methods
9483
95- private async Task ExecuteNonQueryAsync ( InputParameter parameters , CancellationToken cancellationToken )
84+ private async Task ExecuteNonQueryAsync ( InputParameter input , CancellationToken token )
9685 {
97- var ( sql , sqlParams ) = GetSqlAndParameters ( parameters ) ;
86+ var ( sql , sqlParams ) = ExtractSqlParameters ( input ) ;
87+ var connectionString = NormalizeConnectionString ( _specifications . ConnectionString ) ;
9888
9989 try
10090 {
101- var connectionString = NormalizePostgresConnectionString ( _postgreSqlSpecifications . ConnectionString ) ;
102- var connection = new NpgsqlConnection ( connectionString ) ;
103- await connection . OpenAsync ( ) ;
104- using var cmd = new NpgsqlCommand ( parameters . Sql , connection ) ;
105-
106- AddParameters ( cmd , sqlParams ) ;
91+ await using var connection = new NpgsqlConnection ( connectionString ) ;
92+ await connection . OpenAsync ( token ) ;
10793
108- cancellationToken . ThrowIfCancellationRequested ( ) ;
94+ var context = ParseInputData ( input . Data ) ;
10995
110- int affectedRows = await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
111- _logger ? . LogInfo ( $ "Non-query executed successfully. Rows affected: { affectedRows } .") ;
96+ if ( context . StructuredData ? . Count > 0 )
97+ {
98+ await ExecuteStructuredDataAsync ( connection , sql , context , token ) ;
99+ }
100+ else
101+ {
102+ await ExecuteSingleNonQueryAsync ( connection , sql , sqlParams , token ) ;
103+ }
112104 }
113105 catch ( Exception ex )
114106 {
115- _logger ? . LogError ( $ "Error executing PostgreSQL sql statement. Error : { ex . Message } ") ;
107+ _logger ? . LogError ( $ "Error executing PostgreSQL SQL statement: { ex . Message } ") ;
116108 throw ;
117109 }
118110 }
119111
120- private async Task < PluginContext > ExecuteQueryAsync ( InputParameter parameters , CancellationToken cancellationToken )
112+ private async Task < PluginContext > ExecuteQueryAsync ( InputParameter input , CancellationToken token )
121113 {
122- var ( sql , sqlParams ) = GetSqlAndParameters ( parameters ) ;
114+ var ( sql , sqlParams ) = ExtractSqlParameters ( input ) ;
115+ var connectionString = NormalizeConnectionString ( _specifications . ConnectionString ) ;
123116
124117 try
125118 {
126119 var result = new List < Dictionary < string , object > > ( ) ;
127- var connectionString = NormalizePostgresConnectionString ( _postgreSqlSpecifications . ConnectionString ) ;
128- var connection = new NpgsqlConnection ( connectionString ) ;
129- await connection . OpenAsync ( ) ;
130- using var cmd = new NpgsqlCommand ( sql , connection ) ;
120+ await using var connection = new NpgsqlConnection ( connectionString ) ;
121+ await connection . OpenAsync ( token ) ;
131122
123+ await using var cmd = new NpgsqlCommand ( sql , connection ) ;
132124 AddParameters ( cmd , sqlParams ) ;
133125
134- cancellationToken . ThrowIfCancellationRequested ( ) ;
135-
136- using var reader = await cmd . ExecuteReaderAsync ( cancellationToken ) ;
137-
138- while ( await reader . ReadAsync ( cancellationToken ) )
126+ await using var reader = await cmd . ExecuteReaderAsync ( token ) ;
127+ while ( await reader . ReadAsync ( token ) )
139128 {
140- var row = new Dictionary < string , object > ( ) ;
141- for ( int i = 0 ; i < reader . FieldCount ; i ++ )
142- {
143- row [ reader . GetName ( i ) ] = reader . GetValue ( i ) ;
144- }
129+ var row = Enumerable . Range ( 0 , reader . FieldCount )
130+ . ToDictionary ( reader . GetName , reader . GetValue ) ;
145131 result . Add ( row ) ;
146132 }
147133
148134 _logger ? . LogInfo ( $ "Query executed successfully. Rows returned: { result . Count } .") ;
149- string key = $ " { Guid . NewGuid ( ) . ToString ( ) } " ;
150- return new PluginContext ( key , "Data" )
135+
136+ return new PluginContext ( _guidProvider . NewGuid ( ) . ToString ( ) , "Data" )
151137 {
152138 Format = "Database" ,
153139 StructuredData = result
154140 } ;
155141 }
156142 catch ( Exception ex )
157143 {
158- _logger ? . LogError ( $ "Error executing PostgreSQL sql statement. Error : { ex . Message } ") ;
144+ _logger ? . LogError ( $ "Error executing PostgreSQL query : { ex . Message } ") ;
159145 throw ;
160146 }
161147 }
162148
163- private ( string Sql , Dictionary < string , object > Parameters ) GetSqlAndParameters ( InputParameter parameters )
149+ private async Task ExecuteStructuredDataAsync (
150+ NpgsqlConnection connection , string sql , PluginContext context , CancellationToken token )
164151 {
165- if ( string . IsNullOrEmpty ( parameters . Sql ) )
166- throw new ArgumentException ( "Missing 'sql' parameter." ) ;
167-
168- Dictionary < string , object > sqlParams = new ( ) ;
152+ int totalAffected = 0 ;
169153
170- if ( parameters . Params is Dictionary < string , object > paramDict )
154+ foreach ( var row in context . StructuredData )
171155 {
172- sqlParams = paramDict ;
156+ await using var cmd = new NpgsqlCommand ( sql , connection ) ;
157+ AddParameters ( cmd , row ) ;
158+ totalAffected += await cmd . ExecuteNonQueryAsync ( token ) ;
173159 }
174160
175- return ( parameters . Sql , sqlParams ) ;
161+ _logger ? . LogInfo ( $ "Executed structured SQL for { context . StructuredData . Count } rows. Total affected: { totalAffected } ") ;
162+ }
163+
164+ private async Task ExecuteSingleNonQueryAsync (
165+ NpgsqlConnection connection , string sql , Dictionary < string , object > sqlParams , CancellationToken token )
166+ {
167+ await using var cmd = new NpgsqlCommand ( sql , connection ) ;
168+ AddParameters ( cmd , sqlParams ) ;
169+ int affected = await cmd . ExecuteNonQueryAsync ( token ) ;
170+ _logger ? . LogInfo ( $ "Non-query executed successfully. Rows affected: { affected } .") ;
171+ }
172+
173+ private ( string Sql , Dictionary < string , object > Params ) ExtractSqlParameters ( InputParameter input )
174+ {
175+ if ( string . IsNullOrWhiteSpace ( input . Sql ) )
176+ throw new ArgumentException ( "Missing 'sql' parameter." ) ;
177+
178+ var sqlParams = input . Params as Dictionary < string , object > ?? new ( ) ;
179+ return ( input . Sql , sqlParams ) ;
176180 }
177181
178182 private void AddParameters ( NpgsqlCommand cmd , Dictionary < string , object > ? parameters )
179183 {
180- if ( parameters == null ) return ;
184+ if ( parameters is null ) return ;
181185
182- foreach ( var kvp in parameters )
186+ foreach ( var ( key , value ) in parameters )
183187 {
184- var paramName = kvp . Key ;
185- var paramValue = kvp . Value ;
188+ var name = key . StartsWith ( "@" ) ? key : "@" + key ;
186189
187- if ( paramValue == null )
188- {
189- cmd . Parameters . AddWithValue ( paramName , DBNull . Value ) ;
190- }
191- else if ( paramValue is Guid guidValue )
190+ cmd . Parameters . Add ( value switch
192191 {
193- cmd . Parameters . Add ( paramName , NpgsqlDbType . Uuid ) . Value = guidValue ;
194- }
195- else if ( paramValue is string strValue )
196- {
197- // Try to detect if string is Guid
198- if ( Guid . TryParse ( strValue , out var parsedGuid ) )
199- {
200- cmd . Parameters . Add ( paramName , NpgsqlDbType . Uuid ) . Value = parsedGuid ;
201- }
202- else
203- {
204- cmd . Parameters . AddWithValue ( paramName , strValue ) ;
205- }
206- }
207- else if ( paramValue is int || paramValue is long || paramValue is double || paramValue is decimal )
208- {
209- cmd . Parameters . AddWithValue ( paramName , paramValue ) ;
210- }
211- else if ( paramValue is bool boolValue )
212- {
213- cmd . Parameters . AddWithValue ( paramName , boolValue ) ;
214- }
215- else if ( paramValue is DateTime dateValue )
216- {
217- cmd . Parameters . AddWithValue ( paramName , NpgsqlDbType . Timestamp ) . Value = dateValue ;
218- }
219- else
220- {
221- throw new InvalidOperationException ( $ "Unsupported parameter type for '{ paramName } ': { paramValue . GetType ( ) } ") ;
222- }
192+ null => new NpgsqlParameter ( name , DBNull . Value ) ,
193+ Guid g => new NpgsqlParameter ( name , NpgsqlDbType . Uuid ) { Value = g } ,
194+ string s when Guid . TryParse ( s , out var parsed ) => new NpgsqlParameter ( name , NpgsqlDbType . Uuid ) { Value = parsed } ,
195+ string s => new NpgsqlParameter ( name , s ) ,
196+ int or long or double or decimal or bool => new NpgsqlParameter ( name , value ) ,
197+ DateTime dt => new NpgsqlParameter ( name , NpgsqlDbType . Timestamp ) { Value = dt } ,
198+ _ => throw new InvalidOperationException ( $ "Unsupported parameter type for '{ name } ': { value . GetType ( ) } ")
199+ } ) ;
223200 }
224201 }
225202
226- private string NormalizePostgresConnectionString ( string ? input )
203+ private string NormalizeConnectionString ( string ? input )
227204 {
228205 if ( string . IsNullOrWhiteSpace ( input ) )
229- throw new Exception ( "Database connectionstring is required!" ) ;
206+ throw new InvalidOperationException ( "Database connection string is required." ) ;
207+
208+ if ( ! input . StartsWith ( "postgres://" ) && ! input . StartsWith ( "postgresql://" ) )
209+ return input ;
210+
211+ var uri = new Uri ( input ) ;
212+ var userInfo = uri . UserInfo . Split ( ':' , 2 ) ;
213+ var username = userInfo [ 0 ] ;
214+ var password = userInfo . Length > 1 ? userInfo [ 1 ] : "" ;
230215
231- if ( input . StartsWith ( "postgres://" ) || input . StartsWith ( "postgresql://" ) )
216+ return new NpgsqlConnectionStringBuilder
232217 {
233- var uri = new Uri ( input ) ;
234- var userInfo = uri . UserInfo . Split ( ':' , 2 ) ;
235- var username = userInfo [ 0 ] ;
236- var password = userInfo . Length > 1 ? userInfo [ 1 ] : "" ;
237- var host = uri . Host ;
238- var port = uri . Port == - 1 ? 5432 : uri . Port ;
239-
240- return new NpgsqlConnectionStringBuilder
241- {
242- Host = host ,
243- Port = port ,
244- Username = username ,
245- Password = password ,
246- Database = uri . AbsolutePath . TrimStart ( '/' ) ,
247- SslMode = SslMode . Require
248- } . ConnectionString ;
249- }
218+ Host = uri . Host ,
219+ Port = uri . Port == - 1 ? 5432 : uri . Port ,
220+ Username = username ,
221+ Password = password ,
222+ Database = uri . AbsolutePath . TrimStart ( '/' ) ,
223+ SslMode = SslMode . Require
224+ } . ConnectionString ;
225+ }
226+
227+ private PluginContext ParseInputData ( object ? data )
228+ {
229+ if ( data is null )
230+ throw new ArgumentNullException ( nameof ( data ) , "Input data cannot be null." ) ;
250231
251- return input ;
232+ return data switch
233+ {
234+ PluginContext context => context ,
235+ IEnumerable < PluginContext > => throw new NotSupportedException ( "List of PluginContext is not supported." ) ,
236+ _ => throw new NotSupportedException ( "Unsupported input data format." )
237+ } ;
252238 }
239+
240+ private void ThrowIfReflection ( )
241+ {
242+ if ( _reflectionGuard . IsCalledViaReflection ( ) )
243+ throw new InvalidOperationException ( Resources . ReflectionBasedAccessIsNotAllowed ) ;
244+ }
245+
246+ private void ThrowIfNotInitialized ( )
247+ {
248+ if ( ! _isInitialized )
249+ throw new InvalidOperationException ( $ "Plugin '{ Metadata . Name } ' v{ Metadata . Version } is not initialized.") ;
250+ }
251+
252+ #endregion
253253}
0 commit comments