Skip to content

Commit 1168832

Browse files
authored
Use ServerContext instead of Manager in fate util code (#5941)
The class o.a.a.manager.tableOps.Utils took a Manager type for most of its methods and only used the ServerContext. Modified these methods to take a ServerContext intead. Making this change as the beginning of an effort to remove direct usage of the Manager type in fate operations.
1 parent 8e3ca10 commit 1168832

33 files changed

+165
-144
lines changed

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/ChangeTableState.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ public ChangeTableState(NamespaceId namespaceId, TableId tableId, TableOperation
5555
public long isReady(FateId fateId, Manager env) throws Exception {
5656
// reserve the table so that this op does not run concurrently with create, clone, or delete
5757
// table
58-
return Utils.reserveNamespace(env, namespaceId, fateId, LockType.READ, true, top)
59-
+ Utils.reserveTable(env, tableId, fateId, LockType.WRITE, true, top, LockRange.infinite());
58+
return Utils.reserveNamespace(env.getContext(), namespaceId, fateId, LockType.READ, true, top)
59+
+ Utils.reserveTable(env.getContext(), tableId, fateId, LockType.WRITE, true, top,
60+
LockRange.infinite());
6061
}
6162

6263
@Override
@@ -67,16 +68,16 @@ public Repo<Manager> call(FateId fateId, Manager env) {
6768
}
6869

6970
env.getTableManager().transitionTableState(tableId, ts, expectedCurrStates);
70-
Utils.unreserveNamespace(env, namespaceId, fateId, LockType.READ);
71-
Utils.unreserveTable(env, tableId, fateId, LockType.WRITE);
71+
Utils.unreserveNamespace(env.getContext(), namespaceId, fateId, LockType.READ);
72+
Utils.unreserveTable(env.getContext(), tableId, fateId, LockType.WRITE);
7273
LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state {} {}", tableId, ts);
7374
env.getEventCoordinator().event(tableId, "Set table state of %s to %s", tableId, ts);
7475
return null;
7576
}
7677

7778
@Override
7879
public void undo(FateId fateId, Manager env) {
79-
Utils.unreserveNamespace(env, namespaceId, fateId, LockType.READ);
80-
Utils.unreserveTable(env, tableId, fateId, LockType.WRITE);
80+
Utils.unreserveNamespace(env.getContext(), namespaceId, fateId, LockType.READ);
81+
Utils.unreserveTable(env.getContext(), tableId, fateId, LockType.WRITE);
8182
}
8283
}

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import org.apache.accumulo.core.fate.zookeeper.ZooReservation;
5555
import org.apache.accumulo.core.metadata.schema.Ample;
5656
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
57-
import org.apache.accumulo.manager.Manager;
5857
import org.apache.accumulo.server.ServerContext;
5958
import org.apache.hadoop.fs.FileSystem;
6059
import org.apache.hadoop.fs.Path;
@@ -140,13 +139,14 @@ public static LockRange widen(Ample ample, TableId tableId, LockRange range, Tab
140139

141140
}
142141

143-
public static long reserveTable(Manager env, TableId tableId, FateId fateId, LockType lockType,
144-
boolean tableMustExist, TableOperation op) throws Exception {
145-
return reserveTable(env, tableId, fateId, lockType, tableMustExist, op, LockRange.infinite());
142+
public static long reserveTable(ServerContext ctx, TableId tableId, FateId fateId,
143+
LockType lockType, boolean tableMustExist, TableOperation op) throws Exception {
144+
return reserveTable(ctx, tableId, fateId, lockType, tableMustExist, op, LockRange.infinite());
146145
}
147146

148-
public static long reserveTable(Manager env, TableId tableId, FateId fateId, LockType lockType,
149-
boolean tableMustExist, TableOperation op, final LockRange range) throws Exception {
147+
public static long reserveTable(ServerContext ctx, TableId tableId, FateId fateId,
148+
LockType lockType, boolean tableMustExist, TableOperation op, final LockRange range)
149+
throws Exception {
150150
final LockRange widenedRange;
151151

152152
boolean shouldWiden = lockType == LockType.WRITE || op == TableOperation.COMPACT;
@@ -174,13 +174,13 @@ public static long reserveTable(Manager env, TableId tableId, FateId fateId, Loc
174174
*
175175
* Widening is done for compactions because those operations widen their range.
176176
*/
177-
widenedRange = widen(env.getContext().getAmple(), tableId, range, op, tableMustExist);
177+
widenedRange = widen(ctx.getAmple(), tableId, range, op, tableMustExist);
178178
log.debug("{} widened write lock range from {} to {}", fateId, range, widenedRange);
179179
} else {
180180
widenedRange = range;
181181
}
182182

183-
var lock = getLock(env.getContext(), tableId, fateId, lockType, widenedRange);
183+
var lock = getLock(ctx, tableId, fateId, lockType, widenedRange);
184184
if (shouldWiden && !widenedRange.equals(lock.getRange())) {
185185
// It is possible the range changed since the lock entry was created. Pre existing locks are
186186
// found using the fate id and could have a different range.
@@ -201,7 +201,7 @@ public static long reserveTable(Manager env, TableId tableId, FateId fateId, Loc
201201
// it means the table splits changed so release the lock and try again later. The table
202202
// splits in this range can not change once the lock is acquired, so this recheck is done
203203
// after getting the lock.
204-
var widenedRange2 = widen(env.getContext().getAmple(), tableId, range, op, tableMustExist);
204+
var widenedRange2 = widen(ctx.getAmple(), tableId, range, op, tableMustExist);
205205
if (!widenedRange.equals(widenedRange2)) {
206206
lock.unlock();
207207
log.info(
@@ -212,7 +212,7 @@ public static long reserveTable(Manager env, TableId tableId, FateId fateId, Loc
212212
}
213213

214214
if (tableMustExist) {
215-
ZooReaderWriter zk = env.getContext().getZooSession().asReaderWriter();
215+
ZooReaderWriter zk = ctx.getZooSession().asReaderWriter();
216216
if (!zk.exists(Constants.ZTABLES + "/" + tableId)) {
217217
throw new AcceptableThriftTableOperationException(tableId.canonical(), "", op,
218218
TableOperationExceptionType.NOTFOUND, "Table does not exist");
@@ -232,23 +232,23 @@ public static long reserveTable(Manager env, TableId tableId, FateId fateId, Loc
232232
}
233233
}
234234

235-
public static void unreserveTable(Manager env, TableId tableId, FateId fateId,
235+
public static void unreserveTable(ServerContext ctx, TableId tableId, FateId fateId,
236236
LockType lockType) {
237-
getLock(env.getContext(), tableId, fateId, lockType, LockRange.infinite()).unlock();
237+
getLock(ctx, tableId, fateId, lockType, LockRange.infinite()).unlock();
238238
log.info("table {} {} unlocked for {}", tableId, fateId, lockType);
239239
}
240240

241-
public static void unreserveNamespace(Manager env, NamespaceId namespaceId, FateId fateId,
241+
public static void unreserveNamespace(ServerContext ctx, NamespaceId namespaceId, FateId fateId,
242242
LockType lockType) {
243-
getLock(env.getContext(), namespaceId, fateId, lockType, LockRange.infinite()).unlock();
243+
getLock(ctx, namespaceId, fateId, lockType, LockRange.infinite()).unlock();
244244
log.info("namespace {} {} unlocked for {}", namespaceId, fateId, lockType);
245245
}
246246

247-
public static long reserveNamespace(Manager env, NamespaceId namespaceId, FateId fateId,
247+
public static long reserveNamespace(ServerContext ctx, NamespaceId namespaceId, FateId fateId,
248248
LockType lockType, boolean mustExist, TableOperation op) throws Exception {
249-
if (getLock(env.getContext(), namespaceId, fateId, lockType, LockRange.infinite()).tryLock()) {
249+
if (getLock(ctx, namespaceId, fateId, lockType, LockRange.infinite()).tryLock()) {
250250
if (mustExist) {
251-
ZooReaderWriter zk = env.getContext().getZooSession().asReaderWriter();
251+
ZooReaderWriter zk = ctx.getZooSession().asReaderWriter();
252252
if (!zk.exists(Constants.ZNAMESPACES + "/" + namespaceId)) {
253253
throw new AcceptableThriftTableOperationException(namespaceId.canonical(), "", op,
254254
TableOperationExceptionType.NAMESPACE_NOTFOUND, "Namespace does not exist");
@@ -261,10 +261,10 @@ public static long reserveNamespace(Manager env, NamespaceId namespaceId, FateId
261261
}
262262
}
263263

264-
public static long reserveHdfsDirectory(Manager env, String directory, FateId fateId)
264+
public static long reserveHdfsDirectory(ServerContext ctx, String directory, FateId fateId)
265265
throws KeeperException, InterruptedException {
266266

267-
ZooReaderWriter zk = env.getContext().getZooSession().asReaderWriter();
267+
ZooReaderWriter zk = ctx.getZooSession().asReaderWriter();
268268

269269
if (ZooReservation.attempt(zk, Constants.ZHDFS_RESERVATIONS + "/"
270270
+ Base64.getEncoder().encodeToString(directory.getBytes(UTF_8)), fateId, "")) {
@@ -276,12 +276,10 @@ public static long reserveHdfsDirectory(Manager env, String directory, FateId fa
276276
}
277277
}
278278

279-
public static void unreserveHdfsDirectory(Manager env, String directory, FateId fateId)
279+
public static void unreserveHdfsDirectory(ServerContext ctx, String directory, FateId fateId)
280280
throws KeeperException, InterruptedException {
281-
ZooReservation.release(env.getContext().getZooSession().asReaderWriter(),
282-
Constants.ZHDFS_RESERVATIONS + "/"
283-
+ Base64.getEncoder().encodeToString(directory.getBytes(UTF_8)),
284-
fateId);
281+
ZooReservation.release(ctx.getZooSession().asReaderWriter(), Constants.ZHDFS_RESERVATIONS + "/"
282+
+ Base64.getEncoder().encodeToString(directory.getBytes(UTF_8)), fateId);
285283
log.trace("{} unreserved bulk dir {}", fateId, directory);
286284
}
287285

@@ -308,9 +306,9 @@ private static DistributedLock getLock(ServerContext context, AbstractId<?> id,
308306
return lock;
309307
}
310308

311-
public static DistributedLock getReadLock(Manager env, AbstractId<?> id, FateId fateId,
309+
public static DistributedLock getReadLock(ServerContext ctx, AbstractId<?> id, FateId fateId,
312310
LockRange range) {
313-
return Utils.getLock(env.getContext(), id, fateId, LockType.READ, range);
311+
return Utils.getLock(ctx, id, fateId, LockType.READ, range);
314312
}
315313

316314
/**
@@ -320,9 +318,9 @@ public static DistributedLock getReadLock(Manager env, AbstractId<?> id, FateId
320318
*
321319
* @param path the fully-qualified path
322320
*/
323-
public static SortedSet<Text> getSortedSetFromFile(Manager manager, Path path, boolean encoded)
321+
public static SortedSet<Text> getSortedSetFromFile(ServerContext ctx, Path path, boolean encoded)
324322
throws IOException {
325-
FileSystem fs = path.getFileSystem(manager.getContext().getHadoopConf());
323+
FileSystem fs = path.getFileSystem(ctx.getHadoopConf());
326324
var data = new TreeSet<Text>();
327325
try (var file = new java.util.Scanner(fs.open(path), UTF_8)) {
328326
while (file.hasNextLine()) {

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/LockTable.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ public LockTable(TableId tableId, NamespaceId namespaceId, TRange range,
4848

4949
@Override
5050
public long isReady(FateId fateId, Manager manager) throws Exception {
51-
return Utils.reserveNamespace(manager, namespaceId, fateId,
51+
return Utils.reserveNamespace(manager.getContext(), namespaceId, fateId,
5252
DistributedReadWriteLock.LockType.READ, true, TableOperation.SET_TABLET_AVAILABILITY)
53-
+ Utils.reserveTable(manager, tableId, fateId, DistributedReadWriteLock.LockType.WRITE,
54-
true, TableOperation.SET_TABLET_AVAILABILITY);
53+
+ Utils.reserveTable(manager.getContext(), tableId, fateId,
54+
DistributedReadWriteLock.LockType.WRITE, true, TableOperation.SET_TABLET_AVAILABILITY);
5555
}
5656

5757
@Override
@@ -61,7 +61,9 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
6161

6262
@Override
6363
public void undo(FateId fateId, Manager manager) throws Exception {
64-
Utils.unreserveNamespace(manager, namespaceId, fateId, DistributedReadWriteLock.LockType.READ);
65-
Utils.unreserveTable(manager, tableId, fateId, DistributedReadWriteLock.LockType.WRITE);
64+
Utils.unreserveNamespace(manager.getContext(), namespaceId, fateId,
65+
DistributedReadWriteLock.LockType.READ);
66+
Utils.unreserveTable(manager.getContext(), tableId, fateId,
67+
DistributedReadWriteLock.LockType.WRITE);
6668
}
6769
}

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/availability/SetTabletAvailability.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,8 @@ public long isReady(FateId fateId, Manager manager) throws Exception {
150150

151151
@Override
152152
public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
153-
Utils.unreserveNamespace(manager, namespaceId, fateId, LockType.READ);
154-
Utils.unreserveTable(manager, tableId, fateId, LockType.WRITE);
153+
Utils.unreserveNamespace(manager.getContext(), namespaceId, fateId, LockType.READ);
154+
Utils.unreserveTable(manager.getContext(), tableId, fateId, LockType.WRITE);
155155
return null;
156156
}
157157
}

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
7575
firstSplit, lastSplit);
7676
removeBulkLoadEntries(ample, info.tableId, fateId, firstSplit, lastSplit);
7777

78-
Utils.unreserveHdfsDirectory(manager, info.sourceDir, fateId);
79-
Utils.getReadLock(manager, info.tableId, fateId, LockRange.infinite()).unlock();
78+
Utils.unreserveHdfsDirectory(manager.getContext(), info.sourceDir, fateId);
79+
Utils.getReadLock(manager.getContext(), info.tableId, fateId, LockRange.infinite()).unlock();
8080
// delete json renames and mapping files
8181
Path renamingFile = new Path(bulkDir, Constants.BULK_RENAME_FILE);
8282
Path mappingFile = new Path(bulkDir, Constants.BULK_LOAD_MAPPING);

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public PrepBulkImport(BulkInfo info) {
8888

8989
@Override
9090
public long isReady(FateId fateId, Manager manager) throws Exception {
91-
long wait = Utils.reserveTable(manager, bulkInfo.tableId, fateId,
91+
long wait = Utils.reserveTable(manager.getContext(), bulkInfo.tableId, fateId,
9292
DistributedReadWriteLock.LockType.READ, true, TableOperation.BULK_IMPORT,
9393
LockRange.of(bulkInfo.firstSplit, bulkInfo.lastSplit));
9494
if (wait > 0) {
@@ -99,7 +99,7 @@ public long isReady(FateId fateId, Manager manager) throws Exception {
9999
return 500;
100100
}
101101

102-
return Utils.reserveHdfsDirectory(manager, bulkInfo.sourceDir, fateId);
102+
return Utils.reserveHdfsDirectory(manager.getContext(), bulkInfo.sourceDir, fateId);
103103
}
104104

105105
@VisibleForTesting
@@ -319,8 +319,9 @@ private Path createNewBulkDir(ServerContext context, VolumeManager fs, TableId t
319319
@Override
320320
public void undo(FateId fateId, Manager environment) throws Exception {
321321
// unreserve sourceDir/error directories
322-
Utils.unreserveHdfsDirectory(environment, bulkInfo.sourceDir, fateId);
323-
Utils.getReadLock(environment, bulkInfo.tableId, fateId, LockRange.infinite()).unlock();
322+
Utils.unreserveHdfsDirectory(environment.getContext(), bulkInfo.sourceDir, fateId);
323+
Utils.getReadLock(environment.getContext(), bulkInfo.tableId, fateId, LockRange.infinite())
324+
.unlock();
324325
environment.removeBulkImportStatus(bulkInfo.sourceDir);
325326
}
326327
}

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneTable.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ public CloneTable(String user, NamespaceId srcNamespaceId, TableId srcTableId,
4545

4646
@Override
4747
public long isReady(FateId fateId, Manager environment) throws Exception {
48-
long val = Utils.reserveNamespace(environment, cloneInfo.getNamespaceId(), fateId,
48+
long val = Utils.reserveNamespace(environment.getContext(), cloneInfo.getNamespaceId(), fateId,
49+
LockType.READ, true, TableOperation.CLONE);
50+
val += Utils.reserveTable(environment.getContext(), cloneInfo.getSrcTableId(), fateId,
4951
LockType.READ, true, TableOperation.CLONE);
50-
val += Utils.reserveTable(environment, cloneInfo.getSrcTableId(), fateId, LockType.READ, true,
51-
TableOperation.CLONE);
5252
return val;
5353
}
5454

@@ -62,8 +62,10 @@ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
6262

6363
@Override
6464
public void undo(FateId fateId, Manager environment) {
65-
Utils.unreserveNamespace(environment, cloneInfo.getNamespaceId(), fateId, LockType.READ);
66-
Utils.unreserveTable(environment, cloneInfo.getSrcTableId(), fateId, LockType.READ);
65+
Utils.unreserveNamespace(environment.getContext(), cloneInfo.getNamespaceId(), fateId,
66+
LockType.READ);
67+
Utils.unreserveTable(environment.getContext(), cloneInfo.getSrcTableId(), fateId,
68+
LockType.READ);
6769
}
6870

6971
}

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/CloneZookeeper.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ public CloneZookeeper(CloneInfo cloneInfo, ClientContext context)
4343
public long isReady(FateId fateId, Manager environment) throws Exception {
4444
long val = 0;
4545
if (!cloneInfo.getSrcNamespaceId().equals(cloneInfo.getNamespaceId())) {
46-
val += Utils.reserveNamespace(environment, cloneInfo.getNamespaceId(), fateId, LockType.READ,
47-
true, TableOperation.CLONE);
46+
val += Utils.reserveNamespace(environment.getContext(), cloneInfo.getNamespaceId(), fateId,
47+
LockType.READ, true, TableOperation.CLONE);
4848
}
49-
val += Utils.reserveTable(environment, cloneInfo.getTableId(), fateId, LockType.WRITE, false,
50-
TableOperation.CLONE);
49+
val += Utils.reserveTable(environment.getContext(), cloneInfo.getTableId(), fateId,
50+
LockType.WRITE, false, TableOperation.CLONE);
5151
return val;
5252
}
5353

@@ -69,9 +69,10 @@ public Repo<Manager> call(FateId fateId, Manager environment) throws Exception {
6969
public void undo(FateId fateId, Manager environment) throws Exception {
7070
environment.getTableManager().removeTable(cloneInfo.getTableId(), cloneInfo.getNamespaceId());
7171
if (!cloneInfo.getSrcNamespaceId().equals(cloneInfo.getNamespaceId())) {
72-
Utils.unreserveNamespace(environment, cloneInfo.getNamespaceId(), fateId, LockType.READ);
72+
Utils.unreserveNamespace(environment.getContext(), cloneInfo.getNamespaceId(), fateId,
73+
LockType.READ);
7374
}
74-
Utils.unreserveTable(environment, cloneInfo.getTableId(), fateId, LockType.WRITE);
75+
Utils.unreserveTable(environment.getContext(), cloneInfo.getTableId(), fateId, LockType.WRITE);
7576
environment.getContext().clearTableListCache();
7677
}
7778

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/clone/FinishCloneTable.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,15 @@ public Repo<Manager> call(FateId fateId, Manager environment) {
6262
expectedCurrStates);
6363
}
6464

65-
Utils.unreserveNamespace(environment, cloneInfo.getSrcNamespaceId(), fateId, LockType.READ);
65+
Utils.unreserveNamespace(environment.getContext(), cloneInfo.getSrcNamespaceId(), fateId,
66+
LockType.READ);
6667
if (!cloneInfo.getSrcNamespaceId().equals(cloneInfo.getNamespaceId())) {
67-
Utils.unreserveNamespace(environment, cloneInfo.getNamespaceId(), fateId, LockType.READ);
68+
Utils.unreserveNamespace(environment.getContext(), cloneInfo.getNamespaceId(), fateId,
69+
LockType.READ);
6870
}
69-
Utils.unreserveTable(environment, cloneInfo.getSrcTableId(), fateId, LockType.READ);
70-
Utils.unreserveTable(environment, cloneInfo.getTableId(), fateId, LockType.WRITE);
71+
Utils.unreserveTable(environment.getContext(), cloneInfo.getSrcTableId(), fateId,
72+
LockType.READ);
73+
Utils.unreserveTable(environment.getContext(), cloneInfo.getTableId(), fateId, LockType.WRITE);
7174

7275
environment.getEventCoordinator().event(cloneInfo.getTableId(), "Cloned table %s from %s",
7376
cloneInfo.getTableName(), cloneInfo.getSrcTableId());

server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CleanUp.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ public long isReady(FateId fateId, Manager manager) throws Exception {
121121
@Override
122122
public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
123123
CompactionConfigStorage.deleteConfig(manager.getContext(), fateId);
124-
Utils.getReadLock(manager, tableId, fateId, LockRange.infinite()).unlock();
125-
Utils.getReadLock(manager, namespaceId, fateId, LockRange.infinite()).unlock();
124+
Utils.getReadLock(manager.getContext(), tableId, fateId, LockRange.infinite()).unlock();
125+
Utils.getReadLock(manager.getContext(), namespaceId, fateId, LockRange.infinite()).unlock();
126126
return null;
127127
}
128128
}

0 commit comments

Comments
 (0)