From 8054ec51a1f2fa2bc3eb6334eef0f137e6754f69 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Tue, 25 Apr 2023 15:17:16 -0400 Subject: [PATCH] Use pqarrow in zio/parquetio Reading and writing are much faster with it than with github.com/fraugster/parquet-go. Its only apparent drawback is that it offers no easy way to support Zed's duration and float16 types, and writing a value containing either produces a cryptic error. $ echo '{a:1.(float16)}' | zq -f parquet - parquetio: unsupported type: not implemented yet Closes #764, closes #4278, and closes #4527. --- docs/commands/zq.md | 2 +- go.mod | 1 - go.sum | 25 --- zed_test.go | 9 +- zio/arrowio/writer.go | 21 +- zio/parquetio/builder.go | 88 -------- zio/parquetio/data.go | 109 ---------- zio/parquetio/reader.go | 51 ++--- zio/parquetio/schemadefinition.go | 277 ------------------------ zio/parquetio/type.go | 195 ----------------- zio/parquetio/writer.go | 69 +++--- zio/parquetio/ztests/non-record.yaml | 2 +- zio/parquetio/ztests/types.yaml | 10 +- zio/parquetio/ztests/writer-errors.yaml | 15 ++ 14 files changed, 88 insertions(+), 786 deletions(-) delete mode 100644 zio/parquetio/builder.go delete mode 100644 zio/parquetio/data.go delete mode 100644 zio/parquetio/schemadefinition.go delete mode 100644 zio/parquetio/type.go create mode 100644 zio/parquetio/ztests/writer-errors.yaml diff --git a/docs/commands/zq.md b/docs/commands/zq.md index 9de8667e65..43fc6bd559 100644 --- a/docs/commands/zq.md +++ b/docs/commands/zq.md @@ -302,7 +302,7 @@ echo '{x:1}{s:"hello"}' | zq -o out.parquet -f parquet - ``` causes this error ```mdtest-output -Parquet output requires uniform records but multiple types encountered (consider 'fuse') +parquetio: encountered multiple types (consider 'fuse'): {x:int64} and {s:string} ``` #### 3.4.1 Fusing Schemas diff --git a/go.mod b/go.mod index 4a6bd1638c..6abe261331 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de github.com/aws/aws-sdk-go v1.36.17 github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f - github.com/fraugster/parquet-go v0.10.1-0.20220222153523-e6b70a8a7212 github.com/go-redis/redis/v8 v8.4.11 github.com/golang-jwt/jwt/v4 v4.4.3 github.com/golang/mock v1.5.0 diff --git a/go.sum b/go.sum index 2574a23651..65264225a4 100644 --- a/go.sum +++ b/go.sum @@ -55,7 +55,6 @@ github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de h1:FxWPpzIjnTlhP github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= -github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/aws/aws-sdk-go v1.36.17 h1:8zTvseyGhgs3uQAzkgnFy7dvTo+ZnZLYmrhnopFxYME= github.com/aws/aws-sdk-go v1.36.17/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/axiomhq/hyperloglog v0.0.0-20191112132149-a4c4c47bc57f h1:y06x6vGnFYfXUoVMbrcP1Uzpj4JG01eB5vRps9G8agM= @@ -75,10 +74,6 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= -github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= -github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= -github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -93,8 +88,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/fraugster/parquet-go v0.10.1-0.20220222153523-e6b70a8a7212 h1:u7X3aZRlWSm18x0EysX9szRULhH7QYQv7UkxW1yHbik= -github.com/fraugster/parquet-go v0.10.1-0.20220222153523-e6b70a8a7212/go.mod h1:dGzUxdNqXsAijatByVgbAWVPlFirnhknQbdazcUIjY0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -189,10 +182,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4= github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= -github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb v1.7.6/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -225,7 +216,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= -github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -238,8 +228,6 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= -github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -261,7 +249,6 @@ github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pbnjay/memory v0.0.0-20190104145345-974d429e7ae4 h1:MfIUBZ1bz7TgvQLVa/yPJZOGeKEgs6eTKUjz3zB4B+U= github.com/pbnjay/memory v0.0.0-20190104145345-974d429e7ae4/go.mod h1:RMU2gJXhratVxBDTFeOdNhd540tG57lt9FIUV0YLvIQ= -github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterh/liner v1.1.0 h1:f+aAedNJA6uk7+6rXsYBnhdo4Xux7ESLe+kcuVUF5os= github.com/peterh/liner v1.1.0/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0= github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= @@ -306,7 +293,6 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so= github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM= -github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= github.com/segmentio/ksuid v1.0.2 h1:9yBfKyw4ECGTdALaF09Snw3sLJmYIX6AbPJrAy6MrDc= github.com/segmentio/ksuid v1.0.2/go.mod h1:BXuJDr2byAiHuQaQtSKoXh1J0YmUDurywOXgB2w+OSU= @@ -314,13 +300,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= -github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= -github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= -github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= -github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= @@ -334,11 +313,9 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= -github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= -github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -362,7 +339,6 @@ go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95a go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -461,7 +437,6 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/zed_test.go b/zed_test.go index adfc4473e6..133b8d1351 100644 --- a/zed_test.go +++ b/zed_test.go @@ -17,7 +17,6 @@ import ( "github.com/brimdata/zed/zio" "github.com/brimdata/zed/zio/anyio" "github.com/brimdata/zed/zio/arrowio" - "github.com/brimdata/zed/zio/parquetio" "github.com/brimdata/zed/zio/zngio" "github.com/brimdata/zed/ztest" "github.com/stretchr/testify/assert" @@ -154,13 +153,7 @@ func runOneBoomerang(t *testing.T, format, data string) { if err != nil { if errors.Is(err, arrowio.ErrMultipleTypes) || errors.Is(err, arrowio.ErrNotRecord) || - errors.Is(err, arrowio.ErrUnsupportedType) || - errors.Is(err, parquetio.ErrEmptyRecordType) || - errors.Is(err, parquetio.ErrNullType) || - errors.Is(err, parquetio.ErrUnionType) || - strings.Contains(err.Error(), "Parquet output encountered non-record value") || - strings.Contains(err.Error(), "Parquet output requires uniform records but multiple types encountered") || - strings.Contains(err.Error(), "column has no name") { + errors.Is(err, arrowio.ErrUnsupportedType) { t.Skipf("skipping due to expected error: %s", err) } t.Fatalf("unexpected error writing %s baseline: %s", format, err) diff --git a/zio/arrowio/writer.go b/zio/arrowio/writer.go index ec3629ad4f..bacac41954 100644 --- a/zio/arrowio/writer.go +++ b/zio/arrowio/writer.go @@ -35,15 +35,27 @@ var ( // dictionaries are not part of the Zed data model, write support could be added // using a named type.) type Writer struct { + NewWriterFunc func(io.Writer, *arrow.Schema) (WriteCloser, error) w io.WriteCloser - writer *ipc.Writer + writer WriteCloser builder *array.RecordBuilder unionTagMappings map[zed.Type][]int typ *zed.TypeRecord } +type WriteCloser interface { + Write(rec arrow.Record) error + Close() error +} + func NewWriter(w io.WriteCloser) *Writer { - return &Writer{w: w, unionTagMappings: map[zed.Type][]int{}} + return &Writer{ + NewWriterFunc: func(w io.Writer, s *arrow.Schema) (WriteCloser, error) { + return ipc.NewWriter(w, ipc.WithSchema(s)), nil + }, + w: w, + unionTagMappings: map[zed.Type][]int{}, + } } func (w *Writer) Close() error { @@ -78,7 +90,10 @@ func (w *Writer) Write(val *zed.Value) error { schema := arrow.NewSchema(dt.(*arrow.StructType).Fields(), nil) w.builder = array.NewRecordBuilder(memory.DefaultAllocator, schema) w.builder.Reserve(recordBatchSize) - w.writer = ipc.NewWriter(w.w, ipc.WithSchema(schema)) + w.writer, err = w.NewWriterFunc(w.w, schema) + if err != nil { + return err + } } else if w.typ != recType { return fmt.Errorf("%w: %s and %s", ErrMultipleTypes, zson.FormatType(w.typ), zson.FormatType(recType)) } diff --git a/zio/parquetio/builder.go b/zio/parquetio/builder.go deleted file mode 100644 index b1b50dfcbb..0000000000 --- a/zio/parquetio/builder.go +++ /dev/null @@ -1,88 +0,0 @@ -package parquetio - -import ( - "fmt" - - "github.com/brimdata/zed" - "github.com/brimdata/zed/zcode" -) - -type builder struct { - zcode.Builder - buf []byte -} - -func (b *builder) appendValue(typ zed.Type, v interface{}) { - switch v := v.(type) { - case nil: - b.Append(nil) - case []byte: - b.Append(v) - case bool: - b.buf = zed.AppendBool(b.buf[:0], v) - b.Append(b.buf) - case float32: - b.buf = zed.AppendFloat32(b.buf[:0], v) - b.Append(b.buf) - case float64: - b.buf = zed.AppendFloat64(b.buf[:0], v) - b.Append(b.buf) - case int32: - if zed.IsSigned(typ.ID()) { - b.buf = zed.AppendInt(b.buf[:0], int64(v)) - } else { - b.buf = zed.AppendUint(b.buf[:0], uint64(v)) - } - b.Append(b.buf) - case int64: - if zed.IsSigned(typ.ID()) { - b.buf = zed.AppendInt(b.buf[:0], v) - } else { - b.buf = zed.AppendUint(b.buf[:0], uint64(v)) - } - b.Append(b.buf) - case [12]uint8: - // This is an INT96. - b.Append(v[:]) - case map[string]interface{}: - switch typ := zed.TypeUnder(typ).(type) { - case *zed.TypeArray: - switch v := v["list"].(type) { - case nil: - b.Append(nil) - case []map[string]interface{}: - b.BeginContainer() - for _, m := range v { - b.appendValue(typ.Type, m["element"]) - } - b.EndContainer() - default: - panic(fmt.Sprintf("unknown type %T", v)) - } - case *zed.TypeMap: - switch v := v["key_value"].(type) { - case nil: - b.Append(nil) - case []map[string]interface{}: - b.BeginContainer() - for _, m := range v { - b.appendValue(typ.KeyType, m["key"]) - b.appendValue(typ.ValType, m["value"]) - } - b.EndContainer() - default: - panic(fmt.Sprintf("unknown type %T", v)) - } - case *zed.TypeRecord: - b.BeginContainer() - for _, f := range typ.Fields { - b.appendValue(f.Type, v[f.Name]) - } - b.EndContainer() - default: - panic(fmt.Sprintf("unknown type %T", typ)) - } - default: - panic(fmt.Sprintf("unknown type %T", v)) - } -} diff --git a/zio/parquetio/data.go b/zio/parquetio/data.go deleted file mode 100644 index afeb72d6dc..0000000000 --- a/zio/parquetio/data.go +++ /dev/null @@ -1,109 +0,0 @@ -package parquetio - -import ( - "errors" - "fmt" - - "github.com/brimdata/zed" - "github.com/brimdata/zed/zcode" - "github.com/brimdata/zed/zson" - "golang.org/x/exp/slices" -) - -func newData(typ zed.Type, zb zcode.Bytes) (interface{}, error) { - if zb == nil { - return nil, nil - } - switch typ := zed.TypeUnder(typ).(type) { - case *zed.TypeOfUint8, *zed.TypeOfUint16, *zed.TypeOfUint32: - return int32(zed.DecodeUint(zb)), nil - case *zed.TypeOfUint64: - return int64(zed.DecodeUint(zb)), nil - case *zed.TypeOfInt8, *zed.TypeOfInt16, *zed.TypeOfInt32: - return int32(zed.DecodeInt(zb)), nil - case *zed.TypeOfInt64, *zed.TypeOfDuration, *zed.TypeOfTime: - return zed.DecodeInt(zb), nil - case *zed.TypeOfFloat16: - return zed.DecodeFloat16(zb), nil - case *zed.TypeOfFloat32: - return zed.DecodeFloat32(zb), nil - case *zed.TypeOfFloat64: - return zed.DecodeFloat64(zb), nil - // XXX add TypeDecimal - case *zed.TypeOfBool: - return zed.DecodeBool(zb), nil - case *zed.TypeOfBytes, *zed.TypeOfString: - // Copy zb since we don't own it. - return []byte(slices.Clone(zb)), nil - case *zed.TypeOfIP: - return []byte(zed.DecodeIP(zb).String()), nil - case *zed.TypeOfNet: - return []byte(zed.DecodeNet(zb).String()), nil - case *zed.TypeOfType: - return []byte(zson.FormatTypeValue(zb)), nil - case *zed.TypeOfNull: - return nil, ErrNullType - case *zed.TypeRecord: - return newRecordData(typ, zb) - case *zed.TypeArray: - return newListData(typ.Type, zb) - case *zed.TypeSet: - return newListData(typ.Type, zb) - case *zed.TypeUnion: - return nil, ErrUnionType - case *zed.TypeEnum: - id := zed.DecodeUint(zb) - if id >= uint64(len(typ.Symbols)) { - return nil, errors.New("enum index out of range") - } - return []byte(typ.Symbols[id]), nil - case *zed.TypeMap: - return newMapData(typ.KeyType, typ.ValType, zb) - case *zed.TypeError: - return []byte(zson.String(zed.Value{Type: typ, Bytes: zb})), nil - } - panic(fmt.Sprintf("unknown type %T", typ)) -} - -func newListData(typ zed.Type, zb zcode.Bytes) (map[string]interface{}, error) { - var elements []map[string]interface{} - for it := zb.Iter(); !it.Done(); { - v, err := newData(typ, it.Next()) - if err != nil { - return nil, err - } - elements = append(elements, map[string]interface{}{"element": v}) - } - return map[string]interface{}{"list": elements}, nil -} - -func newMapData(keyType, valType zed.Type, zb zcode.Bytes) (map[string]interface{}, error) { - var elements []map[string]interface{} - for i, it := 0, zb.Iter(); !it.Done(); i++ { - key, err := newData(keyType, it.Next()) - if err != nil { - return nil, err - } - val, err := newData(valType, it.Next()) - if err != nil { - return nil, err - } - elements = append(elements, map[string]interface{}{ - "key": key, - "value": val, - }) - } - return map[string]interface{}{"key_value": elements}, nil -} - -func newRecordData(typ *zed.TypeRecord, zb zcode.Bytes) (map[string]interface{}, error) { - m := make(map[string]interface{}, len(typ.Fields)) - for i, it := 0, zb.Iter(); !it.Done(); i++ { - v, err := newData(typ.Fields[i].Type, it.Next()) - if err != nil { - return nil, err - } - m[typ.Fields[i].Name] = v - } - return m, nil -} diff --git a/zio/parquetio/reader.go b/zio/parquetio/reader.go index 83becf0b07..1a3ccc600c 100644 --- a/zio/parquetio/reader.go +++ b/zio/parquetio/reader.go @@ -1,52 +1,45 @@ package parquetio import ( + "context" "errors" "io" + "github.com/apache/arrow/go/v12/arrow/memory" + "github.com/apache/arrow/go/v12/parquet" + "github.com/apache/arrow/go/v12/parquet/file" + "github.com/apache/arrow/go/v12/parquet/pqarrow" "github.com/brimdata/zed" - goparquet "github.com/fraugster/parquet-go" + "github.com/brimdata/zed/zio/arrowio" ) -type Reader struct { - fr *goparquet.FileReader - typ *zed.TypeRecord - - builder builder - val zed.Value -} - -func NewReader(zctx *zed.Context, r io.Reader) (*Reader, error) { - rs, ok := r.(io.ReadSeeker) +func NewReader(zctx *zed.Context, r io.Reader) (*arrowio.Reader, error) { + ras, ok := r.(parquet.ReaderAtSeeker) if !ok { return nil, errors.New("reader cannot seek") } - fr, err := goparquet.NewFileReader(rs) + pr, err := file.NewParquetReader(ras) if err != nil { return nil, err } - typ, err := newRecordType(zctx, fr.GetSchemaDefinition().RootColumn.Children) + props := pqarrow.ArrowReadProperties{ + Parallel: true, + BatchSize: 256 * 1024, + } + fr, err := pqarrow.NewFileReader(pr, props, memory.DefaultAllocator) if err != nil { + pr.Close() return nil, err } - return &Reader{ - fr: fr, - typ: typ, - }, nil -} - -func (r *Reader) Read() (*zed.Value, error) { - data, err := r.fr.NextRow() + rr, err := fr.GetRecordReader(context.TODO(), nil, nil) if err != nil { - if err == io.EOF { - return nil, nil - } + pr.Close() return nil, err } - r.builder.Truncate() - for _, f := range r.typ.Fields { - r.builder.appendValue(f.Type, data[f.Name]) + ar, err := arrowio.NewReaderFromRecordReader(zctx, rr) + if err != nil { + pr.Close() + return nil, err } - r.val = *zed.NewValue(r.typ, r.builder.Bytes()) - return &r.val, nil + return ar, nil } diff --git a/zio/parquetio/schemadefinition.go b/zio/parquetio/schemadefinition.go deleted file mode 100644 index 314d59ff0e..0000000000 --- a/zio/parquetio/schemadefinition.go +++ /dev/null @@ -1,277 +0,0 @@ -package parquetio - -import ( - "errors" - "fmt" - "math" - - "github.com/brimdata/zed" - "github.com/fraugster/parquet-go/parquet" - "github.com/fraugster/parquet-go/parquetschema" -) - -var ( - ErrEmptyRecordType = errors.New("empty record type unsupported") - ErrNullType = errors.New("null type unimplemented") - ErrUnionType = errors.New("union type unsupported") -) - -var ( - repetitionRequired = parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_REQUIRED) - repetitionOptional = parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_OPTIONAL) - repetitionRepeated = parquet.FieldRepetitionTypePtr(parquet.FieldRepetitionType_REPEATED) - - convertedUTF8 = parquet.ConvertedTypePtr(parquet.ConvertedType_UTF8) - convertedMap = parquet.ConvertedTypePtr(parquet.ConvertedType_MAP) - convertedMapKeyValue = parquet.ConvertedTypePtr(parquet.ConvertedType_MAP_KEY_VALUE) - convertedList = parquet.ConvertedTypePtr(parquet.ConvertedType_LIST) - convertedEnum = parquet.ConvertedTypePtr(parquet.ConvertedType_ENUM) - convertedDate = parquet.ConvertedTypePtr(parquet.ConvertedType_DATE) - convertedTimeMillis = parquet.ConvertedTypePtr(parquet.ConvertedType_TIME_MILLIS) - convertedTimeMicros = parquet.ConvertedTypePtr(parquet.ConvertedType_TIME_MICROS) - convertedTimestampMillis = parquet.ConvertedTypePtr(parquet.ConvertedType_TIMESTAMP_MILLIS) - convertedTimestampMicros = parquet.ConvertedTypePtr(parquet.ConvertedType_TIMESTAMP_MICROS) - convertedUint8 = parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_8) - convertedUint16 = parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_16) - convertedUint32 = parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_32) - convertedUint64 = parquet.ConvertedTypePtr(parquet.ConvertedType_UINT_64) - convertedInt8 = parquet.ConvertedTypePtr(parquet.ConvertedType_INT_8) - convertedInt16 = parquet.ConvertedTypePtr(parquet.ConvertedType_INT_16) - convertedInt32 = parquet.ConvertedTypePtr(parquet.ConvertedType_INT_32) - convertedInt64 = parquet.ConvertedTypePtr(parquet.ConvertedType_INT_64) - convertedJSON = parquet.ConvertedTypePtr(parquet.ConvertedType_JSON) - convertedBSON = parquet.ConvertedTypePtr(parquet.ConvertedType_BSON) - convertedInterval = parquet.ConvertedTypePtr(parquet.ConvertedType_INTERVAL) - - logicalString = &parquet.LogicalType{STRING: &parquet.StringType{}} - logicalMap = &parquet.LogicalType{MAP: &parquet.MapType{}} - logicalList = &parquet.LogicalType{LIST: &parquet.ListType{}} - logicalEnum = &parquet.LogicalType{ENUM: &parquet.EnumType{}} - logicalDate = &parquet.LogicalType{DATE: &parquet.DateType{}} - logicalTimeMillis = &parquet.LogicalType{TIME: &parquet.TimeType{Unit: timeUnitMillis}} - logicalTimeMicros = &parquet.LogicalType{TIME: &parquet.TimeType{Unit: timeUnitMicros}} - logicalTimeNanos = &parquet.LogicalType{TIME: &parquet.TimeType{Unit: timeUnitNanos}} - logicalTimestampMillis = &parquet.LogicalType{TIMESTAMP: &parquet.TimestampType{Unit: timeUnitMillis}} - logicalTimestampMicros = &parquet.LogicalType{TIMESTAMP: &parquet.TimestampType{Unit: timeUnitMicros}} - logicalTimestampNanos = &parquet.LogicalType{TIMESTAMP: &parquet.TimestampType{Unit: timeUnitNanos}} - logicalUint8 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 8}} - logicalUint16 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 16}} - logicalUint32 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 32}} - logicalUint64 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 64}} - logicalInt8 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 8, IsSigned: true}} - logicalInt16 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 16, IsSigned: true}} - logicalInt32 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 32, IsSigned: true}} - logicalInt64 = &parquet.LogicalType{INTEGER: &parquet.IntType{BitWidth: 64, IsSigned: true}} - logicalBSON = &parquet.LogicalType{BSON: &parquet.BsonType{}} - logicalJSON = &parquet.LogicalType{JSON: &parquet.JsonType{}} - logicalUUID = &parquet.LogicalType{UUID: &parquet.UUIDType{}} - - timeUnitMillis = &parquet.TimeUnit{MILLIS: &parquet.MilliSeconds{}} - timeUnitMicros = &parquet.TimeUnit{MICROS: &parquet.MicroSeconds{}} - timeUnitNanos = &parquet.TimeUnit{NANOS: &parquet.NanoSeconds{}} -) - -func newSchemaDefinition(typ *zed.TypeRecord) (*parquetschema.SchemaDefinition, error) { - c, err := newColumnDefinition("", typ) - if err != nil { - return nil, err - } - s := &parquetschema.SchemaDefinition{ - RootColumn: &parquetschema.ColumnDefinition{ - Children: c.Children, - SchemaElement: &parquet.SchemaElement{ - Name: "zq", - }, - }, - } - return s, s.ValidateStrict() -} - -func newColumnDefinition(name string, typ zed.Type) (*parquetschema.ColumnDefinition, error) { - switch typ := typ.(type) { - case *zed.TypeNamed: - switch id := typ.Type.ID(); { - case typ.Name == "date" && id == zed.IDInt32: - return newPrimitiveColumnDefinition(name, parquet.Type_INT32, convertedDate, logicalDate) - case typ.Name == "bson" && id == zed.IDBytes: - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, convertedBSON, logicalBSON) - case typ.Name == "interval" && id == zed.IDBytes: - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, convertedInterval, nil) - case typ.Name == "json" && id == zed.IDString: - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, convertedJSON, logicalJSON) - case typ.Name == "enum" && id == zed.IDString: - return newColumnDefinition(name, &zed.TypeEnum{}) - case typ.Name == "float" && id == zed.IDFloat64: - return newPrimitiveColumnDefinition(name, parquet.Type_FLOAT, nil, nil) - case typ.Name == "int96" && id == zed.IDBytes: - return newPrimitiveColumnDefinition(name, parquet.Type_INT96, nil, nil) - case typ.Name == "time_millis" && id == zed.IDInt32: - return newPrimitiveColumnDefinition( - name, parquet.Type_INT32, convertedTimeMillis, logicalTimeMillis) - case name == "time_micros" && id == zed.IDInt64: - return newPrimitiveColumnDefinition( - name, parquet.Type_INT64, convertedTimeMicros, logicalTimeMicros) - case name == "time_nanos" && id == zed.IDInt64: - return newPrimitiveColumnDefinition(name, parquet.Type_INT64, nil, logicalTimeNanos) - case name == "timestamp_millis" && id == zed.IDInt64: - return newPrimitiveColumnDefinition( - name, parquet.Type_INT64, convertedTimestampMillis, logicalTimestampMillis) - case name == "timestamp_micros" && id == zed.IDInt64: - return newPrimitiveColumnDefinition( - name, parquet.Type_INT64, convertedTimestampMicros, logicalTimestampMicros) - case name == "uuid" && id == zed.IDBytes: - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, nil, logicalUUID) - } - return newColumnDefinition(name, typ.Type) - case *zed.TypeOfUint8: - return newPrimitiveColumnDefinition(name, parquet.Type_INT32, convertedUint8, logicalUint8) - case *zed.TypeOfUint16: - return newPrimitiveColumnDefinition(name, parquet.Type_INT32, convertedUint16, logicalUint16) - case *zed.TypeOfUint32: - return newPrimitiveColumnDefinition(name, parquet.Type_INT32, convertedUint32, logicalUint32) - case *zed.TypeOfUint64: - return newPrimitiveColumnDefinition(name, parquet.Type_INT64, convertedUint64, logicalUint64) - case *zed.TypeOfInt8: - return newPrimitiveColumnDefinition(name, parquet.Type_INT32, convertedInt8, logicalInt8) - case *zed.TypeOfInt16: - return newPrimitiveColumnDefinition(name, parquet.Type_INT32, convertedInt16, logicalInt16) - case *zed.TypeOfInt32: - return newPrimitiveColumnDefinition(name, parquet.Type_INT32, convertedInt32, logicalInt32) - case *zed.TypeOfInt64, *zed.TypeOfDuration: - return newPrimitiveColumnDefinition(name, parquet.Type_INT64, convertedInt64, logicalInt64) - case *zed.TypeOfTime: - return newPrimitiveColumnDefinition(name, parquet.Type_INT64, nil, logicalTimestampNanos) - case *zed.TypeOfFloat16, *zed.TypeOfFloat32: - return newPrimitiveColumnDefinition(name, parquet.Type_FLOAT, nil, nil) - case *zed.TypeOfFloat64: - return newPrimitiveColumnDefinition(name, parquet.Type_DOUBLE, nil, nil) - // XXX add TypeDecimal - case *zed.TypeOfBool: - return newPrimitiveColumnDefinition(name, parquet.Type_BOOLEAN, nil, nil) - case *zed.TypeOfBytes: - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, nil, nil) - case *zed.TypeOfString, *zed.TypeOfIP, *zed.TypeOfNet, *zed.TypeOfType: - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, convertedUTF8, logicalString) - case *zed.TypeOfNull: - return nil, ErrNullType - case *zed.TypeRecord: - return newRecordColumnDefinition(name, typ) - case *zed.TypeArray: - return newListColumnDefinition(name, typ.Type) - case *zed.TypeSet: - return newListColumnDefinition(name, typ.Type) - case *zed.TypeUnion: - return nil, ErrUnionType - case *zed.TypeEnum: - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, convertedEnum, logicalEnum) - case *zed.TypeMap: - return newMapColumnDefinition(name, typ.KeyType, typ.ValType) - case *zed.TypeError: - // Errors are formatted as string using their ZSON representation. - return newPrimitiveColumnDefinition(name, parquet.Type_BYTE_ARRAY, convertedUTF8, logicalString) - default: - panic(fmt.Sprintf("unknown type %T", typ)) - } -} - -func newPrimitiveColumnDefinition(name string, t parquet.Type, c *parquet.ConvertedType, l *parquet.LogicalType) (*parquetschema.ColumnDefinition, error) { - return &parquetschema.ColumnDefinition{ - SchemaElement: &parquet.SchemaElement{ - Type: parquet.TypePtr(t), - RepetitionType: repetitionOptional, - Name: name, - ConvertedType: c, - LogicalType: l, - }, - }, nil -} - -func newListColumnDefinition(name string, typ zed.Type) (*parquetschema.ColumnDefinition, error) { - element, err := newColumnDefinition("element", typ) - if err != nil { - return nil, err - } - return &parquetschema.ColumnDefinition{ - Children: []*parquetschema.ColumnDefinition{ - { - Children: []*parquetschema.ColumnDefinition{element}, - SchemaElement: &parquet.SchemaElement{ - RepetitionType: repetitionRepeated, - Name: "list", - NumChildren: int32Ptr(1), - }, - }, - }, - SchemaElement: &parquet.SchemaElement{ - RepetitionType: repetitionOptional, - Name: name, - NumChildren: int32Ptr(1), - ConvertedType: convertedList, - LogicalType: logicalList, - }, - }, nil -} - -func newMapColumnDefinition(name string, keyType, valueType zed.Type) (*parquetschema.ColumnDefinition, error) { - key, err := newColumnDefinition("key", keyType) - if err != nil { - return nil, err - } - key.SchemaElement.RepetitionType = repetitionRequired - value, err := newColumnDefinition("value", valueType) - if err != nil { - return nil, err - } - value.SchemaElement.RepetitionType = repetitionRequired - // xxx maybe set key.RepetitionType and value.RepetitionType to repeated - return &parquetschema.ColumnDefinition{ - Children: []*parquetschema.ColumnDefinition{ - { - Children: []*parquetschema.ColumnDefinition{key, value}, - SchemaElement: &parquet.SchemaElement{ - RepetitionType: repetitionRepeated, - Name: "key_value", - NumChildren: int32Ptr(2), - ConvertedType: convertedMapKeyValue, - }, - }, - }, - SchemaElement: &parquet.SchemaElement{ - RepetitionType: repetitionOptional, - Name: name, - NumChildren: int32Ptr(1), - ConvertedType: convertedMap, - LogicalType: logicalMap, - }, - }, nil -} - -func newRecordColumnDefinition(name string, typ *zed.TypeRecord) (*parquetschema.ColumnDefinition, error) { - if len(typ.Fields) == 0 { - return nil, ErrEmptyRecordType - } - var children []*parquetschema.ColumnDefinition - for _, f := range typ.Fields { - c, err := newColumnDefinition(f.Name, f.Type) - if err != nil { - return nil, err - } - children = append(children, c) - } - return &parquetschema.ColumnDefinition{ - Children: children, - SchemaElement: &parquet.SchemaElement{ - RepetitionType: repetitionOptional, - Name: name, - NumChildren: int32Ptr(len(children)), - }, - }, nil -} - -func int32Ptr(i int) *int32 { - if i > math.MaxInt32 || i < math.MinInt32 { - panic(i) - } - i32 := int32(i) - return &i32 -} diff --git a/zio/parquetio/type.go b/zio/parquetio/type.go deleted file mode 100644 index e56db272aa..0000000000 --- a/zio/parquetio/type.go +++ /dev/null @@ -1,195 +0,0 @@ -package parquetio - -import ( - "errors" - "fmt" - - "github.com/brimdata/zed" - "github.com/fraugster/parquet-go/parquet" - "github.com/fraugster/parquet-go/parquetschema" -) - -func newRecordType(zctx *zed.Context, children []*parquetschema.ColumnDefinition) (*zed.TypeRecord, error) { - var cols []zed.Field - for _, c := range children { - typ, err := newType(zctx, c) - if err != nil { - return nil, fmt.Errorf("%s: %w", c.SchemaElement.Name, err) - } - cols = append(cols, zed.Field{ - Name: c.SchemaElement.Name, - Type: typ, - }) - } - return zctx.LookupTypeRecord(cols) -} - -func newType(zctx *zed.Context, cd *parquetschema.ColumnDefinition) (zed.Type, error) { - se := cd.SchemaElement - if se.Type != nil { - return newPrimitiveType(zctx, se) - } - if se.ConvertedType != nil { - switch *se.ConvertedType { - case parquet.ConvertedType_MAP: - keyType, err := newType(zctx, cd.Children[0].Children[0]) - if err != nil { - return nil, fmt.Errorf("%s: map key: %w", cd.SchemaElement.Name, err) - } - valType, err := newType(zctx, cd.Children[0].Children[1]) - if err != nil { - return nil, fmt.Errorf("%s: map value: %w", cd.SchemaElement.Name, err) - } - return zctx.LookupTypeMap(keyType, valType), nil - - case parquet.ConvertedType_LIST: - typ, err := newType(zctx, cd.Children[0].Children[0]) - if err != nil { - return nil, err - } - return zctx.LookupTypeArray(typ), nil - } - } - return newRecordType(zctx, cd.Children) - -} - -func newPrimitiveType(zctx *zed.Context, s *parquet.SchemaElement) (zed.Type, error) { - if s.IsSetLogicalType() && s.LogicalType.IsSetDECIMAL() || - s.GetConvertedType() == parquet.ConvertedType_DECIMAL { - return nil, errors.New("DECIMAL type is unimplemented") - } - switch *s.Type { - case parquet.Type_BOOLEAN: - return zed.TypeBool, nil - case parquet.Type_INT32: - if s.IsSetLogicalType() { - switch l := s.LogicalType; { - case l.IsSetDATE(): - zctx.LookupTypeNamed("date", zed.TypeInt32) - case l.IsSetINTEGER(): - switch i := l.INTEGER; { - case i.BitWidth == 8 && i.IsSigned: - return zed.TypeInt8, nil - case i.BitWidth == 8: - return zed.TypeUint8, nil - case i.BitWidth == 16 && i.IsSigned: - return zed.TypeInt16, nil - case i.BitWidth == 16: - return zed.TypeUint16, nil - case i.BitWidth == 32 && i.IsSigned: - return zed.TypeInt32, nil - case i.BitWidth == 32: - return zed.TypeUint32, nil - } - case l.IsSetTIME() && l.TIME.IsSetUnit() && l.TIME.Unit.IsSetMILLIS(): - return zctx.LookupTypeNamed("time_millis", zed.TypeInt32) - } - } - if s.IsSetConvertedType() { - switch *s.ConvertedType { - case parquet.ConvertedType_DATE: - return zctx.LookupTypeNamed("date", zed.TypeInt32) - case parquet.ConvertedType_UINT_8: - return zed.TypeUint8, nil - case parquet.ConvertedType_UINT_16: - return zed.TypeUint16, nil - case parquet.ConvertedType_UINT_32: - return zed.TypeUint32, nil - case parquet.ConvertedType_INT_8: - return zed.TypeInt8, nil - case parquet.ConvertedType_INT_16: - return zed.TypeInt16, nil - case parquet.ConvertedType_INT_32: - return zed.TypeInt32, nil - case parquet.ConvertedType_TIME_MILLIS: - return zctx.LookupTypeNamed("time_millis", zed.TypeInt32) - } - } - return zed.TypeInt32, nil - case parquet.Type_INT64: - if s.IsSetLogicalType() { - switch l := s.LogicalType; { - case l.IsSetINTEGER(): - switch { - case l.INTEGER.BitWidth == 64 && l.INTEGER.IsSigned: - return zed.TypeInt64, nil - case l.INTEGER.BitWidth == 64: - return zed.TypeUint64, nil - } - case l.IsSetTIME() && l.TIME.IsSetUnit(): - switch { - case l.TIME.Unit.IsSetMICROS(): - return zctx.LookupTypeNamed("time_micros", zed.TypeInt64) - case l.TIME.Unit.IsSetNANOS(): - return zctx.LookupTypeNamed("time_nanos", zed.TypeInt64) - } - case l.IsSetTIMESTAMP() && l.TIMESTAMP.IsSetUnit(): - switch { - case l.TIMESTAMP.Unit.IsSetMILLIS(): - return zctx.LookupTypeNamed("timestamp_millis", zed.TypeInt64) - case l.TIMESTAMP.Unit.IsSetMICROS(): - return zctx.LookupTypeNamed("timestamp_micros", zed.TypeInt64) - case l.TIMESTAMP.Unit.IsSetNANOS(): - return zed.TypeTime, nil - } - } - } - if s.IsSetConvertedType() { - switch *s.ConvertedType { - case parquet.ConvertedType_UINT_64: - return zed.TypeUint64, nil - case parquet.ConvertedType_INT_64: - return zed.TypeInt64, nil - case parquet.ConvertedType_TIME_MICROS: - return zctx.LookupTypeNamed("time_micros", zed.TypeInt64) - case parquet.ConvertedType_TIMESTAMP_MILLIS: - return zctx.LookupTypeNamed("timestamp_millis", zed.TypeInt32) - case parquet.ConvertedType_TIMESTAMP_MICROS: - return zctx.LookupTypeNamed("timestamp_micros", zed.TypeInt64) - } - } - return zed.TypeInt64, nil - case parquet.Type_INT96: - return zctx.LookupTypeNamed("int96", zed.TypeBytes) - case parquet.Type_FLOAT: - return zed.TypeFloat32, nil - case parquet.Type_DOUBLE: - return zed.TypeFloat64, nil - case parquet.Type_BYTE_ARRAY: - if s.IsSetLogicalType() { - switch l := s.LogicalType; { - case l.IsSetBSON(): - return zctx.LookupTypeNamed("bson", zed.TypeBytes) - case l.IsSetENUM(): - return zctx.LookupTypeNamed("enum", zed.TypeString) - case l.IsSetJSON(): - return zctx.LookupTypeNamed("json", zed.TypeString) - case l.IsSetSTRING(): - return zed.TypeString, nil - } - } - if s.IsSetConvertedType() { - switch *s.ConvertedType { - case parquet.ConvertedType_BSON: - return zctx.LookupTypeNamed("bson", zed.TypeBytes) - case parquet.ConvertedType_JSON: - return zctx.LookupTypeNamed("json", zed.TypeString) - case parquet.ConvertedType_ENUM: - return zctx.LookupTypeNamed("enum", zed.TypeString) - case parquet.ConvertedType_UTF8: - return zed.TypeString, nil - } - } - return zed.TypeBytes, nil - case parquet.Type_FIXED_LEN_BYTE_ARRAY: - switch { - case s.GetTypeLength() == 16 && s.IsSetLogicalType() && s.LogicalType.IsSetUUID(): - return zctx.LookupTypeNamed("uuid", zed.TypeBytes) - case s.GetTypeLength() == 12 && s.GetConvertedType() == parquet.ConvertedType_INTERVAL: - return zctx.LookupTypeNamed("interval", zed.TypeBytes) - } - return zctx.LookupTypeNamed(fmt.Sprintf("fixed_len_byte_array_%d", *s.TypeLength), zed.TypeBytes) - } - panic(s.Type.String()) -} diff --git a/zio/parquetio/writer.go b/zio/parquetio/writer.go index 9325c0ddd8..0922f19474 100644 --- a/zio/parquetio/writer.go +++ b/zio/parquetio/writer.go @@ -1,59 +1,46 @@ package parquetio import ( - "errors" "fmt" "io" + "strings" + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/parquet/pqarrow" "github.com/brimdata/zed" - "github.com/brimdata/zed/zson" - goparquet "github.com/fraugster/parquet-go" - "github.com/fraugster/parquet-go/parquet" + "github.com/brimdata/zed/zio" + "github.com/brimdata/zed/zio/arrowio" ) type Writer struct { - w io.WriteCloser - - fw *goparquet.FileWriter - typ *zed.TypeRecord + *arrowio.Writer } -func NewWriter(w io.WriteCloser) *Writer { - return &Writer{w: w} +func NewWriter(wc io.WriteCloser) *Writer { + w := arrowio.NewWriter(wc) + w.NewWriterFunc = func(w io.Writer, s *arrow.Schema) (arrowio.WriteCloser, error) { + fw, err := pqarrow.NewFileWriter(s, zio.NopCloser(w), nil, pqarrow.DefaultWriterProps()) + if err != nil { + return nil, fmt.Errorf("%w: %s", arrowio.ErrUnsupportedType, err) + } + return fw, nil + } + return &Writer{w} } -func (w *Writer) Close() error { - var err error - if w.fw != nil { - err = w.fw.Close() +func (w *Writer) Write(val *zed.Value) error { + if err := w.Writer.Write(val); err != nil { + return parquetioError{err} } - if err2 := w.w.Close(); err == nil { - err = err2 - } - return err + return nil } -func (w *Writer) Write(rec *zed.Value) error { - recType, ok := zed.TypeUnder(rec.Type).(*zed.TypeRecord) - if !ok { - return fmt.Errorf("Parquet output encountered non-record value: %s", zson.String(rec)) - } - if w.typ == nil { - w.typ = recType - sd, err := newSchemaDefinition(recType) - if err != nil { - return err - } - w.fw = goparquet.NewFileWriter(w.w, - goparquet.WithCompressionCodec(parquet.CompressionCodec_SNAPPY), - goparquet.WithSchemaDefinition(sd)) - } else if w.typ != recType { - return errors.New( - "Parquet output requires uniform records but multiple types encountered (consider 'fuse')") - } - data, err := newRecordData(recType, rec.Bytes) - if err != nil { - return err - } - return w.fw.AddData(data) +type parquetioError struct { + err error } + +func (p parquetioError) Error() string { + return "parquetio: " + strings.TrimPrefix(p.err.Error(), "arrowio: ") +} + +func (p parquetioError) Unwrap() error { return p.err } diff --git a/zio/parquetio/ztests/non-record.yaml b/zio/parquetio/ztests/non-record.yaml index dbd80d0613..a44ee1e6fb 100644 --- a/zio/parquetio/ztests/non-record.yaml +++ b/zio/parquetio/ztests/non-record.yaml @@ -10,4 +10,4 @@ inputs: outputs: - name: stderr data: | - Parquet output encountered non-record value: 1 + parquetio: not a record: 1 diff --git a/zio/parquetio/ztests/types.yaml b/zio/parquetio/ztests/types.yaml index 71c8746433..09fda76a47 100644 --- a/zio/parquetio/ztests/types.yaml +++ b/zio/parquetio/ztests/types.yaml @@ -14,9 +14,9 @@ inputs: i16: -16 (int16), i32: -32 (int32), i64: -64, - dur: 0s, + // duration is not supported by pqarrow. tim: 1970-01-01T00:00:00Z, - f16: 16. (float16), + // float16 is not supported by pqarrow. f32: 32. (float32), f64: 64., boo: false, @@ -37,9 +37,7 @@ inputs: i16: null, i32: null, i64: null, - dur: null, tim: null, - f16: null, f32: null, f64: null, boo: null, @@ -65,9 +63,7 @@ outputs: i16: -16 (int16), i32: -32 (int32), i64: -64, - dur: 0, tim: 1970-01-01T00:00:00Z, - f16: 16. (float32), f32: 32. (float32), f64: 64., boo: false, @@ -90,9 +86,7 @@ outputs: i16: null (int16), i32: null (int32), i64: null (int64), - dur: null (int64), tim: null (time), - f16: null (float32), f32: null (float32), f64: null (float64), boo: null (bool), diff --git a/zio/parquetio/ztests/writer-errors.yaml b/zio/parquetio/ztests/writer-errors.yaml new file mode 100644 index 0000000000..9475f2bd2d --- /dev/null +++ b/zio/parquetio/ztests/writer-errors.yaml @@ -0,0 +1,15 @@ +script: | + ! echo '{a:1} {b:2}' | zq -f parquet - + ! echo 1 | zq -f parquet - + ! echo {} | zq -f parquet - + ! echo {a:1s} | zq -f parquet - + ! echo '{a:1.(float16)}' | zq -f parquet - + +outputs: + - name: stderr + data: | + parquetio: encountered multiple types (consider 'fuse'): {a:int64} and {b:int64} + parquetio: not a record: 1 + parquetio: unsupported type: empty record + parquetio: unsupported type: not implemented yet + parquetio: unsupported type: not implemented yet