diff --git a/.clang-format b/.clang-format index 82162e6fea..b5586086f0 100644 --- a/.clang-format +++ b/.clang-format @@ -5,15 +5,15 @@ AccessModifierOffset: -3 AlignAfterOpenBracket: Align AlignConsecutiveMacros: false AlignConsecutiveAssignments: false -AlignConsecutiveBitFields: false +# AlignConsecutiveBitFields: false AlignConsecutiveDeclarations: false AlignEscapedNewlines: Left -AlignOperands: Align +AlignOperands: true AlignTrailingComments: true AllowAllArgumentsOnNextLine: true AllowAllConstructorInitializersOnNextLine: true AllowAllParametersOfDeclarationOnNextLine: true -AllowShortEnumsOnASingleLine: true +# AllowShortEnumsOnASingleLine: true AllowShortBlocksOnASingleLine: Never AllowShortCaseLabelsOnASingleLine: false AllowShortFunctionsOnASingleLine: All @@ -39,8 +39,8 @@ BraceWrapping: AfterExternBlock: false BeforeCatch: false BeforeElse: false - BeforeLambdaBody: false - BeforeWhile: false + # BeforeLambdaBody: false + # BeforeWhile: false IndentBraces: false SplitEmptyFunction: true SplitEmptyRecord: true @@ -87,13 +87,13 @@ IncludeCategories: IncludeIsMainRegex: '([-_](test|unittest))?$' IncludeIsMainSourceRegex: '' IndentCaseLabels: false -IndentCaseBlocks: true +# IndentCaseBlocks: true IndentGotoLabels: true IndentPPDirectives: None -IndentExternBlock: AfterExternBlock +# IndentExternBlock: AfterExternBlock IndentWidth: 4 IndentWrappedFunctionNames: false -InsertTrailingCommas: None +# InsertTrailingCommas: None JavaScriptQuotes: Leave JavaScriptWrapImports: true KeepEmptyLinesAtTheStartOfBlocks: false @@ -103,7 +103,7 @@ MaxEmptyLinesToKeep: 1 NamespaceIndentation: None ObjCBinPackProtocolList: Never ObjCBlockIndentWidth: 2 -ObjCBreakBeforeNestedBlockParam: true +# ObjCBreakBeforeNestedBlockParam: true ObjCSpaceAfterProperty: false ObjCSpaceBeforeProtocolList: true PenaltyBreakAssignment: 2 @@ -174,9 +174,9 @@ StatementMacros: TabWidth: 4 UseCRLF: false UseTab: Never -WhitespaceSensitiveMacros: - - STRINGIZE - - PP_STRINGIZE - - BOOST_PP_STRINGIZE +# WhitespaceSensitiveMacros: + # - STRINGIZE + # - PP_STRINGIZE + # - BOOST_PP_STRINGIZE ... 
diff --git a/README.md b/README.md index 6a4eb371a8..48a118c344 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,3 @@ Slack (For developer quick communication): Contact us via dingtalk, wechat, email and telephone: ![contacts](./docs/images/contact-en.png) - - - diff --git a/README_CN.md b/README_CN.md index 652cd4eb7a..78f17e3846 100644 --- a/README_CN.md +++ b/README_CN.md @@ -104,4 +104,3 @@ Slack (在线开发沟通): 通过钉钉群、微信群、微信公众号、邮箱和电话联系我们:
![contacts](./docs/images/contact-zh.png) - diff --git a/src/import/CSVPathParser.cpp b/src/import/CSVPathParser.cpp new file mode 100644 index 0000000000..62aeb7190d --- /dev/null +++ b/src/import/CSVPathParser.cpp @@ -0,0 +1,334 @@ +#include "CSVPathParser.h" + +// Transition table of the CSV DFA. Rows are the four DFA states; columns are the input +// classes assigned in DFA::getDFA: 0 = ',' or '\n', 1 = '"', 2 = any other byte. 4 is a dead state. +const int DFATable[4][3] = { + {0, 3, 1}, + {0, 1, 1}, + {0, 3, 4}, + {3, 2, 3}, +}; + +/*---Set the size of the state matrix---*/ +int* bitmap::stateMetrix(int blockNum) { + int* Metrix = new int[blockNum * 4]; + return Metrix; +} + +/*---Sort the states within the matrix: chain each block's end state into the next block's start state---*/ +int* sortMetrix(int* metrix, int blockNum) { + int* stMetrix; + stMetrix = new int[blockNum]; + int j = 0; + stMetrix[0] = 0; + for (int i = 0; i < blockNum - 1; i++) { + stMetrix[i + 1] = metrix[i * 4 + j]; + j = metrix[i * 4 + j]; + } + delete[] metrix; // allocated with new[], so delete[] rather than free() + return stMetrix; +} + +// ---bitmap--- // +/*---Multithreaded processing of CSV files---*/ +BitmapSet* bitmap::bmAlloc(unsigned long length, unsigned int blockNum, unsigned int blockSize, + bool quote, char*& fileBegin, int threads) { + bool* bitmapField; + bool* bitmapRecord; + bool* Field_IR; + bool* Field_OR; + bool* Field_IQ; + bool* Field_OQ; + bool* Record_IR; + bool* Record_OR; + bool* Record_IQ; + bool* Record_OQ; + int* Metrix = NULL; + if (quote == false) { + bitmapField = new bool[length]; + bitmapRecord = new bool[length]; + } else { + Field_IR = new bool[length]; + Field_OR = new bool[length]; + Field_IQ = new bool[length]; + Field_OQ = new bool[length]; + Record_IR = new bool[length]; + Record_OR = new bool[length]; + Record_IQ = new bool[length]; + Record_OQ = new bool[length]; + Metrix = bitmap::stateMetrix(blockNum); + } + +#pragma omp parallel for schedule(dynamic) num_threads(threads) + for (int i = 0; i < blockNum; i++) { // cover every block; the final, possibly shorter one is handled below + char* linebegin; + char* lineend; + linebegin = (char*)(fileBegin + blockSize * i); + lineend = (char*)(fileBegin + blockSize * (i + 1) - 1); + + if (i == blockNum - 1) { + if (quote == false) { + bool* mbitmapField; + bool* mbitmapRecord; + mbitmapField = (bool*)(bitmapField + (unsigned long)blockSize * i); + mbitmapRecord = (bool*)(bitmapRecord + (unsigned long)blockSize * i); + char* linebegin = (char*)(fileBegin + (unsigned long)blockSize * i); + char* lineend = (char*)(fileBegin + length - 1); + noquote::parse(i, (length - (unsigned long)i * blockSize) / 32, linebegin, lineend, + mbitmapField, mbitmapRecord); + } else { + bool* mField_IR; + bool* mField_OR; + bool* mField_IQ; + bool* mField_OQ; + bool* mRecord_IR; + bool* mRecord_OR; + bool* mRecord_IQ; + bool* mRecord_OQ; + mField_IR = (bool*)(Field_IR + (unsigned long)blockSize * i); + mField_OR = (bool*)(Field_OR + (unsigned long)blockSize * i); + mField_IQ = (bool*)(Field_IQ + (unsigned
long)blockSize * i); + mField_OQ = (bool*)(Field_OQ + (unsigned long)blockSize * i); + mRecord_IR = (bool*)(Record_IR + (unsigned long)blockSize * i); + mRecord_OR = (bool*)(Record_OR + (unsigned long)blockSize * i); + mRecord_IQ = (bool*)(Record_IQ + (unsigned long)blockSize * i); + mRecord_OQ = (bool*)(Record_OQ + (unsigned long)blockSize * i); + char* linebegin = (char*)(fileBegin + (unsigned long)blockSize * i); + char* lineend = (char*)(fileBegin + length - 1); + DFA::getDFA(i, linebegin, lineend, mField_IR, mField_OR, mField_IQ, mField_OQ, + mRecord_IR, mRecord_OR, mRecord_IQ, mRecord_OQ, Metrix); + } + } else { + if (quote == false) { + bool* mbitmapField; + bool* mbitmapRecord; + mbitmapField = (bool*)(bitmapField + blockSize * i); + mbitmapRecord = (bool*)(bitmapRecord + blockSize * i); + noquote::parse(i, blockSize / 32, linebegin, lineend, mbitmapField, mbitmapRecord); + } else { + bool* mField_IR; + bool* mField_OR; + bool* mField_IQ; + bool* mField_OQ; + bool* mRecord_IR; + bool* mRecord_OR; + bool* mRecord_IQ; + bool* mRecord_OQ; + mField_IR = (bool*)(Field_IR + blockSize * i); + mField_OR = (bool*)(Field_OR + blockSize * i); + mField_IQ = (bool*)(Field_IQ + blockSize * i); + mField_OQ = (bool*)(Field_OQ + blockSize * i); + mRecord_IR = (bool*)(Record_IR + blockSize * i); + mRecord_OR = (bool*)(Record_OR + blockSize * i); + mRecord_IQ = (bool*)(Record_IQ + blockSize * i); + mRecord_OQ = (bool*)(Record_OQ + blockSize * i); + DFA::getDFA(i, linebegin, lineend, mField_IR, mField_OR, mField_IQ, mField_OQ, + mRecord_IR, mRecord_OR, mRecord_IQ, mRecord_OQ, Metrix); + } + } + } + + int* standMetrix; + if (quote == true) { + standMetrix = sortMetrix(Metrix, blockNum); + bitmapField = new bool[length]; + bitmapRecord = new bool[length]; +#pragma omp parallel for schedule(dynamic) num_threads(threads) + for (int counter = 0; counter < blockNum; counter++) { + bool* totalBTMPF; + bool* totalBTMPR; + bool* fenbtmpF; + bool* fenbtmpR; + if (counter == blockNum - 1) { + if (standMetrix[counter] == 0) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_OR + blockSize * counter); + fenbtmpR = (bool*)(Record_OR + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, + (unsigned long)(length - blockSize * counter)); + } else if (standMetrix[counter] == 1) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_IR + blockSize * counter); + fenbtmpR = (bool*)(Record_IR + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, + (unsigned long)(length - blockSize * counter)); + } else if (standMetrix[counter] == 2) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_OQ + blockSize * counter); + fenbtmpR = (bool*)(Record_OQ + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, + (unsigned long)(length - blockSize * counter)); + } else if (standMetrix[counter] == 3) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_IQ + blockSize * counter); + fenbtmpR = (bool*)(Record_IQ + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, + (unsigned long)(length - blockSize * counter)); + } + } else { 
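+ // Non-final blocks are exactly blockSize bytes long; copy out the bitmap pair that + // matches this block's resolved start state (0 = OR, 1 = IR, 2 = OQ, 3 = IQ).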
+ if (standMetrix[counter] == 0) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_OR + blockSize * counter); + fenbtmpR = (bool*)(Record_OR + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, blockSize); + } else if (standMetrix[counter] == 1) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_IR + blockSize * counter); + fenbtmpR = (bool*)(Record_IR + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, blockSize); + } else if (standMetrix[counter] == 2) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_OQ + blockSize * counter); + fenbtmpR = (bool*)(Record_OQ + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, blockSize); + } else if (standMetrix[counter] == 3) { + totalBTMPF = (bool*)(bitmapField + blockSize * counter); + totalBTMPR = (bool*)(bitmapRecord + blockSize * counter); + fenbtmpF = (bool*)(Field_IQ + blockSize * counter); + fenbtmpR = (bool*)(Record_IQ + blockSize * counter); + bitmap::mergebitmap(totalBTMPF, totalBTMPR, fenbtmpF, fenbtmpR, blockSize); + } + } + } + } + BitmapSet* BTMP = new BitmapSet(); + + BTMP->bitmapComma = bitmapField; + BTMP->bitmapLineBreak = bitmapRecord; + BTMP->length = length; + BTMP->delete_bitmap = false; + return BTMP; +} + +/*---merge four states bitmap---*/ +void bitmap::mergebitmap(bool* totalBTMPF, bool* totalBTMPR, bool* fenbtmpF, bool* fenbtmpR, + unsigned int blockSize) { + memcpy(totalBTMPF, fenbtmpF, blockSize * sizeof(bool)); + memcpy(totalBTMPR, fenbtmpR, blockSize * sizeof(bool)); +} + +// -----noquote----- // +noquote::noquote(bool done) { Done = done; } + +/*---Directly detect the delimiter---*/ +void checkCR(char*& lineBegin, bool*& columnBegin, bool*& recordBegin) { + const __m256i column = _mm256_set1_epi8(','); + const __m256i record = _mm256_set1_epi8('\n'); + __m256i op = _mm256_loadu_si256((__m256i*)lineBegin); + __m256i eqC = _mm256_cmpeq_epi8(op, column); + __m256i eqR = _mm256_cmpeq_epi8(op, record); + int maskC = _mm256_movemask_epi8(eqC); + int maskR = _mm256_movemask_epi8(eqR); + while (maskC) { + int CP = bitScan(maskC) - 1; + columnBegin[CP] = true; + maskC &= ~(0x1 << CP); + } + while (maskR) { + int RP = bitScan(maskR) - 1; + recordBegin[RP] = true; + maskR &= ~(0x1 << RP); + } +} + +/*---*/ +noquote* noquote::parse(int block, int frequence, char*& line_begin, char*& line_end, + bool*& col_column_begin, bool*& col_record_begin) { + char* mlinebegin; + bool* mcolumnbegin; + bool* mrecordbegin; + for (int i = 0; i < frequence; i++) { + mlinebegin = (char*)(line_begin + 32 * i); + mcolumnbegin = (bool*)(col_column_begin + 32 * i); + mrecordbegin = (bool*)(col_record_begin + 32 * i); + checkCR(mlinebegin, mcolumnbegin, mrecordbegin); + } + return NULL; +} + +bool noquote::get_noquoteState() { return Done; } + +/*---class DFA---*/ +DFA::DFA(int block, int* metrix) { + Block = block; + Metrix = metrix; +} + +/*---Construction of a finite state machine---*/ +DFA* DFA::getDFA(int block, char*& line_begin, char*& line_end, bool*& field_IR, bool*& field_OR, + bool*& field_IQ, bool*& field_OQ, bool*& record_IR, bool*& record_OR, + bool*& record_IQ, bool*& record_OQ, int*& Metrix) { + int start[4]; + int result[4]; + int symbol; + 
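+ // Speculatively run the block through the DFA from all four possible start states at + // once: result[j] tracks where start state j ends up, sig[j] marks when that run is + // inside a quoted field (so separators there are ignored), and sig_sym records whether + // the current byte is a field separator (',' = 44) or a record separator ('\n' = 10).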
unsigned int sig_sym = 0; + + bool sig[4]; + for (int i = 0; i < 4; i++) { + start[i] = i; + result[i] = i; + sig[i] = 0; + } + while (line_begin <= line_end) { + if (*line_begin == ',') { + symbol = 0; + sig_sym = 44; + } else if (*line_begin == '\n') { + symbol = 0; + sig_sym = 10; + } else if (*line_begin == '\"') { + symbol = 1; + sig_sym = 0; + } else { + symbol = 2; + sig_sym = 0; + } + for (int j = 0; j < 4; j++) { + if (result[j] != 4) { + result[j] = DFATable[result[j]][symbol]; + if (sig[j] == 0 && result[j] == 3) sig[j] = 1; + if (sig[j] == 1 && result[j] == 2) sig[j] = 0; + if (sig[j] == 0) { + if (sig_sym == 44) { + if (j == 0) + *field_OR = true; + else if (j == 1) + *field_IR = true; + else if (j == 2) + *field_OQ = true; + else + *field_IQ = true; + } else if (sig_sym == 10) { + if (j == 0) + *record_OR = true; + else if (j == 1) + *record_IR = true; + else if (j == 2) + *record_OQ = true; + else + *record_IQ = true; + } + } + } + } + ++field_OR; + ++field_IR; + ++field_OQ; + ++field_IQ; + ++record_OR; + ++record_IR; + ++record_OQ; + ++record_IQ; + ++line_begin; + } + for (int k = 0; k < 4; k++) { + Metrix[block * 4 + k] = result[k]; + } + DFA* object = new DFA(block, Metrix); + return object; +} \ No newline at end of file diff --git a/src/import/CSVPathParser.h b/src/import/CSVPathParser.h new file mode 100644 index 0000000000..88ebb32420 --- /dev/null +++ b/src/import/CSVPathParser.h @@ -0,0 +1,51 @@ +#ifndef CSVPATHPARSER_H +#define CSVPATHPARSER_H + +#include <immintrin.h> +#include <omp.h> +#include <cstdlib> +#include <cstring> +#include <iostream> +#include "BitmapSet.h" + +#define bitScan(x) __builtin_ffs(x) + +using namespace std; + +class bitmap { + public: + static BitmapSet* bmAlloc(unsigned long length, unsigned int blockNum, unsigned int blockSize, + bool quote, char*& fileBegin, int threads); + static int* stateMetrix(int blockNum); + static void mergebitmap(bool* totalBTMPF, bool* totalBTMPR, bool* fenbtmpF, bool* fenbtmpR, + unsigned int blockSize); +}; + +class noquote { + public: + noquote(); + noquote(bool done); + ~noquote(); + static noquote* parse(int block, int frequence, char*& line_begin, char*& line_end, + bool*& col_column_begin, bool*& col_record_begin); + bool get_noquoteState(); + + private: + bool Done; +}; + +class DFA { + public: + DFA(); + DFA(int block, int* metrix); + ~DFA(); + static DFA* getDFA(int block, char*& line_begin, char*& line_end, bool*& field_IR, + bool*& field_OR, bool*& field_IQ, bool*& field_OQ, bool*& record_IR, + bool*& record_OR, bool*& record_IQ, bool*& record_OQ, int*& Metrix); + int* get_Metrix(); + + private: + int Block; + int* Metrix; +}; +#endif \ No newline at end of file diff --git a/src/import/RecordLoader.cpp b/src/import/RecordLoader.cpp new file mode 100644 index 0000000000..722d9c3dbb --- /dev/null +++ b/src/import/RecordLoader.cpp @@ -0,0 +1,169 @@ +#include <iostream> +#include "RecordLoader.h" +using namespace std; + +#define MAX_PAD 64 + +/*---To pursue speed, all CSV data is loaded into memory---*/ +RecordSet* RecordLoader::loadRecords(char* file_path, bool head_line) { + unsigned long size; + FILE* fp = fopen(file_path, "rb"); + if (fp == NULL) { + return NULL; + } + fseek(fp, 0, SEEK_END); + size = ftell(fp); + rewind(fp); + void* p; + // one extra byte for the terminator written after up to MAX_PAD padding bytes + if (posix_memalign(&p, 64, (size + MAX_PAD + 1) * sizeof(char)) != 0) { + cout << "Memory space is not enough! " << endl; + fclose(fp); + return NULL; + } + char* record_text = (char*)p; + size_t load_size = fread(record_text, 1, size, fp); + if (load_size == 0) { + cout << "Fail to load the input record into memory!
" << endl; + } + int remain = 64 - (size % 64); + int counter = 0; + // pad the input data where its size can be divided by 64 + while (counter < remain) { + record_text[size + counter] = 'd'; + counter++; + } + record_text[size + counter] = '\0'; + fclose(fp); + bool quote = false; + quote = intervalSample((char*)p, size); + if (quote == 0) + cout << "Go to Template without quote" << endl; + else + cout << "Go to Template with quote" << endl; + unsigned int columns = scanColumns(record_text, head_line, quote); + // only one single record + RecordSet* record = new RecordSet(); + record->text = record_text; + record->columns = columns; + record->headline = head_line; + record->quotesample = quote; + record->abs_size = size; + record->rec_size = strlen(record_text); + record->delete_text = false; + return record; +} + +/*---Determine the number of columns by scanning the head line---*/ +unsigned int RecordLoader::scanColumns(char* first_line, bool head_line, bool quote_sample) { + unsigned int columns = 0; + char* pos = (char*)&first_line[0]; + if (head_line == true) { + while (1) { + if (*pos != ',' && *pos != '\n') { + pos++; + } else if (*pos == ',') { + pos++; + columns++; + } else if (*pos == '\n') { + columns++; + break; + } + } + } else { + if (quote_sample == false) { + while (1) { + if (*pos == ',') { + columns++; + pos++; + } else if (*pos == '\n') { + columns++; + break; + } else + pos++; + } + } else { + bool sigquote = false; + while (1) { + if (*pos == '\n' && sigquote == false) { + columns++; + break; + } else if (*pos == ',' && sigquote == false) { + pos++; + columns++; + } else if (*pos == '\"') { + if (*(++pos) == ',' || *(++pos) == '\n' && sigquote == true) { + pos++; + sigquote = false; + + } else if (sigquote == false) { + pos++; + sigquote = true; + } + } else + pos++; + } + } + } + return columns; +} + +/*---Exponential sampling---*/ +bool RecordLoader::intervalSample(char* fileBegin, unsigned long length) { + srand((unsigned)time(NULL)); + int Time = rand() % 1000 + 100; + unsigned long initial_Time = Time; + bool quote; + for (int sa = 5; (pow(2, sa) + initial_Time) < length; sa++) { + char* sampleLine; + sampleLine = new char[32]; + sampleLine = fileBegin + initial_Time; + quote = quoteBool(sampleLine); + if (quote == true) return quote; + initial_Time = initial_Time + pow(2, sa); + } + return quote; +} + +/*---Check if double quotes exist in the sample line---*/ +bool RecordLoader::quoteBool(char*& sampleLine) { + const __m256i emitting = _mm256_set1_epi8('"'); + char* sample32; + sample32 = new char[32]; + sample32 = sampleLine; + __m256i op = _mm256_loadu_si256((__m256i*)sample32); + __m256i eqC = _mm256_cmpeq_epi8(op, emitting); + int mask = _mm256_movemask_epi8(eqC); + int gg = -1; + while (mask) { + gg = bitScan(mask) - 1; + if (gg != -1) return true; + } + return false; +} + +// -----blSize----- // +blSize::blSize(unsigned int block, unsigned int blnumber) { + blockSize = block; + blockNumber = blnumber; +} + +/*---Determine the size of the blocksize based on the size of the file---*/ +blSize* blSize::matchingBlock(unsigned long length, int avcore) { + int fileSize = length / 1024 / 1024 / 1024 + 1; + int blockSize = 0; + int blnumber = 0; + if (fileSize < 2) + blockSize = 128 * 1024; + else if (fileSize < 5) + blockSize = 256 * 1024; + else if (fileSize < 10) + blockSize = 512 * 1024; + else + blockSize = 1024 * 1024; + if (length / blockSize < avcore) blockSize = length / avcore; + blnumber = length / blockSize + 1; + blSize* object = new blSize(blockSize, 
blnumber); + return object; +} + +unsigned int blSize::getBlSize() { return blockSize; } + +unsigned int blSize::getBlNum() { return blockNumber; } \ No newline at end of file diff --git a/src/import/RecordLoader.h b/src/import/RecordLoader.h new file mode 100644 index 0000000000..f34ef93174 --- /dev/null +++ b/src/import/RecordLoader.h @@ -0,0 +1,47 @@ +#ifndef RECORDLOADER_H +#define RECORDLOADER_H +#include <cstdio> +#include <cstdlib> +#include <cstring> +#include <cmath> +#if defined(__MACH__) +#include <sys/malloc.h> +#else +#include <malloc.h> +#endif +#include <ctime> +#include <iostream> +#include <immintrin.h> +#include "RecordSet.h" + +#define bitScan(x) __builtin_ffs(x) + +using namespace std; + +class RecordLoader { + public: + static RecordSet* loadRecords(char* file_path, bool head_line); + static unsigned int scanColumns(char* first_line, bool head_line, bool quote_sample); + static bool intervalSample(char* file_Begin, unsigned long length); + static bool quoteBool(char*& sampleLine); +}; + +class blSize { + public: + blSize(unsigned int block, unsigned int blnumber); + static blSize* matchingBlock(unsigned long length, int avcore); + unsigned int getBlSize(); + unsigned int getBlNum(); + + private: + unsigned int blockSize; + unsigned int blockNumber; +}; +#endif \ No newline at end of file diff --git a/src/import/RecordSet.h b/src/import/RecordSet.h new file mode 100644 index 0000000000..51951c2c9c --- /dev/null +++ b/src/import/RecordSet.h @@ -0,0 +1,42 @@ +#ifndef RECORDSET_H +#define RECORDSET_H + +#include <cstdlib> +#include <cstring> + +using namespace std; + +/*---RecordSet records the CSV-file-level information---*/ +struct RecordSet { + char* text; + unsigned int columns; + unsigned long abs_size; + unsigned long rec_size; + bool headline; + bool quotesample; + bool delete_text; + + RecordSet() { + text = NULL; + columns = 0; + abs_size = 0; + rec_size = 0; + headline = false; + quotesample = false; + delete_text = true; + } + + ~RecordSet() { + if (delete_text == true && text != NULL) { + free(text); + text = NULL; + columns = 0; + abs_size = 0; + rec_size = 0; + headline = false; + quotesample = false; + delete_text = false; + } + } +}; +#endif \ No newline at end of file diff --git a/src/import/blob_writer.h b/src/import/blob_writer.h index b8cf183aab..aa345e220d 100644 --- a/src/import/blob_writer.h +++ b/src/import/blob_writer.h @@ -1,16 +1,16 @@ /** -* Copyright 2022 AntGroup CO., Ltd. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -*/ + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */ #pragma once class BufferedBlobWriter { diff --git a/src/import/column_parser.h b/src/import/column_parser.h index d5b784de44..06bfcd4921 100644 --- a/src/import/column_parser.h +++ b/src/import/column_parser.h @@ -31,7 +31,7 @@ namespace import_v2 { class BlockParser { public: virtual bool ReadBlock(std::vector>& buf) = 0; - virtual ~BlockParser(){} + virtual ~BlockParser() {} }; /** Parse each line of a csv into a vector of FieldData, excluding SKIP columns. @@ -347,13 +347,13 @@ class ColumnParser : public BlockParser { values.clear(); const char* p = beg; bool success = true; -#define WARN_OR_THROW(except) \ - if (forgiving_) { \ +#define WARN_OR_THROW(except) \ + if (forgiving_) { \ if (errors_++ < max_errors_) LOG_INFO() << except.what(); \ - success = false; \ - break; \ - } else { \ - throw except; \ + success = false; \ + break; \ + } else { \ + throw except; \ } for (size_t column = 0; column < field_specs_.size() && success; column++) { @@ -387,13 +387,13 @@ class ColumnParser : public BlockParser { p = newp; } } -#define WARN_OR_THROW2(except) \ - if (forgiving_) { \ +#define WARN_OR_THROW2(except) \ + if (forgiving_) { \ if (errors_++ < max_errors_) LOG_INFO() << except.what(); \ if (errors_ == max_errors_) LOG_INFO() << "ignore more error message"; \ - success = false; \ - } else { \ - throw except; \ + success = false; \ + } else { \ + throw except; \ } if (p != end && !fma_common::TextParserUtils::IsNewLine(*p)) { @@ -472,7 +472,7 @@ class JsonLinesParser : public BlockParser { #define SKIP_OR_THROW(except) \ if (forgiving_) { \ - if (errors_++ < max_errors_) LOG_INFO() << except.what(); \ + if (errors_++ < max_errors_) LOG_INFO() << except.what(); \ while (start < end && !fma_common::TextParserUtils::IsNewLine(*start)) start++; \ while (start < end && fma_common::TextParserUtils::IsNewLine(*start)) start++; \ return std::tuple(start - original_starting, false); \ @@ -488,14 +488,14 @@ class JsonLinesParser : public BlockParser { case 0: break; case 1: - { - istr.unget(); // hack - break; - } + { + istr.unget(); // hack + break; + } default: - { - SKIP_OR_THROW(ParseJsonException(start, end, err_code.message())); - } + { + SKIP_OR_THROW(ParseJsonException(start, end, err_code.message())); + } } using namespace lgraph::field_data_helper; try { @@ -513,89 +513,89 @@ class JsonLinesParser : public BlockParser { case FieldType::NUL: FMA_ASSERT(false); case FieldType::BOOL: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Bool(val.as_bool()); - } - break; + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Bool(val.as_bool()); } + break; + } case FieldType::INT8: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int8(val.as_number().to_int32()); - } - break; + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int8(val.as_number().to_int32()); } + break; + } case FieldType::INT16: - { - const auto& val = 
json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int16(val.as_number().to_int32()); - } - break; + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int16(val.as_number().to_int32()); } + break; + } case FieldType::INT32: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int32(val.as_number().to_int32()); - } - break; + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int32(val.as_number().to_int32()); } + break; + } case FieldType::INT64: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Int64(val.as_number().to_int64()); - } - break; + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Int64(val.as_number().to_int64()); } + break; + } case FieldType::FLOAT: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData(str.data(), - str.data() + str.size(), fd); - } else { - fd = FieldData::Float(static_cast(val.as_double())); - } - break; + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Float(static_cast(val.as_double())); } + break; + } case FieldType::DOUBLE: - { - const auto& val = json_obj.at(column); - if (val.is_string()) { - const auto& str = ToStdString(val.as_string()); - ParseStringIntoFieldData( - str.data(), str.data() + str.size(), fd); - } else { - fd = FieldData::Double(val.as_double()); - } - break; + { + const auto& val = json_obj.at(column); + if (val.is_string()) { + const auto& str = ToStdString(val.as_string()); + ParseStringIntoFieldData(str.data(), + str.data() + str.size(), fd); + } else { + fd = FieldData::Double(val.as_double()); } + break; + } case FieldType::DATE: fd = FieldData::Date(ToStdString(json_obj.at(column).as_string())); break; diff --git a/src/import/import_client.cpp b/src/import/import_client.cpp index 36c41383f9..cb8c48c463 100644 --- a/src/import/import_client.cpp +++ b/src/import/import_client.cpp @@ -97,7 +97,7 @@ void lgraph::import_v2::OnlineImportClient::DoImport() { } ++packages_processed; LOG_INFO() << fma_common::StringFormatter::Format("importing ({}) packages", - packages_processed); + packages_processed); if (is_first_package) { if (fd.n_header_line > static_cast(std::count(begin, end, '\n')) + (end[-1] != '\n')) @@ -141,31 +141,28 @@ void lgraph::import_v2::OnlineImportClient::DoFullImport() const { double t1 = fma_common::GetTime(); RestClient client(config_.url); client.Login(config_.username, 
config_.password); - std::string str = FMA_FMT("CALL db.importor.fullImportor(\\{{}:\"{}\", {}:\"{}\", {}:\"{}\", " + std::string str = FMA_FMT( + "CALL db.importor.fullImportor(\\{{}:\"{}\", {}:\"{}\", {}:\"{}\", " "{}:{}, {}:\"{}\", {}:\"{}\", {}:{}, {}:{}, " "{}:{}, {}:{}, {}:{}, {}:{}, {}:{}, " "{}:{}, {}:{}, {}:{}, {}:{}, {}:\"{}\"\\})", - "config_file", config_.config_file, "username", config_.username, - "password", config_.password, "continue_on_error", - config_.continue_on_error ? "true" : "false", + "config_file", config_.config_file, "username", config_.username, "password", + config_.password, "continue_on_error", config_.continue_on_error ? "true" : "false", "graph_name", config_.graph_name, "delimiter", config_.delimiter, "delete_if_exists", - config_.delete_if_exists ? "true" : "false", - "parse_block_size", config_.parse_block_size, - "parse_block_threads", config_.parse_block_threads, - "parse_file_threads", config_.parse_file_threads, - "generate_sst_threads", config_.generate_sst_threads, - "read_rocksdb_threads", config_.read_rocksdb_threads, - "vid_num_per_reading", config_.vid_num_per_reading, - "max_size_per_reading", config_.max_size_per_reading, - "compact", config_.compact ? "true" : "false", - "keep_vid_in_memory", config_.keep_vid_in_memory ? "true" : "false", - "enable_fulltext_index", config_.enable_fulltext_index ? "true" : "false", - "fulltext_index_analyzer", config_.fulltext_index_analyzer); + config_.delete_if_exists ? "true" : "false", "parse_block_size", config_.parse_block_size, + "parse_block_threads", config_.parse_block_threads, "parse_file_threads", + config_.parse_file_threads, "generate_sst_threads", config_.generate_sst_threads, + "read_rocksdb_threads", config_.read_rocksdb_threads, "vid_num_per_reading", + config_.vid_num_per_reading, "max_size_per_reading", config_.max_size_per_reading, + "compact", config_.compact ? "true" : "false", "keep_vid_in_memory", + config_.keep_vid_in_memory ? "true" : "false", "enable_fulltext_index", + config_.enable_fulltext_index ? "true" : "false", "fulltext_index_analyzer", + config_.fulltext_index_analyzer); auto log = client.EvalCypher("", str)["result"][0][0].as_string(); if (!log.empty()) { std::vector parts; boost::split(parts, log, boost::is_any_of("\n")); - for (const auto &item : parts) { + for (const auto& item : parts) { if (!item.empty()) LOG_INFO() << item; } } @@ -177,9 +174,9 @@ void lgraph::import_v2::OnlineImportClient::DoFullImportFile() const { double t1 = fma_common::GetTime(); RestClient client(config_.url); client.Login(config_.username, config_.password); - std::string cypher = FMA_FMT(R"(CALL db.importor.fullFileImportor("{}","{}",{}))", - config_.graph_name, config_.path, - config_.remote ? "true" : "false"); + std::string cypher = + FMA_FMT(R"(CALL db.importor.fullFileImportor("{}","{}",{}))", config_.graph_name, + config_.path, config_.remote ? 
"true" : "false"); client.EvalCypher("", cypher); double t2 = fma_common::GetTime(); LOG_INFO() << "Full online import file finished in " << t2 - t1 << " seconds."; diff --git a/src/import/import_client.h b/src/import/import_client.h index 0f0845058c..30c524bcd6 100644 --- a/src/import/import_client.h +++ b/src/import/import_client.h @@ -45,7 +45,7 @@ class OnlineImportClient { uint16_t generate_sst_threads = 15; uint16_t read_rocksdb_threads = 15; size_t vid_num_per_reading = 10000; - size_t max_size_per_reading = 32*1024*1024; + size_t max_size_per_reading = 32 * 1024 * 1024; bool compact = false; bool keep_vid_in_memory = true; bool enable_fulltext_index = false; diff --git a/src/import/import_config_parser.h b/src/import/import_config_parser.h index b47eadd5fb..d06513d3bb 100644 --- a/src/import/import_config_parser.h +++ b/src/import/import_config_parser.h @@ -59,8 +59,8 @@ enum KeyWord { HEADER, OPTIONAL_, // OPTIONAL is a macro in windows SDK FORMAT, - ASC, // TemporalFieldOrder::ASC - DESC // TemporalFieldOrder::DESC + ASC, // TemporalFieldOrder::ASC + DESC // TemporalFieldOrder::DESC }; class KeyWordFunc { @@ -207,17 +207,18 @@ struct ColumnSpec { bool fulltext = false; inline bool CheckValid() const { - if (primary && !(index && idxType == IndexType::GlobalUniqueIndex)) THROW_CODE(InputError, - "primary {} should be index and unique", name); - if ((idxType == IndexType::GlobalUniqueIndex || idxType == IndexType::PairUniqueIndex) - && optional) THROW_CODE(InputError, - FMA_FMT("unique/primary index {} should not be optional", name)); - if (type == FieldType ::BLOB && index) THROW_CODE(InputError, - FMA_FMT("BLOB field {} should not be indexed", name)); - if (type != FieldType::STRING && fulltext) THROW_CODE(InputError, - FMA_FMT("fulltext index {} only supports STRING type", name)); - if (type != FieldType::INT64 && temporal) THROW_CODE(InputError, - FMA_FMT("edge label [{}] temporal field [{}] should be INT64", name)); + if (primary && !(index && idxType == IndexType::GlobalUniqueIndex)) + THROW_CODE(InputError, "primary {} should be index and unique", name); + if ((idxType == IndexType::GlobalUniqueIndex || idxType == IndexType::PairUniqueIndex) && + optional) + THROW_CODE(InputError, FMA_FMT("unique/primary index {} should not be optional", name)); + if (type == FieldType ::BLOB && index) + THROW_CODE(InputError, FMA_FMT("BLOB field {} should not be indexed", name)); + if (type != FieldType::STRING && fulltext) + THROW_CODE(InputError, FMA_FMT("fulltext index {} only supports STRING type", name)); + if (type != FieldType::INT64 && temporal) + THROW_CODE(InputError, + FMA_FMT("edge label [{}] temporal field [{}] should be INT64", name)); return true; } @@ -248,8 +249,8 @@ struct ColumnSpec { bool operator==(const ColumnSpec& rhs) const { // do not compare is_id/skip return name == rhs.name && type == rhs.type && optional == rhs.optional && - index == rhs.index && idxType == rhs.idxType && - temporal == rhs.temporal && fulltext == rhs.fulltext; + index == rhs.index && idxType == rhs.idxType && temporal == rhs.temporal && + fulltext == rhs.fulltext; } bool operator<(const ColumnSpec& rhs) const { return name < rhs.name; } @@ -333,9 +334,7 @@ struct LabelDesc { FMA_FMT("field name [{}] not found in label [{}]", field_name, name)); } - std::vector GetColumnSpecs() const { - return columns; - } + std::vector GetColumnSpecs() const { return columns; } ColumnSpec Find(std::string field_name) const { for (size_t i = 0; i < columns.size(); ++i) { @@ -434,8 +433,8 @@ struct LabelDesc { if 
(it->primary) return true; else - THROW_CODE(InputError, - "[{}] is not primary field of label [{}]", field_name, name); + THROW_CODE(InputError, "[{}] is not primary field of label [{}]", field_name, + name); } } THROW_CODE(InputError, "[{}] is not valid field name of label [{}]", field_name, name); @@ -448,8 +447,8 @@ struct LabelDesc { it->CheckValid(); if (it->primary && is_vertex) { if (find_primary) { - THROW_CODE(InputError, - "vertex label [{}] should has no more than one primary.", name); + THROW_CODE(InputError, "vertex label [{}] should has no more than one primary.", + name); } find_primary = true; } @@ -458,8 +457,8 @@ struct LabelDesc { THROW_CODE(InputError, "vertex label [{}] has no primary", name); } if (is_vertex && !edge_constraints.empty()) { - THROW_CODE(InputError, - "label [{}] type is vertex, but edge_constraints is not empty", name); + THROW_CODE(InputError, "label [{}] type is vertex, but edge_constraints is not empty", + name); } return true; } @@ -719,7 +718,7 @@ struct SchemaDesc { // find, check if equal if (!(x == ld)) { THROW_CODE(InputError, "{} and {} not comparable in two schema", x.ToString(), - ld.ToString()); + ld.ToString()); } } } @@ -791,42 +790,44 @@ class ImportConfParser { if (ld.is_vertex) { if (!s.contains("primary") || !s.contains("properties")) { THROW_CODE(InputError, - R"(Label[{}]: Missing "primary" or "properties" definition)", ld.name); + R"(Label[{}]: Missing "primary" or "properties" definition)", + ld.name); } if (s.contains("temporal")) { - THROW_CODE(InputError, - R"(Label[{}]: "temporal" is not supported in Vertex)", ld.name); + THROW_CODE(InputError, R"(Label[{}]: "temporal" is not supported in Vertex)", + ld.name); } - for (auto & p : s["properties"]) { + for (auto& p : s["properties"]) { if (p.contains("pair_unique")) { THROW_CODE(InputError, - R"(Label[{}]: "pair_unique index" is not supported in Vertex)", ld.name); + R"(Label[{}]: "pair_unique index" is not supported in Vertex)", + ld.name); } } } else { if (s.contains("primary")) { - THROW_CODE(InputError, - R"(Label[{}]: "primary" is not supported in Edge)", ld.name); + THROW_CODE(InputError, R"(Label[{}]: "primary" is not supported in Edge)", + ld.name); } if (s.contains("constraints")) { if (!s["constraints"].is_array()) - THROW_CODE(InputError, - R"(Label[{}]: "constraints" is not array)", ld.name); + THROW_CODE(InputError, R"(Label[{}]: "constraints" is not array)", ld.name); for (auto& p : s["constraints"]) { if (!p.is_array() || p.size() != 2) THROW_CODE(InputError, - R"(Label[{}]: "constraints" element size should be 2)", ld.name); + R"(Label[{}]: "constraints" element size should be 2)", + ld.name); ld.edge_constraints.emplace_back(p[0], p[1]); } } if (s.contains("properties")) { for (auto& p : s["properties"]) { - if (p.contains("pair_unique") && p["pair_unique"] - && p.contains("unique") && p["unique"]) { + if (p.contains("pair_unique") && p["pair_unique"] && p.contains("unique") && + p["unique"]) { THROW_CODE(InputError, - "Label[{}]: pair_unique and unique configuration cannot" - " occur simultaneously)", - ld.name); + "Label[{}]: pair_unique and unique configuration cannot" + " occur simultaneously)", + ld.name); } } } @@ -838,14 +839,16 @@ class ImportConfParser { for (auto& p : s["properties"]) { ColumnSpec cs; if (!p.contains("name") || !p.contains("type")) { - THROW_CODE(InputError, + THROW_CODE( + InputError, R"(Label[{}]: Missing "name" or "type" in "properties" definition)", ld.name); } if (p["name"] == KeyWordFunc::GetStrFromKeyWord(KeyWord::SKIP) || 
p["name"] == KeyWordFunc::GetStrFromKeyWord(KeyWord::SRC_ID) || p["name"] == KeyWordFunc::GetStrFromKeyWord(KeyWord::DST_ID)) { - THROW_CODE(InputError, + THROW_CODE( + InputError, R"(Label[{}]: Property name cannot be "SKIP" or "SRC_ID" or "DST_ID")", ld.name); } @@ -896,8 +899,8 @@ class ImportConfParser { } if (s.contains("detach_property")) { if (!s["detach_property"].is_boolean()) { - THROW_CODE(InputError, - "Label[{}]: \"detach_property\" is not boolean", ld.name); + THROW_CODE(InputError, "Label[{}]: \"detach_property\" is not boolean", + ld.name); } ld.detach_property = s["detach_property"]; } @@ -919,8 +922,8 @@ class ImportConfParser { for (auto& item : conf["files"]) { if (!item.contains("format") || !item.contains("label") || !item.contains("columns")) { THROW_CODE(InputError, - R"(Missing "path" or "format" or "label" or "columns" in json {})", - item.dump(4)); + R"(Missing "path" or "format" or "label" or "columns" in json {})", + item.dump(4)); } if (!item["columns"].is_array()) { THROW_CODE(InputError, "\"columns\" is not array in json {}", item.dump(4)); @@ -932,8 +935,8 @@ class ImportConfParser { THROW_CODE(InputError, R"(Missing "path" in json {})", item.dump(4)); const std::string& path = item["path"]; if (!fs::exists(path)) { - THROW_CODE(InputError, - "Path [{}] does not exist in json {}", path, item.dump(4)); + THROW_CODE(InputError, "Path [{}] does not exist in json {}", path, + item.dump(4)); } if (fs::is_directory(path)) { for (const auto& entry : fs::directory_iterator(path)) { @@ -960,8 +963,8 @@ class ImportConfParser { cd.data_format = item["format"]; if (cd.data_format != "CSV" && cd.data_format != "JSON") { THROW_CODE(InputError, - "\"format\" value error : {}, should be CSV or JSON in json {}", - cd.data_format, item.dump(4)); + "\"format\" value error : {}, should be CSV or JSON in json {}", + cd.data_format, item.dump(4)); } cd.label = item["label"]; if (item.contains("header")) { @@ -970,8 +973,8 @@ class ImportConfParser { cd.is_vertex_file = !(item.contains("SRC_ID") || item.contains("DST_ID")); if (!cd.is_vertex_file) { if (!item.contains("SRC_ID") || !item.contains("DST_ID")) { - THROW_CODE(InputError, - R"(Missing "SRC_ID" or "DST_ID" in json {})", item.dump(4)); + THROW_CODE(InputError, R"(Missing "SRC_ID" or "DST_ID" in json {})", + item.dump(4)); } cd.edge_src.label = item["SRC_ID"]; cd.edge_dst.label = item["DST_ID"]; @@ -1025,9 +1028,9 @@ class ImportConfParser { std::string err; for (const auto& v : set) err.append(v + ","); THROW_CODE(InputError, - "All fields (expect optional) should be defined, " - "[{}] not defined", - err); + "All fields (expect optional) should be defined, " + "[{}] not defined", + err); } // check edge SRC_ID and DST_ID valid diff --git a/src/import/import_data_file.h b/src/import/import_data_file.h index c5c0b5dba2..747244fba7 100644 --- a/src/import/import_data_file.h +++ b/src/import/import_data_file.h @@ -176,7 +176,7 @@ class ImportDataFile { std::make_tuple(), std::make_tuple(&writers_)); if (!src_oe_files_[i].Good()) { LOG_ERROR() << "Error opening intermediate file [" << src_oe_files_[i].Path() - << "] for write"; + << "] for write"; } } @@ -192,7 +192,7 @@ class ImportDataFile { std::make_tuple(), std::make_tuple(&writers_)); if (!dst_ie_files_[i].Good()) { LOG_ERROR() << "Error opening intermediate file [" << dst_ie_files_[i].Path() - << "] for write"; + << "] for write"; } } } diff --git a/src/import/import_exception.h b/src/import/import_exception.h index da8fcaca72..f14adddbb1 100644 --- 
a/src/import/import_exception.h +++ b/src/import/import_exception.h @@ -66,9 +66,8 @@ class LineParserException : public std::exception { protected: virtual void Print(int indent) const { - LOG_INFO() << std::string(indent, ' ') - << "Due to: " << err_ << "\n>Error line:\n\t" << line_ - << "\n>Binary format:\n\t" << _detail::BinaryLine(line_); + LOG_INFO() << std::string(indent, ' ') << "Due to: " << err_ << "\n>Error line:\n\t" + << line_ << "\n>Binary format:\n\t" << _detail::BinaryLine(line_); } }; diff --git a/src/import/import_online.cpp b/src/import/import_online.cpp index 13e75c0e6a..20d78ec8ec 100644 --- a/src/import/import_online.cpp +++ b/src/import/import_online.cpp @@ -45,10 +45,14 @@ struct OnlineImportEdge { TemporalId tid; Value prop; bool is_out_edge; - OnlineImportEdge(VertexId vid1_, LabelId lid_, VertexId vid2_, TemporalId tid_, - Value&& prop, bool is_out) - : vid1(vid1_), lid(lid_), vid2(vid2_), tid(tid_), prop(std::move(prop)) - , is_out_edge(is_out) {} + OnlineImportEdge(VertexId vid1_, LabelId lid_, VertexId vid2_, TemporalId tid_, Value&& prop, + bool is_out) + : vid1(vid1_), + lid(lid_), + vid2(vid2_), + tid(tid_), + prop(std::move(prop)), + is_out_edge(is_out) {} OnlineImportEdge(VertexId vid1_, LabelId lid_, VertexId vid2_, TemporalId tid_, const Value& prop, bool is_out) @@ -165,13 +169,11 @@ std::string lgraph::import_v2::ImportOnline::ImportEdges(LightningGraph* db, Tra { auto schema = txn.GetSchema(label, false); - if (!schema) - THROW_CODE(InputError, "Edge Label [{}] does not exist.", label); + if (!schema) THROW_CODE(InputError, "Edge Label [{}] does not exist.", label); const auto& ec = schema->GetEdgeConstraintsLids(); if (!ec.empty()) { graph::EdgeConstraintsChecker ecc(ec); - ecc.Check(txn.GetLabelId(true, src_label), - txn.GetLabelId(true, dst_label)); + ecc.Check(txn.GetLabelId(true, src_label), txn.GetLabelId(true, dst_label)); } } @@ -202,14 +204,14 @@ std::string lgraph::import_v2::ImportOnline::ImportEdges(LightningGraph* db, Tra VertexIndex* src_index = txn.GetVertexIndex(src_label, fd.edge_src.id); if (!src_index || !src_index->IsReady()) { - THROW_CODE(InputError, - "Src {} field {} has no available index!", src_label, fd.edge_src.id); + THROW_CODE(InputError, "Src {} field {} has no available index!", src_label, + fd.edge_src.id); } VertexIndex* dst_index = txn.GetVertexIndex(dst_label, fd.edge_dst.id); if (!dst_index || !dst_index->IsReady()) { - THROW_CODE(InputError, - "Dst {} field {} has no available index!", dst_label, fd.edge_dst.id); + THROW_CODE(InputError, "Dst {} field {} has no available index!", dst_label, + fd.edge_dst.id); } // lookup index for src vid @@ -304,10 +306,9 @@ std::string lgraph::import_v2::ImportOnline::ImportEdges(LightningGraph* db, Tra continue; } std::lock_guard l(edges_mtx); - edges.emplace_back(src_vid, static_cast(label_id), dst_vid, - tid, prop, true); - edges.emplace_back(dst_vid, static_cast(label_id), src_vid, - tid, std::move(prop), false); + edges.emplace_back(src_vid, static_cast(label_id), dst_vid, tid, prop, true); + edges.emplace_back(dst_vid, static_cast(label_id), src_vid, tid, + std::move(prop), false); } }; DoMultiThreadWork(n, prepare_records, config.n_threads); @@ -349,8 +350,7 @@ std::string lgraph::import_v2::ImportOnline::HandleOnlineTextPackage( std::vector cds; LOG_INFO() << "desc: " << desc; cds = ImportConfParser::ParseFiles(nlohmann::json::parse(desc), false); - if (cds.size() != 1) - THROW_CODE(InputError, "config items number error: {}", desc); + if (cds.size() != 1) 
THROW_CODE(InputError, "config items number error: {}", desc); fd = cds[0]; if (!fd.is_vertex_file) { auto txn = db->CreateReadTxn(); @@ -436,40 +436,38 @@ std::string lgraph::import_v2::ImportOnline::HandleOnlineSchema(std::string&& de bool ok = db.AddLabel(v.is_vertex, v.name, fds, *options); if (ok) { LOG_INFO() << FMA_FMT("Add {} label:{}, detach:{}", v.is_vertex ? "vertex" : "edge", - v.name, options->detach_property); + v.name, options->detach_property); } else { - THROW_CODE(InputError, - "{} label:{} already exists", v.is_vertex ? "Vertex" : "Edge", v.name); + THROW_CODE(InputError, "{} label:{} already exists", v.is_vertex ? "Vertex" : "Edge", + v.name); } // create index for (auto& spec : v.columns) { if (v.is_vertex && spec.index && !spec.primary) { if (db.AddVertexIndex(v.name, spec.name, spec.idxType)) { - LOG_INFO() << FMA_FMT("Add vertex index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + LOG_INFO() << FMA_FMT("Add vertex index [label:{}, field:{}, type:{}]", v.name, + spec.name, static_cast(spec.idxType)); } else { - THROW_CODE(InputError, - "Vertex index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Vertex index [label:{}, field:{}] already exists", + v.name, spec.name); } } else if (!v.is_vertex && spec.index) { if (db.AddEdgeIndex(v.name, spec.name, spec.idxType)) { - LOG_INFO() << FMA_FMT("Add edge index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + LOG_INFO() << FMA_FMT("Add edge index [label:{}, field:{}, type:{}]", v.name, + spec.name, static_cast(spec.idxType)); } else { - THROW_CODE(InputError, - "Edge index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Edge index [label:{}, field:{}] already exists", v.name, + spec.name); } } if (spec.fulltext) { if (db.AddFullTextIndex(v.is_vertex, v.name, spec.name)) { - LOG_INFO() << FMA_FMT("Add fulltext index [label:{}, field:{}] success", - v.name, spec.name); + LOG_INFO() << FMA_FMT("Add fulltext index [label:{}, field:{}] success", v.name, + spec.name); } else { THROW_CODE(InputError, "Fulltext index [label:{}, field:{}] already exists", - v.name, spec.name); + v.name, spec.name); } } } diff --git a/src/import/import_utils.h b/src/import/import_utils.h index 4916041311..bea1f05cd0 100644 --- a/src/import/import_utils.h +++ b/src/import/import_utils.h @@ -1,16 +1,16 @@ /** -* Copyright 2022 AntGroup CO., Ltd. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -*/ + * Copyright 2022 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ */
 
 #pragma once
 #include <boost/endian/conversion.hpp>
@@ -21,43 +21,40 @@
 namespace lgraph {
 namespace import_v3 {
 
-template<typename T>
+template <typename T>
 static void encodeNumToStr(T num, std::string& ret) {
-    static_assert(std::is_same_v<T, int8_t> ||
-                  std::is_same_v<T, int16_t> ||
-                  std::is_same_v<T, int32_t> ||
-                  std::is_same_v<T, int64_t> ||
-                  std::is_same_v<T, float> ||
-                  std::is_same_v<T, double>);
+    static_assert(std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t> ||
+                  std::is_same_v<T, int32_t> || std::is_same_v<T, int64_t> ||
+                  std::is_same_v<T, float> || std::is_same_v<T, double>);
 
-    if constexpr(std::is_same_v<T, int8_t>) {
-        uint8_t val = uint8_t(num ^ (uint8_t)1<<7);
+    if constexpr (std::is_same_v<T, int8_t>) {
+        uint8_t val = uint8_t(num ^ (uint8_t)1 << 7);
         ret.append((const char*)&val, sizeof(val));
-    } else if constexpr(std::is_same_v<T, int16_t>) {
-        uint16_t val = uint16_t(num ^ (uint16_t)1<<15);
+    } else if constexpr (std::is_same_v<T, int16_t>) {
+        uint16_t val = uint16_t(num ^ (uint16_t)1 << 15);
         boost::endian::native_to_big_inplace(val);
         ret.append((const char*)&val, sizeof(val));
-    } else if constexpr(std::is_same_v<T, int32_t>) {
-        uint32_t val = uint32_t(num ^ (uint32_t)1<<31);
+    } else if constexpr (std::is_same_v<T, int32_t>) {
+        uint32_t val = uint32_t(num ^ (uint32_t)1 << 31);
         boost::endian::native_to_big_inplace(val);
         ret.append((const char*)&val, sizeof(val));
-    } else if constexpr(std::is_same_v<T, int64_t>) {
-        uint64_t val = uint64_t(num ^ (uint64_t)1<<63);
+    } else if constexpr (std::is_same_v<T, int64_t>) {
+        uint64_t val = uint64_t(num ^ (uint64_t)1 << 63);
         boost::endian::native_to_big_inplace(val);
         ret.append((const char*)&val, sizeof(val));
-    } else if constexpr(std::is_same_v<T, float>) {
+    } else if constexpr (std::is_same_v<T, float>) {
         uint32_t val = *(uint32_t*)&num;
         if (num >= 0) {
-            val |= (uint32_t)1<<31;
+            val |= (uint32_t)1 << 31;
         } else {
             val = ~val;
         }
         boost::endian::native_to_big_inplace(val);
         ret.append((const char*)&val, sizeof(val));
-    } else if constexpr(std::is_same_v<T, double>) {
+    } else if constexpr (std::is_same_v<T, double>) {
         uint64_t val = *(uint64_t*)&num;
         if (num >= 0) {
-            val |= (uint64_t)1<<63;
+            val |= (uint64_t)1 << 63;
         } else {
             val = ~val;
         }
@@ -66,44 +63,41 @@ static void encodeNumToStr(T num, std::string& ret) {
     }
 }
 
-template<typename T>
+template <typename T>
 static T decodeStrToNum(const char* str) {
-    static_assert(std::is_same_v<T, int8_t> ||
-                  std::is_same_v<T, int16_t> ||
-                  std::is_same_v<T, int32_t> ||
-                  std::is_same_v<T, int64_t> ||
-                  std::is_same_v<T, float> ||
-                  std::is_same_v<T, double>);
+    static_assert(std::is_same_v<T, int8_t> || std::is_same_v<T, int16_t> ||
+                  std::is_same_v<T, int32_t> || std::is_same_v<T, int64_t> ||
+                  std::is_same_v<T, float> || std::is_same_v<T, double>);
 
-    if constexpr(std::is_same_v<T, int8_t>) {
+    if constexpr (std::is_same_v<T, int8_t>) {
         uint8_t newVal = *(uint8_t*)str;
-        return int8_t(newVal ^ (uint8_t)1<<7);
-    } else if constexpr(std::is_same_v<T, int16_t>) {
+        return int8_t(newVal ^ (uint8_t)1 << 7);
+    } else if constexpr (std::is_same_v<T, int16_t>) {
         uint16_t newVal = *(uint16_t*)str;
         boost::endian::big_to_native_inplace(newVal);
-        return int16_t(newVal ^ (uint16_t)1<<15);
-    } else if constexpr(std::is_same_v<T, int32_t>) {
+        return int16_t(newVal ^ (uint16_t)1 << 15);
+    } else if constexpr (std::is_same_v<T, int32_t>) {
         uint32_t newVal = *(uint32_t*)str;
         boost::endian::big_to_native_inplace(newVal);
-        return int32_t(newVal ^ (uint32_t)1<<31);
-    } else if constexpr(std::is_same_v<T, int64_t>) {
+        return int32_t(newVal ^ (uint32_t)1 << 31);
+    } else if constexpr (std::is_same_v<T, int64_t>) {
         uint64_t newVal = *(uint64_t*)str;
         boost::endian::big_to_native_inplace(newVal);
-        return int64_t(newVal ^ (uint64_t)1<<63);
-    } else if constexpr(std::is_same_v<T, float>) {
+        return int64_t(newVal ^ (uint64_t)1 << 63);
+    } else if constexpr (std::is_same_v<T, float>) {
        uint32_t newVal = *(uint32_t*)str;
        boost::endian::big_to_native_inplace(newVal);
-        if ((newVal & (uint32_t)1<<31) > 0) {
-            newVal &= ~((uint32_t)1<<31);
+        if ((newVal & (uint32_t)1 << 31) > 0) {
+
newVal &= ~((uint32_t)1 << 31); } else { newVal = ~newVal; } return *(float*)(&newVal); - } else if constexpr(std::is_same_v) { + } else if constexpr (std::is_same_v) { uint64_t newVal = *(uint64_t*)str; boost::endian::big_to_native_inplace(newVal); - if ((newVal & (uint64_t)1<<63) > 0) { - newVal &= ~((uint64_t)1<<63); + if ((newVal & (uint64_t)1 << 63) > 0) { + newVal &= ~((uint64_t)1 << 63); } else { newVal = ~newVal; } @@ -117,71 +111,60 @@ static void AppendFieldData(std::string& ret, const FieldData& data) { FMA_ASSERT(false); break; case FieldType::BOOL: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::FLOAT: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::DOUBLE: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::DATE: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::DATETIME: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::INT64: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::INT32: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::INT16: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::INT8: - { - auto val = field_data_helper::GetStoredValue(data); - encodeNumToStr(val, ret); - } - break; + { + auto val = field_data_helper::GetStoredValue(data); + encodeNumToStr(val, ret); + } break; case FieldType::STRING: - { - const auto& val = field_data_helper::GetStoredValue(data); - ret.append(val); - } - break; + { + const auto& val = field_data_helper::GetStoredValue(data); + ret.append(val); + } break; case FieldType::BLOB: - { - const auto& val = field_data_helper::GetStoredValue(data); - ret.append(val); - } - break; + { + const auto& val = field_data_helper::GetStoredValue(data); + ret.append(val); + } break; default: FMA_ASSERT(false); break; diff --git a/src/import/import_v2.cpp b/src/import/import_v2.cpp index 9df17f401b..54e83d8077 100644 --- a/src/import/import_v2.cpp +++ b/src/import/import_v2.cpp @@ -118,7 +118,7 @@ void lgraph::import_v2::Importer::DoImportOffline() { bool ok = db.AddLabel(v.is_vertex, v.name, fds, *options); if (ok) { LOG_INFO() << FMA_FMT("Add {} label:{} success", v.is_vertex ? "vertex" : "edge", - v.name); + v.name); } else { throw std::runtime_error( FMA_FMT("Add {} label:{} error", v.is_vertex ? 
"vertex" : "edge", v.name)); @@ -126,7 +126,7 @@ void lgraph::import_v2::Importer::DoImportOffline() { } } - auto import_index = [&](){ + auto import_index = [&]() { for (auto& v : schema.label_desc) { for (auto& spec : v.columns) { if (v.is_vertex && spec.index && !spec.primary && @@ -134,47 +134,45 @@ void lgraph::import_v2::Importer::DoImportOffline() { // create index, ID column has creadted if (db.AddVertexIndex(v.name, spec.name, spec.idxType)) { LOG_INFO() << FMA_FMT("Add vertex index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + v.name, spec.name, static_cast(spec.idxType)); } else { - THROW_CODE(InputError, - "Vertex index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Vertex index [label:{}, field:{}] already exists", + v.name, spec.name); } } else if (v.is_vertex && spec.index && !spec.primary && (spec.idxType == lgraph::IndexType::GlobalUniqueIndex || spec.idxType == lgraph::IndexType::PairUniqueIndex)) { THROW_CODE(InputError, - "offline import does not support to create a unique " - "index [label:{}, field:{}]. You should create an index for " - "an attribute column after the import is complete", - v.name, spec.name); + "offline import does not support to create a unique " + "index [label:{}, field:{}]. You should create an index for " + "an attribute column after the import is complete", + v.name, spec.name); } else if (!v.is_vertex && spec.index && spec.idxType != lgraph::IndexType::GlobalUniqueIndex) { if (db.AddEdgeIndex(v.name, spec.name, spec.idxType)) { LOG_INFO() << FMA_FMT("Add edge index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + v.name, spec.name, static_cast(spec.idxType)); } else { - THROW_CODE(InputError, - "Edge index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Edge index [label:{}, field:{}] already exists", + v.name, spec.name); } } else if (!v.is_vertex && spec.index && spec.idxType == lgraph::IndexType::GlobalUniqueIndex) { THROW_CODE(InputError, - "offline import does not support to create a unique " - "index [label:{}, field:{}]. You should create an index for " - "an attribute column after the import is complete", - v.name, spec.name); + "offline import does not support to create a unique " + "index [label:{}, field:{}]. You should create an index for " + "an attribute column after the import is complete", + v.name, spec.name); } if (spec.fulltext) { bool ok = db.AddFullTextIndex(v.is_vertex, v.name, spec.name); if (ok) { LOG_INFO() << FMA_FMT("Add fulltext index [{} label:{}, field:{}]", - v.is_vertex ? "vertex" : "edge", v.name, spec.name); + v.is_vertex ? "vertex" : "edge", v.name, spec.name); } else { THROW_CODE(InputError, - "Fulltext index [{} label:{}, field:{}] already exists", - v.is_vertex ? "vertex" : "edge", v.name, spec.name); + "Fulltext index [{} label:{}, field:{}] already exists", + v.is_vertex ? 
"vertex" : "edge", v.name, spec.name); } } } @@ -232,40 +230,40 @@ void lgraph::import_v2::Importer::DoImportOffline() { for (auto& act : actions) { switch (act.type) { case PlanExecutor::Action::LOAD_VERTEX: - { - const std::string& vlabel = vid_label[act.vid]; - LOG_INFO() << "Load vertex label " << vlabel; - if (!config_.dry_run) { - LoadVertexFiles(db.GetLightningGraph(), v_label_files[vlabel], - schema.FindVertexLabel(vlabel)); - } - break; + { + const std::string& vlabel = vid_label[act.vid]; + LOG_INFO() << "Load vertex label " << vlabel; + if (!config_.dry_run) { + LoadVertexFiles(db.GetLightningGraph(), v_label_files[vlabel], + schema.FindVertexLabel(vlabel)); } + break; + } case PlanExecutor::Action::LOAD_EDGE: - { - const std::string& src_label = vid_label[act.edge.first]; - const std::string& dst_label = vid_label[act.edge.second]; - LOG_INFO() << "Load edges from vertex " << src_label << " to " << dst_label; - if (!config_.dry_run) { - std::map>& elabel_files = - src_dst_files[std::make_pair(src_label, dst_label)]; - for (auto& kv : elabel_files) { - LoadEdgeFiles(db.GetLightningGraph(), src_label, dst_label, - schema.FindEdgeLabel(kv.first), kv.second); - } + { + const std::string& src_label = vid_label[act.edge.first]; + const std::string& dst_label = vid_label[act.edge.second]; + LOG_INFO() << "Load edges from vertex " << src_label << " to " << dst_label; + if (!config_.dry_run) { + std::map>& elabel_files = + src_dst_files[std::make_pair(src_label, dst_label)]; + for (auto& kv : elabel_files) { + LoadEdgeFiles(db.GetLightningGraph(), src_label, dst_label, + schema.FindEdgeLabel(kv.first), kv.second); } - break; } + break; + } case PlanExecutor::Action::DUMP_VERTEX: - { - const std::string& vlabel = vid_label[act.vid]; - LOG_INFO() << "Write vertex label " << vlabel; - const ColumnSpec& key_spec = schema.FindVertexLabel(vlabel).GetPrimaryField(); - if (!config_.dry_run) { - WriteVertex(db.GetLightningGraph(), vlabel, key_spec.name, key_spec.type); - } - break; + { + const std::string& vlabel = vid_label[act.vid]; + LOG_INFO() << "Write vertex label " << vlabel; + const ColumnSpec& key_spec = schema.FindVertexLabel(vlabel).GetPrimaryField(); + if (!config_.dry_run) { + WriteVertex(db.GetLightningGraph(), vlabel, key_spec.name, key_spec.type); } + break; + } case PlanExecutor::Action::DONE: break; } @@ -415,12 +413,12 @@ void lgraph::import_v2::Importer::LoadVertexFiles(LightningGraph* db, ret.emplace_back(record.AsString(), start_vid); start_vid++; } catch (std::exception& e) { - OnErrorOffline(fma_common::StringFormatter::Format( - "Failed to pack data fields into a record: " - "{}.\nField data is:\n[{}]", - e.what(), - LimitedLengthStr(fma_common::ToString(v))), - config_.continue_on_error); + OnErrorOffline( + fma_common::StringFormatter::Format( + "Failed to pack data fields into a record: " + "{}.\nField data is:\n[{}]", + e.what(), LimitedLengthStr(fma_common::ToString(v))), + config_.continue_on_error); } } } @@ -452,7 +450,7 @@ void lgraph::import_v2::Importer::LoadVertexFiles(LightningGraph* db, double percent = (double)(nblock * config_.parse_block_size) * 100 / std::max(desc->size, 1); LOG_INFO() << "\t " << percent << "% - Read " << nlines << " lines at " - << (double)nlines / (t2 - t1) / 1000 << " KLine/S"; + << (double)nlines / (t2 - t1) / 1000 << " KLine/S"; } } } catch (const std::exception& e) { @@ -552,7 +550,7 @@ void lgraph::import_v2::Importer::LoadEdgeFiles(LightningGraph* db, std::string txn.Abort(); } std::vector unique_index_pos; - for (auto & cs : 
ld.GetColumnSpecs()) { + for (auto& cs : ld.GetColumnSpecs()) { if (cs.index && (cs.idxType == lgraph::IndexType::PairUniqueIndex)) { unique_index_pos.push_back(desc->FindIdxExcludeSkip(cs.name)); } @@ -611,8 +609,7 @@ void lgraph::import_v2::Importer::LoadEdgeFiles(LightningGraph* db, std::string { for (const auto& pos : unique_index_pos) { const FieldData& unique_index_col = edge[pos]; - if (unique_index_col.IsNull() || - unique_index_col.is_empty_buf()) { + if (unique_index_col.IsNull() || unique_index_col.is_empty_buf()) { OnErrorOffline("Invalid unique index key", config_.continue_on_error); continue; @@ -620,8 +617,8 @@ void lgraph::import_v2::Importer::LoadEdgeFiles(LightningGraph* db, std::string if (unique_index_col.IsString() && unique_index_col.string().size() > lgraph::_detail::MAX_KEY_SIZE) { - OnErrorOffline("Unique index string key is too long: " - + unique_index_col.string().substr(0, 1024), + OnErrorOffline("Unique index string key is too long: " + + unique_index_col.string().substr(0, 1024), config_.continue_on_error); continue; } @@ -636,7 +633,7 @@ void lgraph::import_v2::Importer::LoadEdgeFiles(LightningGraph* db, std::string lgraph::import_v3::AppendFieldData(unique_key, unique_index_col); if (!unique_index_keys.insert(unique_key, 0)) { OnErrorOffline("Duplicate unique index field: " + - unique_index_col.ToString(), + unique_index_col.ToString(), config_.continue_on_error); resolve_success = false; break; @@ -661,12 +658,12 @@ void lgraph::import_v2::Importer::LoadEdgeFiles(LightningGraph* db, std::string ret.second.emplace_back(dst_vid, static_cast(label_id), tid, src_vid, record.AsString()); } catch (std::exception& e) { - OnErrorOffline(fma_common::StringFormatter::Format( - "Failed to pack data fields into a record: " - "{}.\nField data is:\n[{}]", - e.what(), - LimitedLengthStr(fma_common::ToString(edge))), - config_.continue_on_error); + OnErrorOffline( + fma_common::StringFormatter::Format( + "Failed to pack data fields into a record: " + "{}.\nField data is:\n[{}]", + e.what(), LimitedLengthStr(fma_common::ToString(edge))), + config_.continue_on_error); } } } @@ -691,7 +688,7 @@ void lgraph::import_v2::Importer::LoadEdgeFiles(LightningGraph* db, std::string double t2 = fma_common::GetTime(); double percent = (double)(nblock * config_.parse_block_size) * 100 / desc->size; LOG_INFO() << "\t " << percent << "% - Read " << nlines << " lines at " - << (double)nlines / (t2 - t1) / 1000 << " KLine/S"; + << (double)nlines / (t2 - t1) / 1000 << " KLine/S"; } } } catch (...) 
{ @@ -818,7 +815,7 @@ void lgraph::import_v2::Importer::WriteVertex(LightningGraph* db, const std::str double t2 = fma_common::GetTime(); double percent = (double)total_committed * 100 / n_vertices; LOG_INFO() << "\t " << percent << "% - Committed " << total_committed - << ", time to write batch=" << t2 - t1; + << ", time to write batch=" << t2 - t1; }, nullptr, 0, 1, 1); // packer packs vertex property, out-edges and in-edges into KVs @@ -981,7 +978,7 @@ void lgraph::import_v2::Importer::OnErrorOffline(const std::string& msg, bool co LOG_WARN() << msg; if (!continue_on_error) { LOG_ERROR() << "If you wish to ignore the errors, use " - "--continue_on_error true"; + "--continue_on_error true"; exit(-1); } } diff --git a/src/import/import_v2.h b/src/import/import_v2.h index 829bd7a335..d6771a20da 100644 --- a/src/import/import_v2.h +++ b/src/import/import_v2.h @@ -116,9 +116,9 @@ class Importer { explicit cuckoohash_map_foreach_helper(const parent_t& it) : parent_t(it) {} template - void foreach(F fn){ -// TODO(jiazhenjiang): mutil-thread memory issues need to be fixed -// #pragma omp parallel for schedule(dynamic) + void foreach (F fn) { + // TODO(jiazhenjiang): mutil-thread memory issues need to be fixed + // #pragma omp parallel for schedule(dynamic) for (decltype(parent_t::index_) index = 0; index < parent_t::buckets_->size(); ++index) { for (decltype(parent_t::slot_) slot = 0; slot < Map::slot_per_bucket(); ++slot) { @@ -135,7 +135,7 @@ class Importer { void cuckoohash_map_foreach(Map& map, F fn) { const auto& lt = map.lock_table(); cuckoohash_map_foreach_helper it(lt.cbegin()); - it.foreach(fn); + it.foreach (fn); } #else template diff --git a/src/import/import_v3.cpp b/src/import/import_v3.cpp index 37db2add4c..5924c47ac6 100644 --- a/src/import/import_v3.cpp +++ b/src/import/import_v3.cpp @@ -40,7 +40,7 @@ void Importer::OnErrorOffline(const std::string& msg) { LOG_WARN() << msg; if (!config_.continue_on_error) { LOG_WARN() << "If you wish to ignore the errors, use " - "--continue_on_error true"; + "--continue_on_error true"; if (!config_.import_online) { exit(-1); } else { @@ -72,10 +72,10 @@ void Importer::DoImportOffline() { .MeetEdgeConstraints(file.edge_src.label, file.edge_dst.label)) { throw std::runtime_error(FMA_FMT("{} not meet the edge constraints", file.path)); } - file.edge_src.id = schemaDesc_.FindVertexLabel( - file.edge_src.label).GetPrimaryField().name; - file.edge_dst.id = schemaDesc_.FindVertexLabel( - file.edge_dst.label).GetPrimaryField().name; + file.edge_src.id = + schemaDesc_.FindVertexLabel(file.edge_src.label).GetPrimaryField().name; + file.edge_dst.id = + schemaDesc_.FindVertexLabel(file.edge_dst.label).GetPrimaryField().name; } import_v2::ImportConfParser::CheckConsistent(schemaDesc_, file); } @@ -104,15 +104,14 @@ void Importer::DoImportOffline() { bool ok = db_->AddLabel(v.is_vertex, v.name, fds, *options); if (ok) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add {} label:{}", v.is_vertex ? "vertex" : "edge", - v.name); + LOG_INFO() << FMA_FMT("Add {} label:{}", v.is_vertex ? "vertex" : "edge", v.name); } else { - online_full_import_oss << FMA_FMT("Add {} label:{}\n", - v.is_vertex ? "vertex" : "edge", v.name); + online_full_import_oss + << FMA_FMT("Add {} label:{}\n", v.is_vertex ? "vertex" : "edge", v.name); } } else { - THROW_CODE(InputError, - "{} label:{} already exists", v.is_vertex ? "Vertex" : "Edge", v.name); + THROW_CODE(InputError, "{} label:{} already exists", v.is_vertex ? 
"Vertex" : "Edge", + v.name); } auto lid = db_->CreateReadTxn().GetLabelId(v.is_vertex, v.name); if (v.is_vertex) { @@ -147,64 +146,70 @@ void Importer::DoImportOffline() { // create index, ID column has creadted if (db_->AddVertexIndex(v.name, spec.name, spec.idxType)) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add vertex index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + LOG_INFO() + << FMA_FMT("Add vertex index [label:{}, field:{}, type:{}]", v.name, + spec.name, static_cast(spec.idxType)); } else { - online_full_import_oss << FMA_FMT("Add vertex index [label:{}, " - "field:{}, type:{}]\n", v.name, spec.name, - static_cast(spec.idxType)); + online_full_import_oss << FMA_FMT( + "Add vertex index [label:{}, " + "field:{}, type:{}]\n", + v.name, spec.name, static_cast(spec.idxType)); } } else { - THROW_CODE(InputError, - "Vertex index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Vertex index [label:{}, field:{}] already exists", + v.name, spec.name); } } else if (v.is_vertex && spec.index && !spec.primary && spec.idxType != lgraph::IndexType::NonuniqueIndex) { THROW_CODE(InputError, - "offline import does not support to create a unique " - "index [label:{}, field:{}]. You should create an index for " - "an attribute column after the import is complete", - v.name, spec.name); + "offline import does not support to create a unique " + "index [label:{}, field:{}]. You should create an index for " + "an attribute column after the import is complete", + v.name, spec.name); } else if (!v.is_vertex && spec.index && (spec.idxType == lgraph::IndexType::NonuniqueIndex || - spec.idxType == lgraph::IndexType::PairUniqueIndex)) { + spec.idxType == lgraph::IndexType::PairUniqueIndex)) { if (db_->AddEdgeIndex(v.name, spec.name, spec.idxType)) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add edge index [label:{}, field:{}, type:{}]", - v.name, spec.name, static_cast(spec.idxType)); + LOG_INFO() + << FMA_FMT("Add edge index [label:{}, field:{}, type:{}]", v.name, + spec.name, static_cast(spec.idxType)); } else { - online_full_import_oss << FMA_FMT("Add edge index [label:{}, field:{}," - " type:{}]\n", v.name, spec.name, static_cast(spec.idxType)); + online_full_import_oss << FMA_FMT( + "Add edge index [label:{}, field:{}," + " type:{}]\n", + v.name, spec.name, static_cast(spec.idxType)); } } else { - THROW_CODE(InputError, - "Edge index [label:{}, field:{}] already exists", - v.name, spec.name); + THROW_CODE(InputError, "Edge index [label:{}, field:{}] already exists", + v.name, spec.name); } } else if (!v.is_vertex && spec.index && spec.idxType == lgraph::IndexType::GlobalUniqueIndex) { THROW_CODE(InputError, - "offline import does not support to create an unique " - "index [label:{}, field:{}]. You should create an index for " - "an attribute column after the import is complete", - v.name, spec.name); + "offline import does not support to create an unique " + "index [label:{}, field:{}]. You should create an index for " + "an attribute column after the import is complete", + v.name, spec.name); } if (spec.fulltext) { bool ok = db_->AddFullTextIndex(v.is_vertex, v.name, spec.name); if (ok) { if (!config_.import_online) { - LOG_INFO() << FMA_FMT("Add fulltext index [{} label:{}, field:{}]", - v.is_vertex ? "vertex" : "edge", v.name, spec.name); + LOG_INFO() + << FMA_FMT("Add fulltext index [{} label:{}, field:{}]", + v.is_vertex ? 
"vertex" : "edge", v.name, spec.name); } else { - online_full_import_oss << FMA_FMT("Add fulltext index [{} label:{}, " - "field:{}]\n", v.is_vertex ? "vertex" : "edge", v.name, spec.name); + online_full_import_oss << FMA_FMT( + "Add fulltext index [{} label:{}, " + "field:{}]\n", + v.is_vertex ? "vertex" : "edge", v.name, spec.name); } } else { THROW_CODE(InputError, - "Fulltext index [{} label:{}, field:{}] already exists", - v.is_vertex ? "vertex" : "edge", v.name, spec.name); + "Fulltext index [{} label:{}, field:{}] already exists", + v.is_vertex ? "vertex" : "edge", v.name, spec.name); } } } @@ -234,16 +239,14 @@ void Importer::VertexDataToSST() { rocksdb::DB* db; auto s = rocksdb::DB::Open(options, vid_path_, &db); if (!s.ok()) { - throw std::runtime_error( - FMA_FMT("Opening DB failed, error: {}", s.ToString().c_str())); + throw std::runtime_error(FMA_FMT("Opening DB failed, error: {}", s.ToString().c_str())); } rocksdb_vids_.reset(db); } - parse_file_threads_ = std::make_unique( - config_.parse_file_threads); - generate_sst_threads_ = std::make_unique( - config_.generate_sst_threads); + parse_file_threads_ = std::make_unique(config_.parse_file_threads); + generate_sst_threads_ = + std::make_unique(config_.generate_sst_threads); struct VertexDataBlock { VertexId start_vid = 0; uint16_t key_col_id = 0; @@ -263,7 +266,7 @@ void Importer::VertexDataToSST() { if (!file.is_vertex_file) { continue; } - boost::asio::post(*parse_file_threads_, [this, &blob_writer, &pending_tasks, &file](){ + boost::asio::post(*parse_file_threads_, [this, &blob_writer, &pending_tasks, &file]() { try { std::vector fts; { @@ -329,10 +332,10 @@ void Importer::VertexDataToSST() { std::vector vec_kvs; VertexId start_vid = dataBlock->start_vid; std::unique_ptr vertex_sst_writer( - new rocksdb::SstFileWriter(rocksdb::EnvOptions(), - {}, nullptr, false)); - std::string sst_path = sst_files_path_ + "/vertex_" + - std::to_string(start_vid); + new rocksdb::SstFileWriter(rocksdb::EnvOptions(), {}, nullptr, + false)); + std::string sst_path = + sst_files_path_ + "/vertex_" + std::to_string(start_vid); auto s = vertex_sst_writer->Open(sst_path); if (!s.ok()) { throw std::runtime_error( @@ -395,7 +398,7 @@ void Importer::VertexDataToSST() { }); } s = vertex_sst_writer->Put({(const char*)&vid, sizeof(vid)}, - {value.Data(), value.Size()}); + {value.Data(), value.Size()}); if (!s.ok()) { throw std::runtime_error( FMA_FMT("vertex_sst_writer.Put error, {}", s.ToString())); @@ -416,10 +419,9 @@ void Importer::VertexDataToSST() { if (!config_.keep_vid_in_memory && !vec_kvs.empty()) { rocksdb::SstFileWriter vid_sst_writer(rocksdb::EnvOptions(), {}, - nullptr, false); - s = vid_sst_writer.Open( - sst_files_path_ + "/vid_" + - std::to_string(dataBlock->start_vid)); + nullptr, false); + s = vid_sst_writer.Open(sst_files_path_ + "/vid_" + + std::to_string(dataBlock->start_vid)); if (!s.ok()) { throw std::runtime_error( FMA_FMT("Failed to open vid_sst_writer")); @@ -470,7 +472,7 @@ void Importer::VertexDataToSST() { if (!config_.keep_vid_in_memory) { auto begin = fma_common::GetTime(); std::vector ingest_files; - for (const auto & entry : std::filesystem::directory_iterator(sst_files_path_)) { + for (const auto& entry : std::filesystem::directory_iterator(sst_files_path_)) { const auto& path = entry.path().string(); if (path.find("vid_") != std::string::npos) { ingest_files.push_back(path); @@ -502,8 +504,9 @@ void Importer::VertexDataToSST() { if (!config_.import_online) { LOG_INFO() << "vids CompactRange, time: " << 
fma_common::GetTime() - begin << "s"; } else { - online_full_import_oss << "vids CompactRange, time: " + - std::to_string(fma_common::GetTime() - begin) + "s\n"; + online_full_import_oss + << "vids CompactRange, time: " + std::to_string(fma_common::GetTime() - begin) + + "s\n"; } } @@ -518,10 +521,9 @@ void Importer::VertexDataToSST() { void Importer::EdgeDataToSST() { auto t1 = fma_common::GetTime(); - parse_file_threads_ = std::make_unique( - config_.parse_file_threads); - generate_sst_threads_ = std::make_unique( - config_.generate_sst_threads); + parse_file_threads_ = std::make_unique(config_.parse_file_threads); + generate_sst_threads_ = + std::make_unique(config_.generate_sst_threads); struct EdgeDataBlock { std::vector> block; uint64_t start_eid = 0; @@ -539,7 +541,7 @@ void Importer::EdgeDataToSST() { struct KV { std::string key; Value value; - KV(std::string k, Value v): key(std::move(k)), value(std::move(v)) {} + KV(std::string k, Value v) : key(std::move(k)), value(std::move(v)) {} }; BufferedBlobWriter blob_writer(db_->GetLightningGraph()); std::atomic pending_tasks(0); @@ -550,9 +552,8 @@ void Importer::EdgeDataToSST() { if (file.is_vertex_file) { continue; } - boost::asio::post(*parse_file_threads_, - [this, &file, &blob_writer, &pending_tasks, - &sst_file_id, &unique_index_keys](){ + boost::asio::post(*parse_file_threads_, [this, &file, &blob_writer, &pending_tasks, + &sst_file_id, &unique_index_keys]() { try { std::vector fts; { @@ -650,10 +651,9 @@ void Importer::EdgeDataToSST() { pending_tasks--; uint64_t num = ++sst_file_id; std::unique_ptr sst_file_writer( - new rocksdb::SstFileWriter(rocksdb::EnvOptions(), - {}, nullptr, false)); - std::string sst_path = sst_files_path_ + "/edge_" + - std::to_string(num); + new rocksdb::SstFileWriter(rocksdb::EnvOptions(), {}, nullptr, + false)); + std::string sst_path = sst_files_path_ + "/edge_" + std::to_string(num); auto s = sst_file_writer->Open(sst_path); if (!s.ok()) { throw std::runtime_error(FMA_FMT("failed to open sst_file_writer")); @@ -709,8 +709,8 @@ void Importer::EdgeDataToSST() { continue; } auto slice_k = vid_iter->key(); - auto offset = slice_k.data() + - (slice_k.size() - sizeof(VertexId)); + auto offset = + slice_k.data() + (slice_k.size() - sizeof(VertexId)); src_vid = *(VertexId*)offset; slice_k.remove_suffix(sizeof(VertexId)); if (slice_k.compare(k) != 0) { @@ -746,8 +746,8 @@ void Importer::EdgeDataToSST() { continue; } auto slice_k = vid_iter->key(); - auto offset = slice_k.data() + - (slice_k.size() - sizeof(VertexId)); + auto offset = + slice_k.data() + (slice_k.size() - sizeof(VertexId)); dst_vid = *(VertexId*)offset; slice_k.remove_suffix(sizeof(VertexId)); if (slice_k.compare(k) != 0) { @@ -770,8 +770,8 @@ void Importer::EdgeDataToSST() { if (unique_index_col.IsString() && unique_index_col.string().size() > lgraph::_detail::MAX_KEY_SIZE) { - OnErrorOffline("Unique index string key is too long: " - + unique_index_col.string().substr(0, 1024)); + OnErrorOffline("Unique index string key is too long: " + + unique_index_col.string().substr(0, 1024)); continue; } std::string unique_key; @@ -849,7 +849,7 @@ void Importer::EdgeDataToSST() { [](const KV& a, const KV& b) { return a.key < b.key; }); for (auto& pair : vec_kvs) { sst_file_writer->Put(pair.key, - {pair.value.Data(), pair.value.Size()}); + {pair.value.Data(), pair.value.Size()}); } std::vector().swap(vec_kvs); sst_file_writer->Finish(); @@ -906,7 +906,7 @@ void Importer::VertexPrimaryIndexToLmdb() { LabelId preLabelId = std::numeric_limits::max(); auto 
txn = db_->CreateWriteTxn(); VertexIndex* vertexIndex = nullptr; - auto write_index = [&](const std::string& key, VertexId vid){ + auto write_index = [&](const std::string& key, VertexId vid) { FMA_DBG_CHECK(key.size() > sizeof(LabelId)); LabelId labelId = *((LabelId*)key.data()); if (labelId != preLabelId) { @@ -928,64 +928,54 @@ void Importer::VertexPrimaryIndexToLmdb() { size_t size = key.size() - sizeof(LabelId); switch (vertexIndex->KeyType()) { case FieldType::BOOL: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::FLOAT: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::DOUBLE: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::DATE: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::DATETIME: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::INT64: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::INT32: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::INT16: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::INT8: - { - auto k = decodeStrToNum(p); - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); - } - break; + { + auto k = decodeStrToNum(p); + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(&k, sizeof(k)), vid); + } break; case FieldType::STRING: - { - vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(p, size), vid); - } - break; + { + vertexIndex->_AppendVertexIndexEntry(txn.GetTxn(), Value(p, size), vid); + } break; default: FMA_ASSERT(false); } @@ -1038,8 +1028,7 @@ void Importer::VertexPrimaryIndexToLmdb() { } rocksdb_vids_.reset(nullptr); } -typedef std::vector>::iterator IT; +typedef std::vector>::iterator IT; void Importer::RocksdbToLmdb() { auto t1 = fma_common::GetTime(); std::unique_ptr rocksdb; @@ -1060,7 +1049,7 @@ void Importer::RocksdbToLmdb() { rocksdb.reset(db); } std::vector ingest_files; - 
for (const auto & entry : std::filesystem::directory_iterator(sst_files_path_)) { + for (const auto& entry : std::filesystem::directory_iterator(sst_files_path_)) { ingest_files.push_back(entry.path().generic_string()); } if (ingest_files.empty()) { @@ -1069,7 +1058,8 @@ void Importer::RocksdbToLmdb() { if (!config_.import_online) { exit(-1); } else { - throw std::runtime_error("no sst files are created, " + throw std::runtime_error( + "no sst files are created, " "please check if the input vertex and edge files are valid"); } } @@ -1092,8 +1082,8 @@ void Importer::RocksdbToLmdb() { if (!config_.import_online) { LOG_INFO() << "CompactRange, time: " << fma_common::GetTime() - begin << "s"; } else { - online_full_import_oss << "CompactRange, time: " + - std::to_string(fma_common::GetTime() - begin) + "s\n"; + online_full_import_oss + << "CompactRange, time: " + std::to_string(fma_common::GetTime() - begin) + "s\n"; } } if (!config_.keep_vid_in_memory) { @@ -1125,16 +1115,15 @@ void Importer::RocksdbToLmdb() { std::atomic pending_tasks(0); std::atomic stage(0); - std::unique_ptr lmdb_writer( - new boost::asio::thread_pool(1)); + std::unique_ptr lmdb_writer(new boost::asio::thread_pool(1)); std::unique_ptr rocksdb_readers( new boost::asio::thread_pool(config_.read_rocksdb_threads)); for (uint16_t i = 0; i < config_.read_rocksdb_threads; i++) { - boost::asio::post(*rocksdb_readers, [this, i, &pending_tasks, - &rocksdb, &lmdb_writer, &stage]() { + boost::asio::post(*rocksdb_readers, [this, i, &pending_tasks, &rocksdb, &lmdb_writer, + &stage]() { uint64_t start_vid = i * config_.vid_num_per_reading; uint64_t bigend_start_vid = boost::endian::native_to_big(start_vid); - uint64_t end_vid = (i+1) * config_.vid_num_per_reading; + uint64_t end_vid = (i + 1) * config_.vid_num_per_reading; uint64_t bigend_end_vid = boost::endian::native_to_big(end_vid); rocksdb::ReadOptions options; options.ignore_range_deletions = true; @@ -1157,52 +1146,51 @@ void Importer::RocksdbToLmdb() { VertexId pre_vid = InvalidVid; EdgeUid last_uid(-1, -1, 0, -1, -1); - auto throw_kvs_to_lmdb = [&lmdb_writer, &pending_tasks, this, &stage, i] - (std::vector> kvs, - std::vector> v_property, - std::vector> e_property){ - while (stage != i || pending_tasks > 1) { - fma_common::SleepUs(1000); - } - if (kvs.empty()) { - return; - } - pending_tasks++; - boost::asio::post(*lmdb_writer, [this, &pending_tasks, - kvs = std::move(kvs), - v_property = std::move(v_property), - e_property = std::move(e_property)]() { - Transaction txn = db_->CreateWriteTxn(); - for (auto& kv : kvs) { - txn.ImportAppendDataRaw(kv.first, kv.second); - } - txn.RefreshNextVid(); - for (auto& pro : v_property) { - auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), true)); - s->AddDetachedVertexProperty( - txn.GetTxn(), std::get<1>(pro), std::get<2>(pro)); + auto throw_kvs_to_lmdb = + [&lmdb_writer, &pending_tasks, this, &stage, i]( + std::vector> kvs, + std::vector> v_property, + std::vector> e_property) { + while (stage != i || pending_tasks > 1) { + fma_common::SleepUs(1000); } - for (auto& pro : e_property) { - auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), false)); - s->AddDetachedEdgeProperty( - txn.GetTxn(), std::get<1>(pro), std::get<2>(pro)); + if (kvs.empty()) { + return; } - txn.Commit(); - pending_tasks--; - }); - }; + pending_tasks++; + boost::asio::post(*lmdb_writer, [this, &pending_tasks, kvs = std::move(kvs), + v_property = std::move(v_property), + e_property = std::move(e_property)]() { + Transaction txn = db_->CreateWriteTxn(); + for (auto& kv 
: kvs) { + txn.ImportAppendDataRaw(kv.first, kv.second); + } + txn.RefreshNextVid(); + for (auto& pro : v_property) { + auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), true)); + s->AddDetachedVertexProperty(txn.GetTxn(), std::get<1>(pro), + std::get<2>(pro)); + } + for (auto& pro : e_property) { + auto s = (Schema*)(txn.GetSchema(std::get<0>(pro), false)); + s->AddDetachedEdgeProperty(txn.GetTxn(), std::get<1>(pro), + std::get<2>(pro)); + } + txn.Commit(); + pending_tasks--; + }); + }; auto make_kvs = [&]() { if (!split) { lgraph::graph::VertexValue vov(GetConstRef(vdata)); IT next_beg; lgraph::graph::EdgeValue oev(outs.begin(), outs.end(), out_last_lid, - out_last_tid, out_last_dst, out_last_eid, - next_beg, true); + out_last_tid, out_last_dst, out_last_eid, next_beg, + true); FMA_DBG_ASSERT(next_beg == outs.end()); outs.clear(); - lgraph::graph::EdgeValue iev(ins.begin(), ins.end(), in_last_lid, - in_last_tid, in_last_dst, in_last_eid, - next_beg, true); + lgraph::graph::EdgeValue iev(ins.begin(), ins.end(), in_last_lid, in_last_tid, + in_last_dst, in_last_eid, next_beg, true); FMA_DBG_ASSERT(next_beg == ins.end()); ins.clear(); Value data; @@ -1217,8 +1205,7 @@ void Importer::RocksdbToLmdb() { out_last_tid, out_last_dst, out_last_eid, next_start, true); FMA_DBG_ASSERT(next_start == outs.end()); - kvs.emplace_back(oev.CreateOutEdgeKey(pre_vid), - std::move(oev.GetBuf())); + kvs.emplace_back(oev.CreateOutEdgeKey(pre_vid), std::move(oev.GetBuf())); all_kv_size += kvs.back().first.Size() + kvs.back().second.Size(); outs.clear(); } @@ -1228,8 +1215,7 @@ void Importer::RocksdbToLmdb() { in_last_tid, in_last_dst, in_last_eid, next_start, true); FMA_DBG_ASSERT(next_start == ins.end()); - kvs.emplace_back(iev.CreateInEdgeKey(pre_vid), - std::move(iev.GetBuf())); + kvs.emplace_back(iev.CreateInEdgeKey(pre_vid), std::move(iev.GetBuf())); all_kv_size += kvs.back().first.Size() + kvs.back().second.Size(); ins.clear(); } @@ -1250,8 +1236,7 @@ void Importer::RocksdbToLmdb() { while (true) { pre_vid = InvalidVid; for (iter->Seek({(const char*)&bigend_start_vid, sizeof(bigend_start_vid)}); - iter->Valid(); - iter->Next()) { + iter->Valid(); iter->Next()) { auto key = iter->key(); if (key.compare({(const char*)&bigend_end_vid, sizeof(bigend_end_vid)}) >= 0) { if (pre_vid != InvalidVid) { @@ -1275,8 +1260,7 @@ void Importer::RocksdbToLmdb() { if (pre_vid != InvalidVid) { make_kvs(); if (all_kv_size > config_.max_size_per_reading) { - throw_kvs_to_lmdb(std::move(kvs), - std::move(vertex_property), + throw_kvs_to_lmdb(std::move(kvs), std::move(vertex_property), std::move(edge_property)); all_kv_size = 0; } @@ -1285,8 +1269,8 @@ void Importer::RocksdbToLmdb() { } if (vlid_detach_.at(lid)) { // detach property - vertex_property.emplace_back( - lid, vid, Value::MakeCopy(val.data(), val.size())); + vertex_property.emplace_back(lid, vid, + Value::MakeCopy(val.data(), val.size())); vdata = import_v2::DenseString((const char*)(&lid), sizeof(LabelId)); } else { vdata = import_v2::DenseString(val.data(), val.size()); @@ -1320,17 +1304,15 @@ void Importer::RocksdbToLmdb() { uid.lid = labelId; uid.dst = vertexId; uid.tid = tid; - if (last_uid.src == uid.src && - last_uid.lid == uid.lid && - last_uid.dst == uid.dst && - last_uid.tid == uid.tid) { + if (last_uid.src == uid.src && last_uid.lid == uid.lid && + last_uid.dst == uid.dst && last_uid.tid == uid.tid) { uid.eid = last_uid.eid + 1; } else { uid.eid = 0; } last_uid = uid; - edge_property.emplace_back( - labelId, uid, Value::MakeCopy(val.data(), val.size())); + 
edge_property.emplace_back(labelId, uid, + Value::MakeCopy(val.data(), val.size())); outs.emplace_back(labelId, tid, vertexId, import_v2::DenseString()); } else { outs.emplace_back(labelId, tid, vertexId, @@ -1352,10 +1334,9 @@ void Importer::RocksdbToLmdb() { } if (!outs.empty()) { IT next_start; - lgraph::graph::EdgeValue oev(outs.begin(), outs.end(), - out_last_lid, out_last_tid, - out_last_dst, out_last_eid, - next_start, true); + lgraph::graph::EdgeValue oev(outs.begin(), outs.end(), out_last_lid, + out_last_tid, out_last_dst, + out_last_eid, next_start, true); FMA_DBG_ASSERT(next_start == outs.end()); kvs.emplace_back(oev.CreateOutEdgeKey(pre_vid), std::move(oev.GetBuf())); @@ -1375,8 +1356,7 @@ void Importer::RocksdbToLmdb() { } if (all_kv_size > config_.max_size_per_reading) { - throw_kvs_to_lmdb(std::move(kvs), - std::move(vertex_property), + throw_kvs_to_lmdb(std::move(kvs), std::move(vertex_property), std::move(edge_property)); all_kv_size = 0; } @@ -1392,8 +1372,7 @@ void Importer::RocksdbToLmdb() { } else { if (pre_vid != InvalidVid) { make_kvs(); - throw_kvs_to_lmdb(std::move(kvs), - std::move(vertex_property), + throw_kvs_to_lmdb(std::move(kvs), std::move(vertex_property), std::move(edge_property)); all_kv_size = 0; } @@ -1409,8 +1388,8 @@ void Importer::RocksdbToLmdb() { if (!config_.import_online) { LOG_INFO() << "Dump rocksdb into lmdb, time: " << t2 - t1 << "s"; } else { - online_full_import_oss << "Dump rocksdb into lmdb, time: " + - std::to_string(t2 - t1) + "s\n"; + online_full_import_oss << "Dump rocksdb into lmdb, time: " + std::to_string(t2 - t1) + + "s\n"; } } @@ -1460,9 +1439,7 @@ AccessControlledDB Importer::OpenGraph(Galaxy& galaxy, bool empty_db) { return galaxy.OpenGraph(config_.user, config_.graph); } -std::string Importer::OnlineFullImportLog() { - return online_full_import_oss.str(); -} +std::string Importer::OnlineFullImportLog() { return online_full_import_oss.str(); } } // namespace import_v3 } // namespace lgraph diff --git a/src/import/import_v3.h b/src/import/import_v3.h index 283859edd2..d8868b9435 100644 --- a/src/import/import_v3.h +++ b/src/import/import_v3.h @@ -27,8 +27,6 @@ namespace lgraph { namespace import_v3 { - - class Importer { public: struct Config { @@ -37,11 +35,11 @@ class Importer { if (cpu_core > 0) { generate_sst_threads = cpu_core; read_rocksdb_threads = cpu_core; - parse_file_threads = std::max((uint16_t)1, (uint16_t)(cpu_core/3)); - parse_block_threads = std::max((uint16_t)1, (uint16_t)(cpu_core/3)); + parse_file_threads = std::max((uint16_t)1, (uint16_t)(cpu_core / 3)); + parse_block_threads = std::max((uint16_t)1, (uint16_t)(cpu_core / 3)); } } - std::string config_file; // the config file specifying both the scheam & files + std::string config_file; // the config file specifying both the scheam & files std::string db_dir = "./lgraph_db"; // db data dir to use std::string user = "admin"; std::string password = "73@TuGraph"; @@ -56,7 +54,7 @@ class Importer { uint16_t generate_sst_threads = 15; uint16_t read_rocksdb_threads = 15; size_t vid_num_per_reading = 10000; - size_t max_size_per_reading = 32*1024*1024; + size_t max_size_per_reading = 32 * 1024 * 1024; bool keep_vid_in_memory = true; bool compact = false; std::string delimiter = ","; diff --git a/src/import/parse_delimiter.h b/src/import/parse_delimiter.h index 307dd4a7e0..6c2f5ed88b 100644 --- a/src/import/parse_delimiter.h +++ b/src/import/parse_delimiter.h @@ -35,95 +35,95 @@ inline std::string ParseDelimiter(const std::string& delimiter) { } switch (*p) { case 
('\\'): - { - ret.push_back('\\'); - p++; - break; - } + { + ret.push_back('\\'); + p++; + break; + } case ('a'): - { - ret.push_back('\a'); - p++; - break; - } + { + ret.push_back('\a'); + p++; + break; + } case ('f'): - { - ret.push_back('\f'); - p++; - break; - } + { + ret.push_back('\f'); + p++; + break; + } case ('n'): - { - ret.push_back('\n'); - p++; - break; - } + { + ret.push_back('\n'); + p++; + break; + } case ('r'): - { - ret.push_back('\r'); - p++; - break; - } + { + ret.push_back('\r'); + p++; + break; + } case ('t'): - { - ret.push_back('\t'); - p++; - break; - } + { + ret.push_back('\t'); + p++; + break; + } case ('v'): - { - ret.push_back('\v'); - p++; - break; - } + { + ret.push_back('\v'); + p++; + break; + } case ('x'): - { - // \xnn hex numbers + { + // \xnn hex numbers + p++; + uint8_t c = 0; + for (int i = 0; i < 2; i++) { + if (p >= e) + THROW_CODE(InputError, + "Illegal escape sequence: " + std::string(start_p, p)); + if (*p >= '0' && *p <= '9') + c = c * 16 + *p - '0'; + else if (*p >= 'a' && *p <= 'f') + c = c * 16 + (*p - 'a' + 10); + else if (*p >= 'A' && *p <= 'F') + c = c * 16 + (*p - 'A' + 10); + else + THROW_CODE(InputError, + "Illegal escape sequence: " + std::string(start_p, p + 1)); p++; - uint8_t c = 0; - for (int i = 0; i < 2; i++) { + } + ret.push_back((char)c); + break; + } + default: + { + if (*p >= '0' && *p <= '9') { + // \nnn octal numbers + uint16_t c = 0; + for (int i = 0; i < 3; i++) { if (p >= e) THROW_CODE(InputError, "Illegal escape sequence: " + std::string(start_p, p)); - if (*p >= '0' && *p <= '9') - c = c * 16 + *p - '0'; - else if (*p >= 'a' && *p <= 'f') - c = c * 16 + (*p - 'a' + 10); - else if (*p >= 'A' && *p <= 'F') - c = c * 16 + (*p - 'A' + 10); - else - THROW_CODE(InputError, "Illegal escape sequence: " + - std::string(start_p, p + 1)); + if (*p < '0' || *p > '7') + THROW_CODE(InputError, + "Illegal escape sequence: " + std::string(start_p, p + 1)); + c = c * 8 + *p - '0'; p++; } + if (c >= 256) + THROW_CODE(InputError, + "Illegal escape sequence: " + std::string(start_p, p)); ret.push_back((char)c); break; + } else { + THROW_CODE(InputError, + "Illegal escape sequence: " + std::string(start_p, p + 1)); } - default: - { - if (*p >= '0' && *p <= '9') { - // \nnn octal numbers - uint16_t c = 0; - for (int i = 0; i < 3; i++) { - if (p >= e) - THROW_CODE(InputError, "Illegal escape sequence: " + - std::string(start_p, p)); - if (*p < '0' || *p > '7') - THROW_CODE(InputError, "Illegal escape sequence: " + - std::string(start_p, p + 1)); - c = c * 8 + *p - '0'; - p++; - } - if (c >= 256) - THROW_CODE(InputError, - "Illegal escape sequence: " + std::string(start_p, p)); - ret.push_back((char)c); - break; - } else { - THROW_CODE(InputError, - "Illegal escape sequence: " + std::string(start_p, p + 1)); - } - } + } } } else { ret.push_back(*p); diff --git a/src/import/vid_table.h b/src/import/vid_table.h index f5e5151543..830d9f8388 100644 --- a/src/import/vid_table.h +++ b/src/import/vid_table.h @@ -156,7 +156,7 @@ class AllVidTables { void StartVidTable(const std::string& label, FieldType type, VidType start_id) { if (label_map_.find(label) != label_map_.end()) { LOG_ERROR() << "Label [" << label << "] added more than once, current labels: " - << fma_common::ToString(GetLabels()); + << fma_common::ToString(GetLabels()); } std::unique_ptr table; switch (type) { @@ -212,7 +212,7 @@ class AllVidTables { auto it = label_map_.find(label); if (it == label_map_.end()) { LOG_ERROR() << "Label [" << label - << "] not found, current labels: " << 
fma_common::ToString(GetLabels()); + << "] not found, current labels: " << fma_common::ToString(GetLabels()); } it->second.end_id = end_id; } @@ -241,7 +241,7 @@ class AllVidTables { auto it = label_map_.find(label); if (it == label_map_.end()) { LOG_ERROR() << "Label [" << label - << "] not found, current labels: " << fma_common::ToString(GetLabels()); + << "] not found, current labels: " << fma_common::ToString(GetLabels()); } label_map_.erase(it); } @@ -250,7 +250,7 @@ class AllVidTables { auto it = label_map_.find(label); if (it == label_map_.end()) { LOG_ERROR() << "Label [" << label - << "] not found, current labels: " << fma_common::ToString(GetLabels()); + << "] not found, current labels: " << fma_common::ToString(GetLabels()); } return it->second.start_id; } @@ -259,7 +259,7 @@ class AllVidTables { auto it = label_map_.find(label); if (it == label_map_.end()) { LOG_ERROR() << "Label [" << label - << "] not found, current labels: " << fma_common::ToString(GetLabels()); + << "] not found, current labels: " << fma_common::ToString(GetLabels()); } return it->second.end_id; }
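
A note on the encodeNumToStr/decodeStrToNum helpers reformatted in src/import/import_utils.h: they implement an order-preserving byte encoding, meaning a plain bytewise comparison (memcmp) of two encoded values agrees with numeric comparison of the originals. Signed integers have their sign bit flipped and are written big-endian; for floating-point values, a non-negative number gets its top bit set while a negative one is bitwise-inverted. That property is what lets the default bytewise comparator in the RocksDB staging area keep primary keys in numeric order. A minimal standalone sketch of the int32 and float cases (an illustration, not TuGraph's exact helpers; it uses htonl/ntohl instead of boost::endian):

#include <arpa/inet.h>  // htonl/ntohl; assumes a POSIX host
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Flip the sign bit, then store big-endian: memcmp order == numeric order.
std::string EncodeInt32(int32_t num) {
    uint32_t val = htonl(uint32_t(num) ^ (uint32_t(1) << 31));
    return std::string(reinterpret_cast<const char*>(&val), sizeof(val));
}

int32_t DecodeInt32(const char* p) {
    uint32_t val;
    std::memcpy(&val, p, sizeof(val));
    return int32_t(ntohl(val) ^ (uint32_t(1) << 31));
}

// Floats: set the top bit of non-negative values and invert negative ones,
// so the IEEE-754 bit patterns also compare bytewise in numeric order.
std::string EncodeFloat(float num) {
    uint32_t val;
    std::memcpy(&val, &num, sizeof(val));
    val = (num >= 0) ? (val | (uint32_t(1) << 31)) : ~val;
    val = htonl(val);
    return std::string(reinterpret_cast<const char*>(&val), sizeof(val));
}

int main() {
    assert(EncodeInt32(-5) < EncodeInt32(3));         // bytewise == numeric
    assert(EncodeFloat(-1.5f) < EncodeFloat(0.25f));
    assert(DecodeInt32(EncodeInt32(-5).data()) == -5);
    return 0;
}

Plain big-endian two's-complement storage would fail the first assertion, because the sign bit makes negative numbers compare above positive ones; the sign-bit flip is exactly what repairs that.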
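
On the cuckoohash_map_foreach helper in src/import/import_v2.h: the odd `foreach (F fn)` spacing introduced above is clang-format treating `foreach` as a loop construct, since it appears in clang-format's default ForEachMacros list. The helper itself reaches into libcuckoo bucket internals; the supported way to iterate such a map is the lock_table() view, which locks the whole table and exposes ordinary iterators. A hedged sketch (assuming a recent libcuckoo where the class lives in the libcuckoo namespace; older releases expose a global cuckoohash_map):

#include <libcuckoo/cuckoohash_map.hh>
#include <iostream>
#include <string>

int main() {
    libcuckoo::cuckoohash_map<std::string, int> map;
    map.insert("a", 1);
    map.insert("b", 2);
    // lock_table() blocks concurrent writers and yields begin()/end().
    auto lt = map.lock_table();
    for (const auto& kv : lt) {
        std::cout << kv.first << " -> " << kv.second << "\n";
    }
    return 0;
}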
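
VertexDataToSST, EdgeDataToSST, and RocksdbToLmdb all share one concurrency shape: work is posted onto boost::asio::thread_pool instances (parse_file_threads_, generate_sst_threads_, lmdb_writer), throttled by an atomic pending-task counter. A minimal sketch of that shape, with an illustrative pool size, queue bound, and sleep interval rather than the importer's actual values:

#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    boost::asio::thread_pool workers(4);  // cf. parse_file_threads_
    std::atomic<int> pending_tasks(0);

    for (int i = 0; i < 16; i++) {
        // Back-pressure: cap the number of queued-but-unfinished tasks,
        // mirroring the pending_tasks polling loops in import_v3.cpp.
        while (pending_tasks > 8)
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        pending_tasks++;
        boost::asio::post(workers, [i, &pending_tasks]() {
            // ... parse one block / build one SST file here ...
            (void)i;
            pending_tasks--;
        });
    }
    workers.join();  // blocks until all posted tasks have run
    std::cout << "all tasks drained, pending=" << pending_tasks << "\n";
    return 0;
}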
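
The SST stages culminate in RocksDB bulk loading: each worker writes a sorted batch with rocksdb::SstFileWriter, finished files are handed to IngestExternalFile, and a CompactRange pass reorganizes the result (the "CompactRange, time" log lines above come from that pass). A sketch of the write, ingest, and compact pattern against the stock RocksDB API, with illustrative paths and keys:

#include <rocksdb/db.h>
#include <rocksdb/options.h>
#include <rocksdb/sst_file_writer.h>
#include <cassert>

int main() {
    // 1. Write one sorted batch into a standalone SST file.
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);
    rocksdb::Status s = writer.Open("/tmp/batch_0.sst");
    assert(s.ok());
    // SstFileWriter requires keys in comparator (here: bytewise) order;
    // Put returns a non-OK status if they arrive out of order.
    s = writer.Put("key1", "v1");
    assert(s.ok());
    s = writer.Put("key2", "v2");
    assert(s.ok());
    s = writer.Finish();
    assert(s.ok());

    // 2. Bulk-load the finished file, then compact the whole key range.
    rocksdb::DB* db = nullptr;
    s = rocksdb::DB::Open(options, "/tmp/ingest_db", &db);
    assert(s.ok());
    s = db->IngestExternalFile({"/tmp/batch_0.sst"},
                               rocksdb::IngestExternalFileOptions());
    assert(s.ok());
    s = db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
    assert(s.ok());
    delete db;
    return 0;
}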
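
ParseDelimiter in src/import/parse_delimiter.h decodes C-style escapes in the user-supplied delimiter, including \xnn (exactly two hex digits) and \nnn octal (up to three digits, with values of 256 and above rejected). A standalone sketch of just the hex branch, keeping the same bounds and digit checks but using std::runtime_error in place of THROW_CODE:

#include <cstdint>
#include <stdexcept>
#include <string>

// Decode the two hex digits that follow "\x". p points at the first
// digit, e one past the end of the input; p is advanced past the digits.
static char DecodeHexEscape(const char*& p, const char* e) {
    uint8_t c = 0;
    for (int i = 0; i < 2; i++) {
        if (p >= e) throw std::runtime_error("Illegal escape sequence");
        if (*p >= '0' && *p <= '9')
            c = c * 16 + (*p - '0');
        else if (*p >= 'a' && *p <= 'f')
            c = c * 16 + (*p - 'a' + 10);
        else if (*p >= 'A' && *p <= 'F')
            c = c * 16 + (*p - 'A' + 10);
        else
            throw std::runtime_error("Illegal escape sequence");
        p++;
    }
    return static_cast<char>(c);
}

int main() {
    const std::string in = "41";  // as if the user wrote "\x41"
    const char* p = in.data();
    return DecodeHexEscape(p, in.data() + in.size()) == 'A' ? 0 : 1;
}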
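
Everything in this section is a mechanical restyle rather than a behavioral change: argument lists are repacked against the column limit, braced case bodies now close with "} break;" on one line, and operators are respaced (1<<31 becomes 1 << 31). Assuming the restyle was produced by clang-format with the repository style file, the usual in-place invocation is:

clang-format -i --style=file src/import/import_v3.cpp

With a newer clang-format (10 or later), clang-format --dry-run -Werror <file> exits nonzero when a file still deviates from the configured style, which makes a convenient CI check that such a restyle stays applied.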