Skip to content

Commit 5304010

Browse files
authored
feat: psql \l queries (#175)
* feat: psql \l queries * feat: hard code libc as locale provider
1 parent eddf428 commit 5304010

File tree

3 files changed

+76
-11
lines changed

3 files changed

+76
-11
lines changed

datafusion-postgres/src/pg_catalog.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use std::sync::Arc;
44

55
use async_trait::async_trait;
66
use datafusion::arrow::array::{
7-
as_boolean_array, ArrayRef, BooleanBuilder, RecordBatch, StringArray, StringBuilder,
7+
as_boolean_array, ArrayRef, AsArray, BooleanBuilder, RecordBatch, StringArray, StringBuilder,
88
};
9-
use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
9+
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
1010
use datafusion::arrow::ipc::reader::FileReader;
1111
use datafusion::catalog::streaming::StreamingTable;
1212
use datafusion::catalog::{MemTable, SchemaProvider, TableFunctionImpl};
@@ -1109,7 +1109,7 @@ pub fn create_pg_get_statisticsobjdef_columns_udf() -> ScalarUDF {
11091109
let args = ColumnarValue::values_to_arrays(args)?;
11101110
let oid = &args[0];
11111111

1112-
let mut builder = BooleanBuilder::new();
1112+
let mut builder = StringBuilder::new();
11131113
for _ in 0..oid.len() {
11141114
builder.append_null();
11151115
}
@@ -1128,6 +1128,34 @@ pub fn create_pg_get_statisticsobjdef_columns_udf() -> ScalarUDF {
11281128
)
11291129
}
11301130

1131+
pub fn create_pg_encoding_to_char_udf() -> ScalarUDF {
1132+
let func = move |args: &[ColumnarValue]| {
1133+
let args = ColumnarValue::values_to_arrays(args)?;
1134+
let encoding = &args[0].as_primitive::<Int32Type>();
1135+
1136+
let mut builder = StringBuilder::new();
1137+
for i in 0..encoding.len() {
1138+
if encoding.value(i) == 6 {
1139+
builder.append_value("UTF-8");
1140+
} else {
1141+
builder.append_null();
1142+
}
1143+
}
1144+
1145+
let array: ArrayRef = Arc::new(builder.finish());
1146+
1147+
Ok(ColumnarValue::Array(array))
1148+
};
1149+
1150+
create_udf(
1151+
"pg_encoding_to_char",
1152+
vec![DataType::Int32],
1153+
DataType::Utf8,
1154+
Volatility::Stable,
1155+
Arc::new(func),
1156+
)
1157+
}
1158+
11311159
/// Install pg_catalog and postgres UDFs to current `SessionContext`
11321160
pub fn setup_pg_catalog(
11331161
session_context: &SessionContext,
@@ -1169,6 +1197,7 @@ pub fn setup_pg_catalog(
11691197
session_context.register_udf(create_pg_get_partkeydef_udf());
11701198
session_context.register_udf(create_pg_relation_is_publishable_udf());
11711199
session_context.register_udf(create_pg_get_statisticsobjdef_columns_udf());
1200+
session_context.register_udf(create_pg_encoding_to_char_udf());
11721201

11731202
Ok(())
11741203
}

datafusion-postgres/src/pg_catalog/pg_database.rs

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use std::collections::HashMap;
22
use std::sync::atomic::{AtomicU32, Ordering};
33
use std::sync::Arc;
44

5-
use datafusion::arrow::array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
6-
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
5+
use datafusion::arrow::array::{
6+
ArrayRef, BooleanArray, Int32Array, ListArray, RecordBatch, StringArray,
7+
};
8+
use datafusion::arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef};
79
use datafusion::error::Result;
810
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
911
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -36,16 +38,23 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
3638
Field::new("datname", DataType::Utf8, false), // Database name
3739
Field::new("datdba", DataType::Int32, false), // Database owner's user ID
3840
Field::new("encoding", DataType::Int32, false), // Character encoding
41+
Field::new("datlocprovider", DataType::Utf8, false),
3942
Field::new("datcollate", DataType::Utf8, false), // LC_COLLATE for this database
40-
Field::new("datctype", DataType::Utf8, false), // LC_CTYPE for this database
43+
Field::new("datctype", DataType::Utf8, false), // LC_CTYPE for this database
4144
Field::new("datistemplate", DataType::Boolean, false), // If true, database can be used as a template
4245
Field::new("datallowconn", DataType::Boolean, false), // If false, no one can connect to this database
4346
Field::new("datconnlimit", DataType::Int32, false), // Max number of concurrent connections (-1=no limit)
4447
Field::new("datlastsysoid", DataType::Int32, false), // Last system OID in database
4548
Field::new("datfrozenxid", DataType::Int32, false), // Frozen XID for this database
4649
Field::new("datminmxid", DataType::Int32, false), // Minimum multixact ID
4750
Field::new("dattablespace", DataType::Int32, false), // Default tablespace for this database
48-
Field::new("datacl", DataType::Utf8, true), // Access privileges
51+
Field::new("daticulocale", DataType::Utf8, true),
52+
Field::new("daticurules", DataType::Utf8, true),
53+
Field::new(
54+
"datacl",
55+
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
56+
true,
57+
), // Access privileges
4958
]));
5059

5160
Self {
@@ -63,6 +72,7 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
6372
let mut datnames = Vec::new();
6473
let mut datdbas = Vec::new();
6574
let mut encodings = Vec::new();
75+
let mut datlocproviders = Vec::new();
6676
let mut datcollates = Vec::new();
6777
let mut datctypes = Vec::new();
6878
let mut datistemplates = Vec::new();
@@ -72,7 +82,9 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
7282
let mut datfrozenxids = Vec::new();
7383
let mut datminmxids = Vec::new();
7484
let mut dattablespaces = Vec::new();
75-
let mut datacles: Vec<Option<String>> = Vec::new();
85+
let mut daticulocales: Vec<Option<String>> = Vec::new();
86+
let mut daticurules: Vec<Option<String>> = Vec::new();
87+
let mut datacls: Vec<Option<Vec<Option<i32>>>> = Vec::new();
7688

7789
// to store all schema-oid mapping temporarily before adding to global oid cache
7890
let mut catalog_oid_cache = HashMap::new();
@@ -93,6 +105,7 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
93105
datnames.push(catalog_name.clone());
94106
datdbas.push(10); // Default owner (assuming 10 = postgres user)
95107
encodings.push(6); // 6 = UTF8 in PostgreSQL
108+
datlocproviders.push("libc".to_string());
96109
datcollates.push("en_US.UTF-8".to_string()); // Default collation
97110
datctypes.push("en_US.UTF-8".to_string()); // Default ctype
98111
datistemplates.push(false);
@@ -102,7 +115,9 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
102115
datfrozenxids.push(1); // Simplified transaction ID
103116
datminmxids.push(1); // Simplified multixact ID
104117
dattablespaces.push(1663); // Default tablespace (1663 = pg_default in PostgreSQL)
105-
datacles.push(None); // No specific ACLs
118+
daticulocales.push(None);
119+
daticurules.push(None);
120+
datacls.push(None); // No specific ACLs
106121
}
107122

108123
// Always include a "postgres" database entry if not already present
@@ -121,6 +136,7 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
121136
datnames.push(default_datname);
122137
datdbas.push(10);
123138
encodings.push(6);
139+
datlocproviders.push("libc".to_string());
124140
datcollates.push("en_US.UTF-8".to_string());
125141
datctypes.push("en_US.UTF-8".to_string());
126142
datistemplates.push(false);
@@ -130,7 +146,9 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
130146
datfrozenxids.push(1);
131147
datminmxids.push(1);
132148
dattablespaces.push(1663);
133-
datacles.push(None);
149+
daticulocales.push(None);
150+
daticurules.push(None);
151+
datacls.push(None);
134152
}
135153

136154
// Create Arrow arrays from the collected data
@@ -139,6 +157,7 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
139157
Arc::new(StringArray::from(datnames)),
140158
Arc::new(Int32Array::from(datdbas)),
141159
Arc::new(Int32Array::from(encodings)),
160+
Arc::new(StringArray::from(datlocproviders)),
142161
Arc::new(StringArray::from(datcollates)),
143162
Arc::new(StringArray::from(datctypes)),
144163
Arc::new(BooleanArray::from(datistemplates)),
@@ -148,7 +167,11 @@ impl<C: CatalogInfo> PgDatabaseTable<C> {
148167
Arc::new(Int32Array::from(datfrozenxids)),
149168
Arc::new(Int32Array::from(datminmxids)),
150169
Arc::new(Int32Array::from(dattablespaces)),
151-
Arc::new(StringArray::from_iter(datacles.into_iter())),
170+
Arc::new(StringArray::from(daticulocales)),
171+
Arc::new(StringArray::from(daticurules)),
172+
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(
173+
datacls.into_iter(),
174+
)),
152175
];
153176

154177
// Create a full record batch

datafusion-postgres/tests/psql.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,19 @@ const PSQL_QUERIES: &[&str] = &[
9292
FROM pg_catalog.pg_class c, pg_catalog.pg_inherits i
9393
WHERE c.oid = i.inhrelid AND i.inhparent = '16384'
9494
ORDER BY pg_catalog.pg_get_expr(c.relpartbound, c.oid) = 'DEFAULT', c.oid::pg_catalog.regclass::pg_catalog.text;",
95+
96+
r#"SELECT
97+
d.datname as "Name",
98+
pg_catalog.pg_get_userbyid(d.datdba) as "Owner",
99+
pg_catalog.pg_encoding_to_char(d.encoding) as "Encoding",
100+
CASE d.datlocprovider WHEN 'b' THEN 'builtin' WHEN 'c' THEN 'libc' WHEN 'i' THEN 'icu' END AS "Locale Provider",
101+
d.datcollate as "Collate",
102+
d.datctype as "Ctype",
103+
d.daticulocale as "Locale",
104+
d.daticurules as "ICU Rules",
105+
CASE WHEN pg_catalog.array_length(d.datacl, 1) = 0 THEN '(none)' ELSE pg_catalog.array_to_string(d.datacl, E'\n') END AS "Access privileges"
106+
FROM pg_catalog.pg_database d
107+
ORDER BY 1;"#,
95108
];
96109

97110
#[tokio::test]

0 commit comments

Comments
 (0)