@@ -4,6 +4,7 @@ use analyser::AnalyserVisitorBuilder;
44use change:: StatementChange ;
55use dashmap:: { DashMap , DashSet } ;
66use document:: { Document , Statement } ;
7+ use futures:: { stream, StreamExt } ;
78use pg_analyse:: { AnalyserOptions , AnalysisFilter } ;
89use pg_analyser:: { Analyser , AnalyserConfig , AnalyserContext } ;
910use pg_diagnostics:: { serde:: Diagnostic as SDiagnostic , Diagnostic , DiagnosticExt , Severity } ;
@@ -13,6 +14,7 @@ use pg_schema_cache::SchemaCache;
1314use pg_typecheck:: TypecheckParams ;
1415use sqlx:: PgPool ;
1516use std:: sync:: LazyLock ;
17+ use text_size:: TextRange ;
1618use tokio:: runtime:: Runtime ;
1719use tracing:: info;
1820use tree_sitter:: TreeSitterStore ;
@@ -338,72 +340,95 @@ impl Workspace for WorkspaceServer {
338340 filter,
339341 } ) ;
340342
341- let con = self . connection . read ( ) . unwrap ( ) . get_pool ( ) ;
342-
343- let diagnostics: Vec < SDiagnostic > = doc
344- . iter_statements_with_text_and_range ( )
345- . flat_map ( |( stmt, r, text) | {
346- let mut stmt_diagnostics = self . pg_query . get_diagnostics ( & stmt) ;
347-
348- let ast = self . pg_query . get_ast ( & stmt) ;
349- let tree = self . tree_sitter . get_parse_tree ( & stmt) ;
350-
351- if let Some ( ast) = ast {
352- stmt_diagnostics. extend (
353- analyser
354- . run ( AnalyserContext { root : & ast } )
355- . into_iter ( )
356- . map ( SDiagnostic :: new)
357- . collect :: < Vec < _ > > ( ) ,
358- ) ;
359-
360- if let Some ( con) = con. clone ( ) {
361- let text = text. to_string ( ) ;
362- let ast_clone = ast. clone ( ) ;
363- let tree_clone = tree. clone ( ) ;
364- if let Ok ( typecheck_result) = run_async ( async move {
365- pg_typecheck:: check_sql ( TypecheckParams {
366- conn : & con,
367- sql : & text,
368- ast : & ast_clone,
369- tree : tree_clone. as_deref ( ) ,
370- } )
371- . await
372- } ) {
373- if let Some ( typecheck_result) = typecheck_result. map ( SDiagnostic :: new) {
374- stmt_diagnostics. push ( typecheck_result) ;
343+ let mut diagnostics: Vec < SDiagnostic > = vec ! [ ] ;
344+
345+ // run diagnostics for each statement in parallel if its mostly i/o work
346+ if let Some ( pool) = self . connection . read ( ) . unwrap ( ) . get_pool ( ) {
347+ let typecheck_params: Vec < _ > = doc
348+ . iter_statements_with_text_and_range ( )
349+ . map ( |( stmt, range, text) | {
350+ let ast = self . pg_query . get_ast ( & stmt) ;
351+ let tree = self . tree_sitter . get_parse_tree ( & stmt) ;
352+ ( text. to_string ( ) , ast, tree, * range)
353+ } )
354+ . collect ( ) ;
355+
356+ let pool_clone = pool. clone ( ) ;
357+ let path_clone = params. path . clone ( ) ;
358+ let async_results = run_async ( async move {
359+ stream:: iter ( typecheck_params)
360+ . map ( |( text, ast, tree, range) | {
361+ let pool = pool_clone. clone ( ) ;
362+ let path = path_clone. clone ( ) ;
363+ async move {
364+ if let Some ( ast) = ast {
365+ pg_typecheck:: check_sql ( TypecheckParams {
366+ conn : & pool,
367+ sql : & text,
368+ ast : & ast,
369+ tree : tree. as_deref ( ) ,
370+ } )
371+ . await
372+ . map ( |d| {
373+ let r = d. location ( ) . span . map ( |span| span + range. start ( ) ) ;
374+
375+ d. with_file_path ( path. as_path ( ) . display ( ) . to_string ( ) )
376+ . with_file_span ( r. unwrap_or ( range) )
377+ } )
378+ } else {
379+ None
375380 }
376381 }
377- }
378- }
379-
380- stmt_diagnostics
381- . into_iter ( )
382- . map ( |d| {
383- // We do now check if the severity of the diagnostics should be changed.
384- // The configuration allows to change the severity of the diagnostics emitted by rules.
385- let severity = d
386- . category ( )
387- . filter ( |category| category. name ( ) . starts_with ( "lint/" ) )
388- . map_or_else (
389- || d. severity ( ) ,
390- |category| {
391- settings
392- . as_ref ( )
393- . get_severity_from_rule_code ( category)
394- . unwrap_or ( Severity :: Warning )
395- } ,
396- ) ;
397-
398- SDiagnostic :: new (
399- d. with_file_path ( params. path . as_path ( ) . display ( ) . to_string ( ) )
400- . with_file_span ( r)
401- . with_severity ( severity) ,
402- )
403382 } )
383+ . buffer_unordered ( 10 )
404384 . collect :: < Vec < _ > > ( )
405- } )
406- . collect ( ) ;
385+ . await
386+ } ) ?;
387+
388+ for result in async_results. into_iter ( ) . flatten ( ) {
389+ diagnostics. push ( SDiagnostic :: new ( result) ) ;
390+ }
391+ }
392+
393+ diagnostics. extend ( doc. iter_statements_with_range ( ) . flat_map ( |( stmt, r) | {
394+ let mut stmt_diagnostics = self . pg_query . get_diagnostics ( & stmt) ;
395+
396+ let ast = self . pg_query . get_ast ( & stmt) ;
397+
398+ if let Some ( ast) = ast {
399+ stmt_diagnostics. extend (
400+ analyser
401+ . run ( AnalyserContext { root : & ast } )
402+ . into_iter ( )
403+ . map ( SDiagnostic :: new)
404+ . collect :: < Vec < _ > > ( ) ,
405+ ) ;
406+ }
407+
408+ stmt_diagnostics
409+ . into_iter ( )
410+ . map ( |d| {
411+ let severity = d
412+ . category ( )
413+ . filter ( |category| category. name ( ) . starts_with ( "lint/" ) )
414+ . map_or_else (
415+ || d. severity ( ) ,
416+ |category| {
417+ settings
418+ . as_ref ( )
419+ . get_severity_from_rule_code ( category)
420+ . unwrap_or ( Severity :: Warning )
421+ } ,
422+ ) ;
423+
424+ SDiagnostic :: new (
425+ d. with_file_path ( params. path . as_path ( ) . display ( ) . to_string ( ) )
426+ . with_file_span ( r)
427+ . with_severity ( severity) ,
428+ )
429+ } )
430+ . collect :: < Vec < _ > > ( )
431+ } ) ) ;
407432
408433 let errors = diagnostics
409434 . iter ( )
0 commit comments