Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,16 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val activeTypes = conf.availableStorageTypes

lazy val localOrDfsStorageAvailable: Boolean = {
localStorageAvailable || dfsStorageAvailable
}

lazy val localStorageAvailable: Boolean = {
StorageInfo.localDiskAvailable(activeTypes) || !diskInfos.isEmpty
}

lazy val dfsStorageAvailable: Boolean = {
StorageInfo.OSSAvailable(activeTypes) || StorageInfo.S3Available(activeTypes) ||
StorageInfo.HDFSAvailable(activeTypes) || StorageInfo.localDiskAvailable(activeTypes) ||
hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
StorageInfo.HDFSAvailable(activeTypes) || hdfsDir.nonEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
}

override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
Expand Down Expand Up @@ -1029,7 +1036,17 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
} else if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable()
|| location.getStorageInfo.S3Available() || location.getStorageInfo.OSSAvailable()) {
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val createDiskFileResult = createDiskFile(
val createDiskFileResult = createLocalDiskFile(
location,
partitionDataWriterContext.getAppId,
partitionDataWriterContext.getShuffleId,
location.getFileName,
partitionDataWriterContext.getUserIdentifier,
partitionDataWriterContext.getPartitionType,
partitionDataWriterContext.isPartitionSplitEnabled)
(null, createDiskFileResult._1, createDiskFileResult._2, createDiskFileResult._3)
} else if (location.getStorageInfo.HDFSAvailable() || location.getStorageInfo.S3Available() || location.getStorageInfo.OSSAvailable()) {
val createDiskFileResult = createDfsDiskFile(
location,
partitionDataWriterContext.getAppId,
partitionDataWriterContext.getShuffleId,
Expand Down Expand Up @@ -1069,10 +1086,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
memoryFileInfo
}

/**
* @return (Flusher,DiskFileInfo,workingDir)
*/
def createDiskFile(
def createLocalDiskFile(
location: PartitionLocation,
appId: String,
shuffleId: Int,
Expand Down Expand Up @@ -1102,61 +1116,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
}

if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(
StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
shuffleDir,
hdfsPermission)
val hdfsFilePath = new Path(shuffleDir, fileName).toString
val hdfsFileInfo = new DiskFileInfo(
userIdentifier,
partitionSplitEnabled,
getFileMeta(partitionType, s"hdfs", conf.shuffleChunkSize),
hdfsFilePath,
StorageInfo.Type.HDFS)
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
fileName,
hdfsFileInfo)
return (hdfsFlusher.get, hdfsFileInfo, null)
} else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
val shuffleDir =
new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(
StorageManager.hadoopFs.get(StorageInfo.Type.S3),
shuffleDir,
hdfsPermission)
val s3FilePath = new Path(shuffleDir, fileName).toString
val s3FileInfo = new DiskFileInfo(
userIdentifier,
partitionSplitEnabled,
new ReduceFileMeta(conf.shuffleChunkSize),
s3FilePath,
StorageInfo.Type.S3)
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
fileName,
s3FileInfo)
return (s3Flusher.get, s3FileInfo, null)
} else if (dirs.isEmpty && location.getStorageInfo.OSSAvailable()) {
val shuffleDir =
new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(
StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
shuffleDir,
hdfsPermission)
val ossFilePath = new Path(shuffleDir, fileName).toString
val ossFileInfo = new DiskFileInfo(
userIdentifier,
partitionSplitEnabled,
new ReduceFileMeta(conf.shuffleChunkSize),
ossFilePath,
StorageInfo.Type.OSS)
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
fileName,
ossFileInfo)
return (ossFlusher.get, ossFileInfo, null)
} else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
val dir = dirs(getNextIndex % dirs.size)
val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints)
val shuffleDir = new File(dir, s"$appId/$shuffleId")
Expand Down Expand Up @@ -1210,9 +1170,80 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
retryCount += 1
}
if (dfsStorageAvailable) {
logWarning("Failed to create localFileWriter", exception)
return (null, null, null)
}
throw exception
}

def createDfsDiskFile(
location: PartitionLocation,
appId: String,
shuffleId: Int,
fileName: String,
userIdentifier: UserIdentifier,
partitionType: PartitionType,
partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
if (location.getStorageInfo.HDFSAvailable()) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(
StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
shuffleDir,
hdfsPermission)
val hdfsFilePath = new Path(shuffleDir, fileName).toString
val hdfsFileInfo = new DiskFileInfo(
userIdentifier,
partitionSplitEnabled,
getFileMeta(partitionType, s"hdfs", conf.shuffleChunkSize),
hdfsFilePath,
StorageInfo.Type.HDFS)
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
fileName,
hdfsFileInfo)
return (hdfsFlusher.get, hdfsFileInfo, null)
} else if (location.getStorageInfo.S3Available()) {
val shuffleDir =
new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(
StorageManager.hadoopFs.get(StorageInfo.Type.S3),
shuffleDir,
hdfsPermission)
val s3FilePath = new Path(shuffleDir, fileName).toString
val s3FileInfo = new DiskFileInfo(
userIdentifier,
partitionSplitEnabled,
new ReduceFileMeta(conf.shuffleChunkSize),
s3FilePath,
StorageInfo.Type.S3)
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
fileName,
s3FileInfo)
return (s3Flusher.get, s3FileInfo, null)
} else if (location.getStorageInfo.OSSAvailable()) {
val shuffleDir =
new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
FileSystem.mkdirs(
StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
shuffleDir,
hdfsPermission)
val ossFilePath = new Path(shuffleDir, fileName).toString
val ossFileInfo = new DiskFileInfo(
userIdentifier,
partitionSplitEnabled,
new ReduceFileMeta(conf.shuffleChunkSize),
ossFilePath,
StorageInfo.Type.OSS)
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
fileName,
ossFileInfo)
return (ossFlusher.get, ossFileInfo, null)
}
(null, null, null)
}

def startDeviceMonitor(): Unit = {
deviceMonitor.startCheck()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,20 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
} else {
null
}
case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
if (storageManager.localOrDfsStorageAvailable) {
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile(
case StorageInfo.Type.HDD | StorageInfo.Type.SSD =>
if (storageManager.localStorageAvailable) {
logDebug(s"create local disk file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val (flusher, diskFileInfo, workingDir) = storageManager.createLocalDiskFile(
location,
partitionDataWriterContext.getAppId,
partitionDataWriterContext.getShuffleId,
location.getFileName,
partitionDataWriterContext.getUserIdentifier,
partitionDataWriterContext.getPartitionType,
partitionDataWriterContext.isPartitionSplitEnabled)
partitionDataWriterContext.setWorkingDir(workingDir)
val metaHandler = getPartitionMetaHandler(diskFileInfo)
if (flusher.isInstanceOf[LocalFlusher]
&& location.getStorageInfo.localDiskAvailable()) {
if (diskFileInfo != null) {
partitionDataWriterContext.setWorkingDir(workingDir)
val metaHandler = getPartitionMetaHandler(diskFileInfo)
new LocalTierWriter(
conf,
metaHandler,
Expand All @@ -147,6 +146,25 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
partitionDataWriterContext,
storageManager)
} else {
null
}
} else {
null
}
case StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
if (storageManager.dfsStorageAvailable) {
logDebug(s"create dfs disk file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
val (flusher, diskFileInfo, workingDir) = storageManager.createDfsDiskFile(
location,
partitionDataWriterContext.getAppId,
partitionDataWriterContext.getShuffleId,
location.getFileName,
partitionDataWriterContext.getUserIdentifier,
partitionDataWriterContext.getPartitionType,
partitionDataWriterContext.isPartitionSplitEnabled)
if (diskFileInfo != null) {
partitionDataWriterContext.setWorkingDir(workingDir)
val metaHandler = getPartitionMetaHandler(diskFileInfo)
new DfsTierWriter(
conf,
metaHandler,
Expand All @@ -158,6 +176,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
diskFileInfo.getStorageType,
partitionDataWriterContext,
storageManager)
} else {
null
}
} else {
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StoragePolicyCase1 extends CelebornFunSuite {
val mockedFlusher = mock[Flusher]
val mockedFile = mock[File]
when(
mockedStorageManager.createDiskFile(
mockedStorageManager.createLocalDiskFile(
any(),
any(),
any(),
Expand Down Expand Up @@ -101,7 +101,6 @@ class StoragePolicyCase1 extends CelebornFunSuite {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
val conf = new CelebornConf()
val flushLock = new AnyRef
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3")
val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
val pendingWriters = new AtomicInteger()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
val mockedFlusher = mock[LocalFlusher]
val mockedFile = mock[File]
when(
mockedStorageManager.createDiskFile(
mockedStorageManager.createLocalDiskFile(
any(),
any(),
any(),
Expand Down Expand Up @@ -100,7 +100,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
test("test create file order case2") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS,S3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
val mockedFlusher = mock[LocalFlusher]
val mockedFile = mock[File]
when(
mockedStorageManager.createDiskFile(
mockedStorageManager.createLocalDiskFile(
any(),
any(),
any(),
Expand Down Expand Up @@ -100,11 +100,10 @@ class StoragePolicyCase3 extends CelebornFunSuite {
test("test getEvicted file case1") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
val mockedMemoryFile = mock[LocalTierWriter]
val conf = new CelebornConf()
val flushLock = new AnyRef
val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
val pendingWriters = new AtomicInteger()
val notifier = new FlushNotifier
Expand All @@ -120,11 +119,10 @@ class StoragePolicyCase3 extends CelebornFunSuite {
test("test evict file case2") {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
val mockedMemoryFile = mock[LocalTierWriter]
val conf = new CelebornConf()
val flushLock = new AnyRef
val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
val pendingWriters = new AtomicInteger()
val notifier = new FlushNotifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
val mockedFlusher = mock[LocalFlusher]
val mockedFile = mock[File]
when(
mockedStorageManager.createDiskFile(
mockedStorageManager.createLocalDiskFile(
any(),
any(),
any(),
Expand Down Expand Up @@ -101,7 +101,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(
memoryDisabledHintPartitionLocation)
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
val conf = new CelebornConf()
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3")
Expand Down
Loading