11import { Injectable , Logger } from '@nestjs/common' ;
22import { MinioService } from 'omniboxd/minio/minio.service' ;
33import { ConfigService } from '@nestjs/config' ;
4+ import { buffer } from 'node:stream/consumers' ;
45
56@Injectable ( )
67export class ChunkManagerService {
@@ -26,7 +27,7 @@ export class ChunkManagerService {
2627 const buffer = Buffer . from ( data , 'base64' ) ;
2728
2829 try {
29- await this . minioService . putChunkObject ( chunkPath , buffer ) ;
30+ await this . minioService . putObject ( chunkPath , buffer ) ;
3031 this . logger . debug (
3132 `Stored chunk ${ chunkIndex + 1 } /${ totalChunks } for task ${ taskId } ` ,
3233 ) ;
@@ -40,32 +41,13 @@ export class ChunkManagerService {
4041 }
4142
4243 async assembleChunks ( taskId : string , totalChunks : number ) : Promise < string > {
43- const assembledPath = this . getAssembledPath ( taskId ) ;
44- const chunkPaths = Array . from ( { length : totalChunks } , ( _ , i ) =>
45- this . getChunkPath ( taskId , i ) ,
46- ) ;
47-
48- try {
49- // Use MinIO's composeObject to merge all chunks
50- await this . minioService . composeObject ( assembledPath , chunkPaths ) ;
51-
52- // Retrieve the assembled data
53- const stream = await this . minioService . getObject ( assembledPath ) ;
54- const chunks : Buffer [ ] = [ ] ;
55-
56- return new Promise ( ( resolve , reject ) => {
57- stream . on ( 'data' , ( chunk ) => chunks . push ( chunk ) ) ;
58- stream . on ( 'end' , ( ) => {
59- const assembledBuffer = Buffer . concat ( chunks ) ;
60- const assembledData = assembledBuffer . toString ( 'utf-8' ) ;
61- resolve ( assembledData ) ;
62- } ) ;
63- stream . on ( 'error' , reject ) ;
64- } ) ;
65- } catch ( error ) {
66- this . logger . error ( `Failed to assemble chunks for task ${ taskId } :` , error ) ;
67- throw error ;
44+ const buffers : Buffer [ ] = [ ] ;
45+ for ( let i = 0 ; i < totalChunks ; i ++ ) {
46+ const chunkPath = this . getChunkPath ( taskId , i ) ;
47+ const stream = await this . minioService . getObject ( chunkPath ) ;
48+ buffers . push ( await buffer ( stream ) ) ;
6849 }
50+ return Buffer . concat ( buffers ) . toString ( 'utf-8' ) ;
6951 }
7052
7153 cleanupChunks ( taskId : string , totalChunks : number ) : void {
@@ -87,15 +69,10 @@ export class ChunkManagerService {
8769 try {
8870 const objectsToRemove : string [ ] = [ ] ;
8971
90- // Add chunk paths
9172 for ( let i = 0 ; i < totalChunks ; i ++ ) {
9273 objectsToRemove . push ( this . getChunkPath ( taskId , i ) ) ;
9374 }
9475
95- // Add assembled path
96- objectsToRemove . push ( this . getAssembledPath ( taskId ) ) ;
97-
98- // Remove all objects
9976 await Promise . all (
10077 objectsToRemove . map ( ( objectName ) =>
10178 this . minioService . removeObject ( objectName ) . catch ( ( error ) => {
@@ -113,8 +90,4 @@ export class ChunkManagerService {
11390 private getChunkPath ( taskId : string , chunkIndex : number ) : string {
11491 return `wizard-chunks/${ taskId } /chunk-${ chunkIndex . toString ( ) . padStart ( 6 , '0' ) } ` ;
11592 }
116-
117- private getAssembledPath ( taskId : string ) : string {
118- return `wizard-chunks/${ taskId } /assembled` ;
119- }
12093}
0 commit comments