Skip to content

Commit c97e4af

Browse files
committed
perf: QSem based connection pool
1 parent 50eec77 commit c97e4af

File tree

4 files changed

+328
-33
lines changed

4 files changed

+328
-33
lines changed

postgrest.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ library
9595
PostgREST.Response.GucHeader
9696
PostgREST.Response.Performance
9797
PostgREST.Version
98+
PostgREST.SemPool
9899
build-depends: base >= 4.9 && < 4.20
99100
, HTTP >= 4000.3.7 && < 4000.5
100101
, Ranged-sets >= 0.3 && < 0.5
@@ -265,6 +266,7 @@ test-suite spec
265266
, bytestring >= 0.10.8 && < 0.13
266267
, case-insensitive >= 1.2 && < 1.3
267268
, containers >= 0.5.7 && < 0.7
269+
, hasql >= 1.6.1.1 && < 1.7
268270
, hasql-pool >= 1.0.1 && < 1.1
269271
, hasql-transaction >= 1.0.1 && < 1.2
270272
, heredoc >= 0.2 && < 0.3

src/PostgREST/AppState.hs

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ module PostgREST.AppState
3333
import qualified Data.ByteString.Char8 as BS
3434
import Data.Either.Combinators (whenLeft)
3535
import qualified Data.Text as T (unpack)
36-
import qualified Hasql.Pool as SQL
37-
import qualified Hasql.Pool.Config as SQL
3836
import qualified Hasql.Session as SQL
3937
import qualified Hasql.Transaction.Sessions as SQL
4038
import qualified Network.HTTP.Types.Status as HTTP
@@ -74,11 +72,14 @@ import PostgREST.Unix (createAndBindDomainSocket)
7472

7573
import Data.Streaming.Network (bindPortTCP, bindRandomPortTCP)
7674
import Data.String (IsString (..))
75+
import qualified Hasql.Connection as SQL
76+
import qualified PostgREST.SemPool as Sem
7777
import Protolude
78+
import Hasql.Pool (UsageError(..))
7879

7980
data AppState = AppState
8081
-- | Database connection pool
81-
{ statePool :: SQL.Pool
82+
{ statePool :: Sem.Pool SQL.ConnectionError SQL.Connection
8283
-- | Database server version
8384
, statePgVersion :: IORef PgVersion
8485
-- | Schema cache
@@ -132,7 +133,7 @@ init conf@AppConfig{configLogLevel, configDbPoolSize} = do
132133
state' <- initWithPool (sock, adminSock) pool conf loggerState metricsState observer
133134
pure state' { stateSocketREST = sock, stateSocketAdmin = adminSock}
134135

135-
initWithPool :: AppSockets -> SQL.Pool -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
136+
initWithPool :: AppSockets -> Sem.Pool SQL.ConnectionError SQL.Connection -> AppConfig -> Logger.LoggerState -> Metrics.MetricsState -> ObservationHandler -> IO AppState
136137
initWithPool (sock, adminSock) pool conf loggerState metricsState observer = do
137138

138139
appState <- AppState pool
@@ -200,35 +201,40 @@ initSockets AppConfig{..} = do
200201

201202
pure (sock, adminSock)
202203

203-
initPool :: AppConfig -> ObservationHandler -> IO SQL.Pool
204-
initPool AppConfig{..} observer = do
205-
SQL.acquire $ SQL.settings
206-
[ SQL.size configDbPoolSize
207-
, SQL.acquisitionTimeout $ fromIntegral configDbPoolAcquisitionTimeout
208-
, SQL.agingTimeout $ fromIntegral configDbPoolMaxLifetime
209-
, SQL.idlenessTimeout $ fromIntegral configDbPoolMaxIdletime
210-
, SQL.staticConnectionSettings (toUtf8 $ addFallbackAppName prettyVersion configDbUri)
211-
, SQL.observationHandler $ observer . HasqlPoolObs
212-
]
204+
initPool :: AppConfig -> ObservationHandler -> IO (Sem.Pool SQL.ConnectionError SQL.Connection)
205+
initPool AppConfig{..} _ = do
206+
Sem.pool configDbPoolSize configDbPoolAcquisitionTimeout (SQL.acquire (toUtf8 $ addFallbackAppName prettyVersion configDbUri)) (const $ pure mempty) SQL.release
207+
-- where
208+
-- settings =
209+
-- SQL.settings
210+
-- [ SQL.size configDbPoolSize
211+
-- , SQL.acquisitionTimeout $ fromIntegral configDbPoolAcquisitionTimeout
212+
-- , SQL.agingTimeout $ fromIntegral configDbPoolMaxLifetime
213+
-- , SQL.idlenessTimeout $ fromIntegral configDbPoolMaxIdletime
214+
-- , SQL.staticConnectionSettings (toUtf8 $ addFallbackAppName prettyVersion configDbUri)
215+
-- , SQL.observationHandler $ observer . HasqlPoolObs
216+
-- ]
213217

214218
-- | Run an action with a database connection.
215-
usePool :: AppState -> SQL.Session a -> IO (Either SQL.UsageError a)
219+
usePool :: AppState -> SQL.Session a -> IO (Either UsageError a)
216220
usePool AppState{stateObserver=observer, stateMainThreadId=mainThreadId, ..} sess = do
217221
observer PoolRequest
218222

219-
res <- SQL.use statePool sess
223+
res <- join . first (\case
224+
Sem.AcquireTimeout -> AcquisitionTimeoutUsageError
225+
(Sem.ResourceError e) -> ConnectionUsageError e) <$> Sem.use statePool (fmap (first SessionUsageError ) . SQL.run sess)
220226

221227
observer PoolRequestFullfilled
222228

223229
whenLeft res (\case
224-
SQL.AcquisitionTimeoutUsageError ->
225-
observer $ PoolAcqTimeoutObs SQL.AcquisitionTimeoutUsageError
226-
err@(SQL.ConnectionUsageError e) ->
230+
AcquisitionTimeoutUsageError ->
231+
observer $ PoolAcqTimeoutObs AcquisitionTimeoutUsageError
232+
err@(ConnectionUsageError e) ->
227233
let failureMessage = BS.unpack $ fromMaybe mempty e in
228234
when (("FATAL: password authentication failed" `isInfixOf` failureMessage) || ("no password supplied" `isInfixOf` failureMessage)) $ do
229235
observer $ ExitDBFatalError ServerAuthError err
230236
killThread mainThreadId
231-
err@(SQL.SessionUsageError (SQL.QueryError tpl _ (SQL.ResultError resultErr))) -> do
237+
err@(SessionUsageError (SQL.QueryError tpl _ (SQL.ResultError resultErr))) -> do
232238
case resultErr of
233239
SQL.UnexpectedResult{} -> do
234240
observer $ ExitDBFatalError ServerPgrstBug err
@@ -261,7 +267,7 @@ usePool AppState{stateObserver=observer, stateMainThreadId=mainThreadId, ..} ses
261267
SQL.ServerError{} ->
262268
when (Error.status (Error.PgError False err) >= HTTP.status500) $
263269
observer $ QueryErrorCodeHighObs err
264-
err@(SQL.SessionUsageError (SQL.QueryError _ _ (SQL.ClientError _))) ->
270+
err@(SessionUsageError (SQL.QueryError _ _ (SQL.ClientError _))) ->
265271
-- An error on the client-side, usually indicates problems wth connection
266272
observer $ QueryErrorCodeHighObs err
267273
)
@@ -271,11 +277,11 @@ usePool AppState{stateObserver=observer, stateMainThreadId=mainThreadId, ..} ses
271277
-- | Flush the connection pool so that any future use of the pool will
272278
-- use connections freshly established after this call.
273279
flushPool :: AppState -> IO ()
274-
flushPool AppState{..} = SQL.release statePool
280+
flushPool AppState{..} = Sem.release statePool
275281

276282
-- | Destroy the pool on shutdown.
277283
destroyPool :: AppState -> IO ()
278-
destroyPool AppState{..} = SQL.release statePool
284+
destroyPool AppState{..} = Sem.release statePool
279285

280286
getPgVersion :: AppState -> IO PgVersion
281287
getPgVersion = readIORef . statePgVersion

0 commit comments

Comments
 (0)