Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cabal-override.project
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ constraints: any.Diff ==0.4.1,
any.blaze-builder ==0.4.2.3,
any.blaze-html ==0.9.2.0,
any.blaze-markup ==0.8.3.0,
any.bloodhound ==0.19.1.0,
any.bloodhound ==0.25.0.0,
any.boring ==0.2.2,
any.bsb-http-chunked ==0.0.0.4,
any.bugzilla-redhat ==1.0.1.1,
Expand Down
2 changes: 1 addition & 1 deletion monocle.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ library
, blaze-markup >= 0.8.2.8
, blaze-html >= 0.9.1.2
, binary >= 0.8
, bloodhound ^>= 0.19
, bloodhound ^>= 0.25
, bugzilla-redhat ^>= 1.0
, byteslice >= 0.2
, bytestring >= 0.10
Expand Down
17 changes: 12 additions & 5 deletions nix/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,18 @@ let
serialise = pkgs.haskell.lib.doJailbreak
(pkgs.haskell.lib.dontCheck hpPrev.serialise);

# upgrade to bloodhound 0.20 needs some work
bloodhound = pkgs.haskell.lib.overrideCabal hpPrev.bloodhound {
version = "0.19.1.0";
sha256 = "sha256-QEN1wOLLUEsDKAbgz8ex0wfK/duNytvRYclwkBj/1G0=";
};
bloodhound = let
src = pkgs.fetchFromGitHub {
owner = "bitemyapp";
repo = "bloodhound";
rev = "3789abec56da76c7f5332de38f879299ca4bea3d"; # v0.25.0.0
sha256 = "sha256-3d+gMO8EeKR2j55VlXweS9uJtOX971ve6PJjgx3pPHw=";
};
pkg = hpPrev.callCabal2nix "bloodhound" src { };
in pkgs.lib.pipe pkg [
pkgs.haskell.lib.compose.doJailbreak
pkgs.haskell.lib.compose.dontCheck
];

# relax bound for doctest, ghc-prim, primitive, template-haskell, text and transformers
proto3-wire = pkgs.haskell.lib.doJailbreak hpPrev.proto3-wire;
Expand Down
217 changes: 77 additions & 140 deletions src/Database/Bloodhound/Raw.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,27 @@ module Database.Bloodhound.Raw (
aggWithDocValues,
mkAgg,
mkTermsCompositeAgg,
) where
)
where

import Control.Monad.Catch (MonadThrow, throwM)
import Data.Aeson
import Data.Aeson qualified as Aeson
import Data.Aeson.Casing.Internal qualified as AesonCasing
import Data.Aeson.KeyMap qualified as KM
import Data.Aeson.Types qualified as Aeson
import Data.Text qualified as Text
import Data.Map qualified as Map
import Data.Text qualified as T
import Data.Vector qualified as V
import Database.Bloodhound qualified as BH
import Json.Extras qualified as Json
import Database.Bloodhound.Common.Requests qualified as Query
import Monocle.Prelude
import Network.HTTP.Client qualified as HTTP
import Network.HTTP.Types.Method qualified as HTTP

data ScrollRequest = NoScroll | GetScroll ByteString

type QS = [(ByteString, Maybe ByteString)]

dispatch ::
BH.MonadBH m =>
HTTP.Method ->
Text ->
LByteString ->
QS ->
m BH.Reply
dispatch method url body qs = do
initReq <- liftIO $ HTTP.parseRequest (from url)
let request =
initReq
{ HTTP.method = method
, HTTP.requestHeaders =
("Content-Type", "application/json") : HTTP.requestHeaders initReq
, HTTP.requestBody = HTTP.RequestBodyLBS body
}
manager <- BH.bhManager <$> BH.getBHEnv
liftIO $ HTTP.httpLbs (setQs request) manager
where
setQs = case qs of
[] -> id
xs -> HTTP.setQueryString xs

-- | Utility function to advance in scroll result. We can use the BH library
-- because we no longer need to support a custom raw body once we have a scroll.
advance :: (MonadBH m, MonadThrow m, FromJSON resp) => BH.ScrollId -> m (BH.SearchResult resp)
advance :: (MonadBH m, FromJSON resp) => BH.ScrollId -> m (BH.SearchResult resp)
advance scroll = do
resp <- BH.advanceScroll scroll 60
resp <- BH.tryEsError $ BH.advanceScroll scroll 60
case resp of
Left err -> throwEsError "advance" err
Right x -> pure x
Expand All @@ -69,58 +42,32 @@ throwEsError :: MonadThrow m => LByteString -> BH.EsError -> m a
throwEsError resp err = throwM $ BH.EsProtocolException err.errorMessage resp

settings :: (MonadBH m, ToJSON body) => BH.IndexName -> body -> m ()
settings (BH.IndexName index) body = do
BH.Server s <- BH.bhServer <$> BH.getBHEnv
let url = Text.intercalate "/" [s, index, "_settings"]
settings index body = do
let endpoint = BH.mkEndpoint [BH.unIndexName index, "_settings"]
method = HTTP.methodPut
resp <- dispatch method url (Aeson.encode body) []
case HTTP.responseBody resp of
"{\"acknowledged\":true}" -> pure ()
resp <- BH.performBHRequest @_ @BH.StatusIndependant $ BH.mkFullRequest method endpoint (Aeson.encode body)
case resp of
BH.Acknowledged True -> pure ()
_ -> error $ "Settings apply failed: " <> show resp

search' :: (MonadBH m, ToJSON body) => BH.IndexName -> body -> QS -> m BH.Reply
search' (BH.IndexName index) body qs = do
BH.Server s <- BH.bhServer <$> BH.getBHEnv
let url = Text.intercalate "/" [s, index, "_search"]
method = HTTP.methodPost
dispatch method url (Aeson.encode body) qs

-- | Manual aeson casing implementation to create the search _source attribute
--
-- >>> aesonCasing "echangeCommitCount"
-- "commit_count"
aesonCasing :: String -> String
aesonCasing = AesonCasing.snakeCase . AesonCasing.dropFPrefix

search ::
forall resp m body.
(MonadBH m, MonadThrow m) =>
(Aeson.ToJSON body, FromJSONField resp) =>
forall resp m.
MonadBH m =>
FromJSONField resp =>
BH.IndexName ->
body ->
BH.Search ->
ScrollRequest ->
m (BH.SearchResult resp)
search index body scrollRequest = do
rawResp <- search' index newBody qs
resp <- BH.parseEsResponse rawResp
search index payload scrollRequest = do
let query = (Query.searchByIndex index payload {BH.source = Just fields}) {BH.bhRequestQueryStrings = qs}
resp <- BH.tryEsError $ BH.performBHRequest query
case resp of
Left err -> throwEsError "search" err
Right x -> pure x
where
newBody = case (fields, toJSON body) of
-- The results has fields, and the body is an object
(xs@(_ : _), Aeson.Object obj) -> Aeson.Object $ addSourceFields xs obj
-- Otherwise we don't change the body
(_, bodyValue) -> bodyValue

addSourceFields xs = KM.insert "_source" (Aeson.Array $ fromList $ map toSourceElem xs)

toSourceElem :: String -> Value
toSourceElem = Aeson.String . from . aesonCasing

-- The fields of the result data types.
fields :: [String]
fields = selectors (Proxy :: Proxy (Rep resp))
fields :: BH.Source
fields = BH.SourcePatterns $ BH.PopPatterns $ BH.Pattern . T.pack <$> selectors (Proxy :: Proxy (Rep resp))

qs = case scrollRequest of
NoScroll -> []
Expand All @@ -129,77 +76,67 @@ search index body scrollRequest = do
-- | A special purpose search implementation that uses the faster json-syntax
searchHit ::
MonadBH m =>
Aeson.ToJSON body =>
Aeson.FromJSON a =>
BH.IndexName ->
body ->
m [Json.Value]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, aeson is too slow for certain query, so we do need to keep json-syntax here. See: the test/JsonDecode.{hs,py} benchmark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted!

searchHit index body = do
rawResp <- search' index body []
case decodeHits (Json.decodeThrow $ HTTP.responseBody rawResp) of
Just xs -> pure xs
Nothing -> error $ "Could not find hits in " <> show rawResp
where
decodeHits :: Json.Value -> Maybe [Json.Value]
decodeHits value = do
hits <- Json.getAttr "hits" =<< Json.getAttr "hits" value
fmap getSource <$> Json.getArray hits
getSource value = case Json.getAttr "_source" value of
Nothing -> error $ "No source found in: " <> show value
Just v -> v

aggWithDocValues :: [(Text, Value)] -> Maybe BH.Query -> Value
aggWithDocValues agg = mkAgg agg (Just dv)
where
dv =
Aeson.Array
( V.fromList
[ Aeson.object
[ "field" .= Aeson.String "created_at"
, "format" .= Aeson.String "date_time"
]
]
)
BH.Search ->
m (BH.SearchResult a)
searchHit index payload = do
let query = Query.searchByIndex index payload
resp <- BH.tryEsError $ BH.performBHRequest query
case resp of
Right xs -> pure xs
Left e -> throwEsError "Could not find hits" e

toPair :: (Text, Value) -> Aeson.Pair
toPair (k, v) = (from k, v)
aggWithDocValues :: BH.Aggregations -> Maybe BH.Query -> BH.Search
aggWithDocValues agg = mkAgg agg (Just [BH.DocvalueFieldNameAndFormat (BH.FieldName "created_at") "date_time"])

mkAgg :: [(Text, Value)] -> Maybe Value -> Maybe BH.Query -> Value
mkAgg :: BH.Aggregations -> Maybe [BH.DocvalueField] -> Maybe BH.Query -> BH.Search
mkAgg agg docvalues query =
Aeson.object
$ [ "aggregations" .= Aeson.object (toPair <$> agg)
, "size" .= Aeson.Number 0
]
<> case docvalues of
Just dv -> ["docvalue_fields" .= dv]
Nothing -> []
<> case query of
Just q -> ["query" .= Aeson.toJSON q]
Nothing -> []

mkTermsCompositeAgg :: Text -> Maybe Value -> (Text, Value)
BH.Search
{ trackSortScores = False
, suggestBody = Nothing
, sortBody = Nothing
, searchType = BH.SearchTypeQueryThenFetch
, searchAfterKey = Nothing
, scriptFields = Nothing
, docvalueFields = docvalues
, queryBody = query
, pointInTime = Nothing
, highlight = Nothing
, filterBody = Nothing
, aggBody = Just agg
, source = Nothing
, size = BH.Size 0
, fields = Nothing
, from = BH.From 0
}

mkTermsCompositeAgg :: Text -> Maybe Value -> BH.Aggregations
mkTermsCompositeAgg term afterM =
( "agg1"
, Aeson.object
[ "composite"
.= Aeson.object
( [ "sources" .= [agg]
, "size" .= Aeson.Number 1024
]
<> after
)
]
)
where
after = case afterM of
Just v -> ["after" .= Aeson.object ["agg" .= v]]
Nothing -> []
agg =
Aeson.object
[ "agg"
.= Aeson.object
[ "terms" .= Aeson.object ["field" .= term]
]
]
Map.singleton
"agg1"
$ BH.CompositeAgg
$ BH.CompositeAggregation
{ compositeAggregationSize = Just 1024
, compositeAggregationSources =
[ BH.CompositeAggregationSource
"agg"
$ BH.CompositeTermsAgg
BH.TermsAggregation
{ term = Right term
, termInclude = Nothing
, termExclude = Nothing
, termOrder = Nothing
, termMinDocCount = Nothing
, termSize = Nothing
, termShardSize = Nothing
, termCollectMode = Nothing
, termExecutionHint = Nothing
, termAggs = Nothing
}
]
, compositeAggregationAfter = (\v -> Aeson.object ["agg" .= v]) <$> afterM
}

-- Make Value a type parameter for TermsCompositeAggResult
newtype TermsCompositeAggKey = TermsCompositeAggKey
Expand Down
26 changes: 10 additions & 16 deletions src/Monocle/Backend/Index.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-- witch instance for CrawlerPB
{-# LANGUAGE QuasiQuotes #-}
{-# OPTIONS_GHC -Wno-orphans #-}

-- | Index management functions such as document mapping and ingest
Expand Down Expand Up @@ -264,7 +265,7 @@ configVersion :: ConfigVersion
configVersion = ConfigVersion 7

configIndex :: BH.IndexName
configIndex = BH.IndexName "monocle.config"
configIndex = [BH.qqIndexName|monocle.config|]

configDoc :: BH.DocId
configDoc = BH.DocId "config"
Expand Down Expand Up @@ -477,13 +478,13 @@ ensureIndexCrawlerMetadata = do
QueryWorkspace config <- getQueryTarget
traverse_ initCrawlerMetadata config.crawlers

withRefresh :: HasCallStack => MonoQuery :> es => IndexEffects es => Eff es BH.Reply -> Eff es ()
withRefresh :: HasCallStack => Show e => Show a => MonoQuery :> es => IndexEffects es => Eff es (Either e a) -> Eff es ()
withRefresh action = do
index <- getIndexName
resp <- action
unless (BH.isSuccess resp) (error $ "Unable to add or update: " <> show resp)
unless (isRight resp) (error $ "Unable to add or update: " <> show resp)
refreshResp <- esRefreshIndex index
unless (BH.isSuccess refreshResp) (error $ "Unable to refresh index: " <> show resp)
unless (isRight refreshResp) (error $ "Unable to refresh index: " <> show resp)

ensureIndex :: (E.Fail :> es, LoggerEffect :> es, MonoQuery :> es, Error ElasticError :> es, ElasticEffect :> es, Retry :> es) => Eff es ()
ensureIndex = do
Expand Down Expand Up @@ -679,8 +680,8 @@ indexEvents events = indexDocs (fmap toDoc events)
statusCheck :: (Int -> c) -> HTTP.Response body -> c
statusCheck prd = prd . NHTS.statusCode . HTTP.responseStatus

isNotFound :: BH.Reply -> Bool
isNotFound = statusCheck (== 404)
isNotFound :: BH.BHResponse parsingContext a -> Bool
isNotFound (BH.BHResponse r) = statusCheck (== 404) r

checkDocExists :: MonoQuery :> es => IndexEffects es => BH.DocId -> Eff es Bool
checkDocExists docId = do
Expand All @@ -690,16 +691,9 @@ checkDocExists docId = do
getDocumentById' :: IndexEffects es => FromJSON a => BH.IndexName -> BH.DocId -> Eff es (Maybe a)
getDocumentById' index docId = do
resp <- esGetDocument index docId
if isNotFound resp
then pure Nothing
else do
parsed <- BH.parseEsResponse resp
case parsed of
Right cm -> pure . getHit $ BH.foundResult cm
Left _ -> error "Unable to get parse result"
where
getHit (Just (BH.EsResultFound _ cm)) = Just cm
getHit Nothing = Nothing
case resp of
Left _ -> pure Nothing
Right x -> pure $ BH._source <$> BH.foundResult x

getDocumentById :: MonoQuery :> es => IndexEffects es => FromJSON a => BH.DocId -> Eff es (Maybe a)
getDocumentById docId = do
Expand Down
Loading
Loading