11(ns metabase.driver.neo4j.execute
2+ (:import [org.neo4j.driver
3+ Driver])
24 (:require
5+ [metabase.driver.neo4j.util
6+ :refer [with-neo-connection]]
7+ [neo4clj.client :as neo4j]
8+ [clojure.core.async :as a]
9+ [clojure.tools.logging :as log]
310 [metabase.driver.sql-jdbc
411 [execute :as sql-jdbc.execute]]
512 [metabase.mbql.util :as mbql.u]
613 [metabase.query-processor
714 [context :as context]
815 [interface :as qp.i]
916 [store :as qp.store]
17+ [reducible :as qp.reducible]
1018 [timezone :as qp.timezone]]))
1119
1220
1321; We want to do this in order to avoid remarks and commenting which affect the simba JDBC driver
22+
23+
1424(defn execute-reducible-query
1525 " Implementation of `execute-reducible-query` for neo4j bi connector driver
1626 Copied as is from sql-jdbc/execute"
2535 rs (sql-jdbc.execute/execute-query! driver stmt)]
2636 (let [rsmeta (.getMetaData rs)
2737 results-metadata {:cols (sql-jdbc.execute/column-metadata driver rsmeta)}]
28- (respond results-metadata (sql-jdbc.execute/reducible-rows driver rs rsmeta (context/canceled-chan context)))))))
38+ (respond results-metadata (sql-jdbc.execute/reducible-rows driver rs rsmeta (context/canceled-chan context)))))))
39+
40+ ; TODO add with-neo4j-connection macro
41+ (defn get-neo-connection
42+ [{host :host port :port user :user password :password dbname :dbname }]
43+ (let [base (str " bolt://" host " :" port)
44+ url (if dbname (str base " /" dbname) base)]
45+ (if password (neo4j/connect url user password) (neo4j/connect url user))))
46+
47+ (defn get-cypher-columns [result]
48+ (if (seq? result)
49+ {:cols (into [] (map #(assoc {} :name %) (keys (first (take 1 result)))))}
50+ {:cols [{:name " result" }]}))
51+
52+ (defn execute-reducible-query->cypher
53+ " Process and run a native cypher query."
54+ [_ {{query :query } :native } context respond]
55+ (log/info " Executing reducible query for cypher" )
56+ (with-neo-connection [^Driver connection (:details (qp.store/database ))]
57+ (let [results (volatile! (neo4j/execute! connection query))
58+ nonseq-val (volatile! false )
59+ columns (get-cypher-columns @results)
60+ row-thunk #(if-not (seq? @results) ((if-not @nonseq-val (vreset! nonseq-val true ) @results) nil )
61+ (let [old @results]
62+ (vswap! results (fn [state] (drop 1 state)))
63+ (vals (first (take 1 old)))))]
64+ ; handle cancellation
65+ (a/go
66+ (when (a/<! (context/canceled-chan context))
67+ (neo4j/disconnect connection)))
68+ (respond columns (qp.reducible/reducible-rows row-thunk (context/canceled-chan context))))))
0 commit comments