8
8
import static java .lang .String .format ;
9
9
import static java .util .Locale .ENGLISH ;
10
10
import static java .util .Objects .requireNonNull ;
11
- import static java .util .concurrent .TimeUnit .HOURS ;
12
- import static java .util .concurrent .TimeUnit .MINUTES ;
13
11
14
12
import java .util .Collections ;
15
13
import java .util .HashSet ;
16
14
import java .util .List ;
17
15
import java .util .Optional ;
18
16
import java .util .Set ;
17
+ import java .util .concurrent .TimeUnit ;
19
18
import java .util .stream .Collectors ;
20
19
21
20
import com .google .common .cache .CacheBuilder ;
@@ -71,24 +70,24 @@ public class RediSearchSession {
71
70
72
71
private final TypeManager typeManager ;
73
72
private final StatefulRedisModulesConnection <String , String > connection ;
74
- private final RediSearchClientConfig config ;
73
+ private final RediSearchConfig config ;
75
74
private final LoadingCache <SchemaTableName , RediSearchTable > tableCache ;
76
75
77
76
public RediSearchSession (TypeManager typeManager , StatefulRedisModulesConnection <String , String > connection ,
78
- RediSearchClientConfig config ) {
77
+ RediSearchConfig config ) {
79
78
this .typeManager = requireNonNull (typeManager , "typeManager is null" );
80
79
this .connection = requireNonNull (connection , "connection is null" );
81
80
this .config = requireNonNull (config , "config is null" );
82
- // TODO make table cache expiration configurable
83
- this . tableCache = CacheBuilder . newBuilder (). expireAfterWrite ( 1 , HOURS ). refreshAfterWrite ( 1 , MINUTES )
81
+ this . tableCache = CacheBuilder . newBuilder (). expireAfterWrite ( config . getTableCacheExpiration (), TimeUnit . SECONDS )
82
+ . refreshAfterWrite ( config . getTableCacheRefresh (), TimeUnit . SECONDS )
84
83
.build (CacheLoader .from (this ::loadTableSchema ));
85
84
}
86
85
87
86
public StatefulRedisModulesConnection <String , String > getConnection () {
88
87
return connection ;
89
88
}
90
89
91
- public RediSearchClientConfig getConfig () {
90
+ public RediSearchConfig getConfig () {
92
91
return config ;
93
92
}
94
93
@@ -107,7 +106,7 @@ public List<HostAddress> getAddresses() {
107
106
108
107
public Set <String > getAllTables () throws SchemaNotFoundException {
109
108
ImmutableSet .Builder <String > builder = ImmutableSet .builder ();
110
- builder .addAll (connection .sync ().list ());
109
+ builder .addAll (connection .sync ().ftList ());
111
110
return builder .build ();
112
111
}
113
112
@@ -120,25 +119,26 @@ public RediSearchTable getTable(SchemaTableName tableName) throws TableNotFoundE
120
119
}
121
120
}
122
121
122
+ @ SuppressWarnings ("unchecked" )
123
123
public void createTable (SchemaTableName schemaTableName , List <RediSearchColumnHandle > columns ) {
124
124
String tableName = schemaTableName .getTableName ();
125
- if (!connection .sync ().list ().contains (tableName )) {
126
- List <Field > fields = columns .stream ().filter (c -> !c .getName ().equals ("_id" ))
125
+ if (!connection .sync ().ftList ().contains (tableName )) {
126
+ List <Field < String > > fields = columns .stream ().filter (c -> !c .getName ().equals ("_id" ))
127
127
.map (c -> buildField (c .getName (), c .getType ())).collect (Collectors .toList ());
128
128
CreateOptions .Builder <String , String > options = CreateOptions .<String , String >builder ();
129
129
options .prefix (tableName + ":" );
130
- connection .sync ().create (tableName , options .build (), fields .toArray (new Field [0 ] ));
130
+ connection .sync ().ftCreate (tableName , options .build (), fields .toArray (Field []:: new ));
131
131
}
132
132
}
133
133
134
134
public void dropTable (SchemaTableName tableName ) {
135
- connection .sync ().dropindexDeleteDocs (toRemoteTableName (tableName .getTableName ()));
135
+ connection .sync ().ftDropindexDeleteDocs (toRemoteTableName (tableName .getTableName ()));
136
136
tableCache .invalidate (tableName );
137
137
}
138
138
139
139
public void addColumn (SchemaTableName schemaTableName , ColumnMetadata columnMetadata ) {
140
140
String tableName = toRemoteTableName (schemaTableName .getTableName ());
141
- connection .sync ().alter (tableName , buildField (columnMetadata .getName (), columnMetadata .getType ()));
141
+ connection .sync ().ftAlter (tableName , buildField (columnMetadata .getName (), columnMetadata .getType ()));
142
142
tableCache .invalidate (schemaTableName );
143
143
}
144
144
@@ -167,12 +167,12 @@ private RediSearchTable loadTableSchema(SchemaTableName schemaTableName) {
167
167
}
168
168
Set <String > fields = new HashSet <>();
169
169
ImmutableList .Builder <RediSearchColumnHandle > columnHandles = ImmutableList .builder ();
170
- for (Field columnMetadata : indexInfo .get ().getFields ()) {
170
+ for (Field < String > columnMetadata : indexInfo .get ().getFields ()) {
171
171
RediSearchColumnHandle column = buildColumnHandle (columnMetadata );
172
172
fields .add (column .getName ());
173
173
columnHandles .add (column );
174
174
}
175
- SearchResults <String , String > results = connection .sync ().search (index , "*" );
175
+ SearchResults <String , String > results = connection .sync ().ftSearch (index , "*" );
176
176
for (Document <String , String > doc : results ) {
177
177
for (String field : doc .keySet ()) {
178
178
if (fields .contains (field )) {
@@ -188,7 +188,7 @@ private RediSearchTable loadTableSchema(SchemaTableName schemaTableName) {
188
188
189
189
private Optional <IndexInfo > indexInfo (String index ) {
190
190
try {
191
- List <Object > indexInfoList = connection .sync ().indexInfo (index );
191
+ List <Object > indexInfoList = connection .sync ().ftInfo (index );
192
192
if (indexInfoList != null ) {
193
193
return Optional .of (RedisModulesUtils .indexInfo (indexInfoList ));
194
194
}
@@ -198,7 +198,7 @@ private Optional<IndexInfo> indexInfo(String index) {
198
198
return Optional .empty ();
199
199
}
200
200
201
- private RediSearchColumnHandle buildColumnHandle (Field field ) {
201
+ private RediSearchColumnHandle buildColumnHandle (Field < String > field ) {
202
202
return buildColumnHandle (field .getName (), field .getType (), false );
203
203
}
204
204
@@ -222,32 +222,31 @@ public SearchResults<String, String> search(RediSearchTableHandle tableHandle,
222
222
options .limit (Limit .offset (0 ).num (limit (tableHandle )));
223
223
options .returnFields (columns .stream ().map (RediSearchColumnHandle ::getName ).toArray (String []::new ));
224
224
log .info ("Running search on index %s with query '%s'" , index , query );
225
- return connection .sync ().search (index , query , options .build ());
225
+ return connection .sync ().ftSearch (index , query , options .build ());
226
226
}
227
227
228
228
public AggregateWithCursorResults <String > aggregate (RediSearchTableHandle table ) {
229
229
String index = index (table );
230
230
String query = RediSearchQueryBuilder .buildQuery (table .getConstraint ());
231
- AggregateOptions .Builder <String , String > optionsBuilder = AggregateOptions .builder ();
232
- optionsBuilder . limit (Limit .offset (0 ).num (limit (table )));
231
+ AggregateOptions .Builder <String , String > options = AggregateOptions .builder ();
232
+ options . operation (Limit .offset (0 ).num (limit (table )));
233
233
Optional <Group > group = RediSearchQueryBuilder .group (table .getTermAggregations (),
234
234
table .getMetricAggregations ());
235
- group .ifPresent (optionsBuilder ::group );
236
- AggregateOptions <String , String > options = optionsBuilder .build ();
237
- log .info ("Running aggregation on index %s with query '%s' and %s" , index , query , options );
235
+ group .ifPresent (options ::operation );
236
+ log .info ("Running aggregation on index %s with query '%s' and %s" , index , query , options .build ());
238
237
CursorOptions .Builder cursorOptions = CursorOptions .builder ();
239
238
if (config .getCursorCount () > 0 ) {
240
239
cursorOptions .count (config .getCursorCount ());
241
240
}
242
- return connection .sync ().aggregate (index , query , cursorOptions .build (), options );
241
+ return connection .sync ().ftAggregate (index , query , cursorOptions .build (), options . build () );
243
242
}
244
243
245
244
public AggregateWithCursorResults <String > cursorRead (RediSearchTableHandle tableHandle , long cursor ) {
246
245
String index = index (tableHandle );
247
246
if (config .getCursorCount () > 0 ) {
248
- return connection .sync ().cursorRead (index , cursor , config .getCursorCount ());
247
+ return connection .sync ().ftCursorRead (index , cursor , config .getCursorCount ());
249
248
}
250
- return connection .sync ().cursorRead (index , cursor );
249
+ return connection .sync ().ftCursorRead (index , cursor );
251
250
}
252
251
253
252
private String index (RediSearchTableHandle tableHandle ) {
@@ -261,7 +260,7 @@ private long limit(RediSearchTableHandle tableHandle) {
261
260
return config .getDefaultLimit ();
262
261
}
263
262
264
- private Field buildField (String columnName , Type columnType ) {
263
+ private Field < String > buildField (String columnName , Type columnType ) {
265
264
Field .Type fieldType = toFieldType (columnType );
266
265
switch (fieldType ) {
267
266
case GEO :
@@ -338,7 +337,7 @@ private TypeSignature varcharType() {
338
337
}
339
338
340
339
public void cursorDelete (RediSearchTableHandle tableHandle , long cursor ) {
341
- connection .sync ().cursorDelete (index (tableHandle ), cursor );
340
+ connection .sync ().ftCursorDelete (index (tableHandle ), cursor );
342
341
}
343
342
344
343
}
0 commit comments