diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a4b7a55..87337ef8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,7 @@ on: push: branches: - master + - cse concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} cancel-in-progress: true @@ -52,8 +53,13 @@ jobs: integration-test: name: integration test + strategy: + fail-fast: false + matrix: + case: ["integration-test-txn", "integration-test-raw"] env: CARGO_INCREMENTAL: 0 + TIKV_VERSION: v7.5.5 runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -69,8 +75,8 @@ jobs: - name: start tiup playground run: | # use latest stable version - ~/.tiup/bin/tiup install tikv pd - ~/.tiup/bin/tiup playground --mode tikv-slim --kv 3 --without-monitor --kv.config config/tikv.toml --pd.config config/pd.toml & + ~/.tiup/bin/tiup install tikv:${{ env.TIKV_VERSION }} pd:${{ env.TIKV_VERSION }} + ~/.tiup/bin/tiup playground ${{ env.TIKV_VERSION }} --mode tikv-slim --kv 3 --tag cluster --without-monitor --kv.config config/tikv.toml --pd.config config/pd.toml & while :; do echo "waiting cluster to be ready" [[ "$(curl -I http://127.0.0.1:2379/pd/api/v1/regions 2>/dev/null | head -n 1 | cut -d$' ' -f2)" -ne "405" ]] || break @@ -78,5 +84,13 @@ jobs: done - name: Install latest nextest release uses: taiki-e/install-action@nextest - - name: integration test - run: MULTI_REGION=1 make integration-test + - name: Integration test + run: MULTI_REGION=1 make ${{ matrix.case }} + - name: Upload logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: cluster-logs + path: | + ~/.tiup/data/cluster/tikv*/*.log + ~/.tiup/data/cluster/pd*/*.log diff --git a/Cargo.toml b/Cargo.toml index 0aab2b03..cac91e73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ clap = "2" env_logger = "0.10" fail = { version = "0.4", features = ["failpoints"] } proptest = "1" -proptest-derive = "0.3" +proptest-derive = "0.5.1" reqwest = { version = "0.11", default-features = false, features = [ "native-tls-vendored", ] } diff --git a/Makefile b/Makefile index aef0ad45..06b69618 100644 --- a/Makefile +++ b/Makefile @@ -1,13 +1,17 @@ export RUSTFLAGS=-Dwarnings -.PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all +.PHONY: default check unit-test generate integration-tests integration-tests-txn integration-tests-raw test doc docker-pd docker-kv docker all export PD_ADDRS ?= 127.0.0.1:2379 export MULTI_REGION ?= 1 ALL_FEATURES := integration-tests -INTEGRATION_TEST_ARGS := --features "integration-tests" +NEXTEST_ARGS := --config-file $(shell pwd)/config/nextest.toml -P ci + +INTEGRATION_TEST_ARGS := --features "integration-tests" --test-threads 1 + +RUN_INTEGRATION_TEST := cargo nextest run ${NEXTEST_ARGS} --all ${INTEGRATION_TEST_ARGS} default: check @@ -20,12 +24,15 @@ check: generate cargo clippy --all-targets --features "${ALL_FEATURES}" -- -D clippy::all unit-test: generate - cargo nextest run --all --no-default-features + cargo nextest run ${NEXTEST_ARGS} --all --no-default-features + +integration-test: integration-test-txn integration-test-raw + +integration-test-txn: generate + $(RUN_INTEGRATION_TEST) txn_ -integration-test: generate - cargo test txn_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture - cargo test raw_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture - cargo test misc_ --all ${INTEGRATION_TEST_ARGS} -- --nocapture +integration-test-raw: generate + $(RUN_INTEGRATION_TEST) raw_ test: unit-test integration-test diff --git a/config/nextest.toml b/config/nextest.toml new file mode 100644 index 00000000..4357217b --- /dev/null +++ b/config/nextest.toml @@ -0,0 +1,8 @@ +[profile.ci] +retries = 0 +fail-fast = false +slow-timeout = { period = "60s", terminate-after = 3 } # Timeout 3m. +failure-output = "final" + +[profile.ci.junit] +path = "junit.xml" diff --git a/config/tikv.toml b/config/tikv.toml index 52965253..6eef5b26 100644 --- a/config/tikv.toml +++ b/config/tikv.toml @@ -1,6 +1,5 @@ [coprocessor] -region-max-keys = 10 -region-split-keys = 7 +region-split-keys = 150 batch-split-limit = 100 [raftstore] @@ -8,10 +7,13 @@ region-split-check-diff = "1B" pd-heartbeat-tick-interval = "2s" pd-store-heartbeat-tick-interval = "5s" split-region-check-tick-interval = "1s" -raft-entry-max-size = "1MB" +raft-entry-max-size = "256KiB" [rocksdb] max-open-files = 10000 [raftdb] max-open-files = 10000 + +[storage] +reserve-space = "0MiB" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 73cb934d..af41d93d 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "stable" -components = ["rustfmt", "clippy"] +channel = "1.84.1" +components = ["rustfmt", "clippy", "rust-analyzer"] diff --git a/src/common/errors.rs b/src/common/errors.rs index 246aff00..1798be70 100644 --- a/src/common/errors.rs +++ b/src/common/errors.rs @@ -5,6 +5,7 @@ use std::result; use thiserror::Error; use crate::proto::kvrpcpb; +use crate::region::RegionVerId; use crate::BoundRange; /// An error originating from the TiKV client or dependencies. @@ -89,8 +90,8 @@ pub enum Error { #[error("Region {} is not found in the response", region_id)] RegionNotFoundInResponse { region_id: u64 }, /// No leader is found for the given id. - #[error("Leader of region {} is not found", region_id)] - LeaderNotFound { region_id: u64 }, + #[error("Leader of region {} is not found", region.id)] + LeaderNotFound { region: RegionVerId }, /// Scan limit exceeds the maximum #[error("Limit {} exceeds max scan limit {}", limit, max_limit)] MaxScanLimitExceeded { limit: u32, max_limit: u32 }, diff --git a/src/generated/backup.rs b/src/generated/backup.rs index fad59c43..2f727a5e 100644 --- a/src/generated/backup.rs +++ b/src/generated/backup.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// The message save the metadata of a backup. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/generated/cdcpb.rs b/src/generated/cdcpb.rs index 0cf47c51..f97d860d 100644 --- a/src/generated/cdcpb.rs +++ b/src/generated/cdcpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Header { diff --git a/src/generated/configpb.rs b/src/generated/configpb.rs index 48886295..58f3c345 100644 --- a/src/generated/configpb.rs +++ b/src/generated/configpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Status { diff --git a/src/generated/coprocessor.rs b/src/generated/coprocessor.rs index 44fffbc7..9dba8ab7 100644 --- a/src/generated/coprocessor.rs +++ b/src/generated/coprocessor.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// \[start, end) #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/generated/deadlock.rs b/src/generated/deadlock.rs index ed760b98..ef9c61e3 100644 --- a/src/generated/deadlock.rs +++ b/src/generated/deadlock.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct WaitForEntriesRequest {} diff --git a/src/generated/debugpb.rs b/src/generated/debugpb.rs index c42c63eb..5d22b7a8 100644 --- a/src/generated/debugpb.rs +++ b/src/generated/debugpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetRequest { diff --git a/src/generated/diagnosticspb.rs b/src/generated/diagnosticspb.rs index 9127e734..edb1dd45 100644 --- a/src/generated/diagnosticspb.rs +++ b/src/generated/diagnosticspb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SearchLogRequest { diff --git a/src/generated/disk_usage.rs b/src/generated/disk_usage.rs index 6f4292af..4b2c1fc5 100644 --- a/src/generated/disk_usage.rs +++ b/src/generated/disk_usage.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum DiskUsage { diff --git a/src/generated/encryptionpb.rs b/src/generated/encryptionpb.rs index 26c2be81..835da90e 100644 --- a/src/generated/encryptionpb.rs +++ b/src/generated/encryptionpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// General encryption metadata for any data type. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/generated/enginepb.rs b/src/generated/enginepb.rs index 486cefec..b0841074 100644 --- a/src/generated/enginepb.rs +++ b/src/generated/enginepb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CommandRequestHeader { diff --git a/src/generated/eraftpb.rs b/src/generated/eraftpb.rs index c2920dca..9a032ae1 100644 --- a/src/generated/eraftpb.rs +++ b/src/generated/eraftpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// The entry is a type of change that needs to be applied. It contains two data fields. /// While the fields are built into the model; their usage is determined by the entry_type. /// diff --git a/src/generated/errorpb.rs b/src/generated/errorpb.rs index 77a54057..75a2ede1 100644 --- a/src/generated/errorpb.rs +++ b/src/generated/errorpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// NotLeader is the error variant that tells a request be handle by raft leader /// is sent to raft follower or learner. #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/src/generated/google.api.rs b/src/generated/google.api.rs index 76cd3321..bb9f7204 100644 --- a/src/generated/google.api.rs +++ b/src/generated/google.api.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// Defines the HTTP configuration for an API service. It contains a list of /// \[HttpRule\]\[google.api.HttpRule\], each specifying the mapping of an RPC method /// to one or more HTTP REST API methods. diff --git a/src/generated/import_kvpb.rs b/src/generated/import_kvpb.rs index 7c681ad2..104b4346 100644 --- a/src/generated/import_kvpb.rs +++ b/src/generated/import_kvpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SwitchModeRequest { diff --git a/src/generated/import_sstpb.rs b/src/generated/import_sstpb.rs index fcd9f5df..0258379d 100644 --- a/src/generated/import_sstpb.rs +++ b/src/generated/import_sstpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SwitchModeRequest { diff --git a/src/generated/kvrpcpb.rs b/src/generated/kvrpcpb.rs index e6ffcf94..a398b5ee 100644 --- a/src/generated/kvrpcpb.rs +++ b/src/generated/kvrpcpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// A transactional get command. Lookup a value for `key` in the transaction with /// starting timestamp = `version`. #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/src/generated/metapb.rs b/src/generated/metapb.rs index 3cec8093..bff652af 100644 --- a/src/generated/metapb.rs +++ b/src/generated/metapb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Cluster { diff --git a/src/generated/mod.rs b/src/generated/mod.rs index 09a476df..e6fec6d0 100644 --- a/src/generated/mod.rs +++ b/src/generated/mod.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. pub mod backup { include!("backup.rs"); } diff --git a/src/generated/mpp.rs b/src/generated/mpp.rs index 8834980c..60120a77 100644 --- a/src/generated/mpp.rs +++ b/src/generated/mpp.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// TaskMeta contains meta of a mpp plan, including query's ts and task address. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/generated/pdpb.rs b/src/generated/pdpb.rs index 07557892..faef8ec9 100644 --- a/src/generated/pdpb.rs +++ b/src/generated/pdpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RequestHeader { diff --git a/src/generated/raft_cmdpb.rs b/src/generated/raft_cmdpb.rs index da108826..870c281e 100644 --- a/src/generated/raft_cmdpb.rs +++ b/src/generated/raft_cmdpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct GetRequest { diff --git a/src/generated/raft_serverpb.rs b/src/generated/raft_serverpb.rs index e273d8c9..d6aa3726 100644 --- a/src/generated/raft_serverpb.rs +++ b/src/generated/raft_serverpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct RaftMessage { diff --git a/src/generated/replication_modepb.rs b/src/generated/replication_modepb.rs index 1f57b9e6..e0f6cb06 100644 --- a/src/generated/replication_modepb.rs +++ b/src/generated/replication_modepb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. /// The replication status sync from PD to TiKV. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/src/generated/resource_usage_agent.rs b/src/generated/resource_usage_agent.rs index 325e8f25..a875aab8 100644 --- a/src/generated/resource_usage_agent.rs +++ b/src/generated/resource_usage_agent.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct CpuTimeRecord { diff --git a/src/generated/span.rs b/src/generated/span.rs index 5fedadf5..935e7335 100644 --- a/src/generated/span.rs +++ b/src/generated/span.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct SpanSet { diff --git a/src/generated/tikvpb.rs b/src/generated/tikvpb.rs index a0391c10..ab622627 100644 --- a/src/generated/tikvpb.rs +++ b/src/generated/tikvpb.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct BatchCommandsRequest { diff --git a/src/kv/key.rs b/src/kv/key.rs index 7ee16597..daa2a24c 100644 --- a/src/kv/key.rs +++ b/src/kv/key.rs @@ -2,7 +2,6 @@ use std::fmt; use std::ops::Bound; -use std::u8; #[allow(unused_imports)] #[cfg(test)] diff --git a/src/kv/mod.rs b/src/kv/mod.rs index 489110e6..a10b6fed 100644 --- a/src/kv/mod.rs +++ b/src/kv/mod.rs @@ -1,6 +1,5 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::fmt; -use std::u8; mod bound_range; pub mod codec; @@ -16,7 +15,7 @@ pub use value::Value; struct HexRepr<'a>(pub &'a [u8]); -impl<'a> fmt::Display for HexRepr<'a> { +impl fmt::Display for HexRepr<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { for byte in self.0 { write!(f, "{byte:02X}")?; diff --git a/src/kv/value.rs b/src/kv/value.rs index 337ec24a..700d2ee7 100644 --- a/src/kv/value.rs +++ b/src/kv/value.rs @@ -11,5 +11,4 @@ const _PROPTEST_VALUE_MAX: usize = 1024 * 16; // 16 KB /// Since `Value` is just an alias for `Vec`, conversions to and from it are easy. /// /// Many functions which accept a `Value` accept an `Into`. - pub type Value = Vec; diff --git a/src/mock.rs b/src/mock.rs index f9c94aef..757523a0 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -228,6 +228,8 @@ impl PdClient for MockPdClient { async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {} + async fn invalidate_store_cache(&self, _store_id: crate::region::StoreId) {} + fn get_codec(&self) -> &Self::Codec { &self.codec } diff --git a/src/pd/client.rs b/src/pd/client.rs index 5461cb57..a2469b60 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -19,6 +19,7 @@ use crate::proto::metapb; use crate::region::RegionId; use crate::region::RegionVerId; use crate::region::RegionWithLeader; +use crate::region::StoreId; use crate::region_cache::RegionCache; use crate::request::codec::{ApiV1TxnCodec, Codec}; use crate::store::KvConnect; @@ -208,6 +209,8 @@ pub trait PdClient: Send + Sync + 'static { async fn invalidate_region_cache(&self, ver_id: RegionVerId); + async fn invalidate_store_cache(&self, store_id: StoreId); + /// Get the codec carried by `PdClient`. /// The purpose of carrying the codec is to avoid passing it on so many calling paths. fn get_codec(&self) -> &Self::Codec; @@ -283,6 +286,10 @@ impl PdClient for PdRpcClien self.region_cache.invalidate_region_cache(ver_id).await } + async fn invalidate_store_cache(&self, store_id: StoreId) { + self.region_cache.invalidate_store_cache(store_id).await + } + fn get_codec(&self) -> &Self::Codec { self.codec .as_ref() diff --git a/src/proto.rs b/src/proto.rs index 30f699f9..79becc56 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -2,6 +2,7 @@ #![allow(clippy::large_enum_variant)] #![allow(clippy::enum_variant_names)] +#![allow(clippy::doc_lazy_continuation)] pub use protos::*; diff --git a/src/raw/client.rs b/src/raw/client.rs index fc733015..8ceedfac 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -3,7 +3,6 @@ use core::ops::Range; use std::str::FromStr; use std::sync::Arc; -use std::u32; use futures::StreamExt; use log::debug; diff --git a/src/region.rs b/src/region.rs index 8e58522c..2a5f45de 100644 --- a/src/region.rs +++ b/src/region.rs @@ -47,7 +47,7 @@ impl RegionWithLeader { self.leader .as_ref() .ok_or(Error::LeaderNotFound { - region_id: self.region.id, + region: self.ver_id(), }) .map(|l| { let mut ctx = kvrpcpb::Context::default(); @@ -89,7 +89,7 @@ impl RegionWithLeader { .as_ref() .cloned() .ok_or_else(|| Error::LeaderNotFound { - region_id: self.id(), + region: self.ver_id(), }) .map(|s| s.store_id) } diff --git a/src/region_cache.rs b/src/region_cache.rs index a557a96f..cdf30c99 100644 --- a/src/region_cache.rs +++ b/src/region_cache.rs @@ -233,6 +233,11 @@ impl RegionCache { } } + pub async fn invalidate_store_cache(&self, store_id: StoreId) { + let mut cache = self.store_cache.write().await; + cache.remove(&store_id); + } + pub async fn read_through_all_stores(&self) -> Result> { let stores = self .inner_client diff --git a/src/request/plan.rs b/src/request/plan.rs index a3da3ec1..d3fb6ffe 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -17,6 +17,8 @@ use crate::pd::PdClient; use crate::proto::errorpb; use crate::proto::errorpb::EpochNotMatch; use crate::proto::kvrpcpb; +use crate::region::RegionVerId; +use crate::region::StoreId; use crate::request::shard::HasNextBatch; use crate::request::NextBatch; use crate::request::Shardable; @@ -115,13 +117,10 @@ where let shards = current_plan.shards(&pd_client).collect::>().await; let mut handles = Vec::new(); for shard in shards { - let (shard, region_store) = shard?; - let mut clone = current_plan.clone(); - clone.apply_shard(shard, ®ion_store)?; let handle = tokio::spawn(Self::single_shard_handler( pd_client.clone(), - clone, - region_store, + current_plan.clone(), + shard, backoff.clone(), permits.clone(), preserve_region_results, @@ -152,12 +151,32 @@ where #[async_recursion] async fn single_shard_handler( pd_client: Arc, - plan: P, - region_store: RegionStore, + mut plan: P, + shard: Result<(

::Shard, RegionStore)>, mut backoff: Backoff, permits: Arc, preserve_region_results: bool, ) -> Result<::Result> { + let region_store = match shard.and_then(|(shard, region_store)| { + plan.apply_shard(shard, ®ion_store).map(|_| region_store) + }) { + Ok(region_store) => region_store, + Err(Error::LeaderNotFound { region }) => { + return Self::handle_other_error( + pd_client, + plan, + region.clone(), + None, + backoff, + permits, + preserve_region_results, + Error::LeaderNotFound { region }, + ) + .await + } + Err(err) => return Err(err), + }; + // limit concurrent requests let permit = permits.acquire().await.unwrap(); let res = plan.execute().await; @@ -166,10 +185,11 @@ where let mut resp = match res { Ok(resp) => resp, Err(e) if is_grpc_error(&e) => { - return Self::handle_grpc_error( + return Self::handle_other_error( pd_client, plan, - region_store, + region_store.region_with_leader.ver_id(), + region_store.region_with_leader.get_store_id().ok(), backoff, permits, preserve_region_results, @@ -217,6 +237,7 @@ where region_store: RegionStore, ) -> Result { let ver_id = region_store.region_with_leader.ver_id(); + let store_id = region_store.region_with_leader.get_store_id(); if let Some(not_leader) = e.not_leader { if let Some(leader) = not_leader.leader { match pd_client @@ -239,6 +260,9 @@ where } } else if e.store_not_match.is_some() { pd_client.invalidate_region_cache(ver_id).await; + if let Ok(store_id) = store_id { + pd_client.invalidate_store_cache(store_id).await; + } Ok(false) } else if e.epoch_not_match.is_some() { Self::on_region_epoch_not_match( @@ -259,6 +283,9 @@ where // TODO: pass the logger around // info!("unknwon region error: {:?}", e); pd_client.invalidate_region_cache(ver_id).await; + if let Ok(store_id) = store_id { + pd_client.invalidate_store_cache(store_id).await; + } Ok(false) } } @@ -303,18 +330,24 @@ where Ok(false) } - async fn handle_grpc_error( + #[allow(clippy::too_many_arguments)] + async fn handle_other_error( pd_client: Arc, plan: P, - region_store: RegionStore, + region: RegionVerId, + store: Option, mut backoff: Backoff, permits: Arc, preserve_region_results: bool, e: Error, ) -> Result<::Result> { debug!("handle grpc error: {:?}", e); - let ver_id = region_store.region_with_leader.ver_id(); - pd_client.invalidate_region_cache(ver_id).await; + pd_client.invalidate_region_cache(region).await; + if is_grpc_error(&e) { + if let Some(store_id) = store { + pd_client.invalidate_store_cache(store_id).await; + } + } match backoff.next_delay_duration() { Some(duration) => { sleep(duration).await; diff --git a/src/store/mod.rs b/src/store/mod.rs index a244a1bc..f21373b4 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -8,7 +8,6 @@ use std::cmp::max; use std::cmp::min; use std::sync::Arc; -use async_trait::async_trait; use derive_new::new; use futures::prelude::*; use futures::stream::BoxStream; @@ -38,21 +37,6 @@ pub struct Store { pub client: Arc, } -#[async_trait] -pub trait KvConnectStore: KvConnect { - async fn connect_to_store( - &self, - region: RegionWithLeader, - address: String, - ) -> Result { - log::info!("connect to tikv endpoint: {:?}", &address); - let client = self.connect(address.as_str()).await?; - Ok(RegionStore::new(region, Arc::new(client))) - } -} - -impl KvConnectStore for TikvConnect {} - /// Maps keys to a stream of stores. `key_data` must be sorted in increasing order pub fn store_stream_for_keys( key_data: impl Iterator + Send + Sync + 'static, diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 4bcb16d9..befde865 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -40,7 +40,7 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024; /// - `gc`: trigger a GC process which clears stale data in the cluster. /// - `current_timestamp`: get the current `Timestamp` from PD. /// - `snapshot`: get a [`Snapshot`] of the database at a specified timestamp. -/// A `Snapshot` is a read-only transaction. +/// A `Snapshot` is a read-only transaction. /// /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be /// awaited to execute. diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index e3800459..7e67c6ae 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -253,7 +253,7 @@ pub fn new_prewrite_request( req.start_version = start_version; req.lock_ttl = lock_ttl; // FIXME: Lite resolve lock is currently disabled - req.txn_size = std::u64::MAX; + req.txn_size = u64::MAX; req } @@ -906,7 +906,6 @@ impl Merge for Collect { } #[cfg(test)] -#[cfg_attr(feature = "protobuf-codec", allow(clippy::useless_conversion))] mod tests { use crate::common::Error::PessimisticLockError; use crate::common::Error::ResolveLockError; diff --git a/tests/common/ctl.rs b/tests/common/ctl.rs index 92dcacca..092c32bb 100644 --- a/tests/common/ctl.rs +++ b/tests/common/ctl.rs @@ -16,7 +16,9 @@ pub async fn get_region_count() -> Result { .text() .await .map_err(|e| Error::StringError(e.to_string()))?; - let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap(); + let value: serde_json::Value = serde_json::from_str(body.as_ref()).unwrap_or_else(|err| { + panic!("invalid body: {:?}, error: {:?}", body, err); + }); value["count"] .as_u64() .ok_or_else(|| Error::StringError("pd region count does not return an integer".to_owned())) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4d63dd56..8c5ad085 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -56,8 +56,8 @@ pub async fn init() -> Result<()> { .take(count as usize - 1) .map(|x| x.to_be_bytes().to_vec()); - // about 43 regions with above keys. - ensure_region_split(keys_1.chain(keys_2), 40).await?; + // 10 regions is enough for most tests + ensure_region_split(keys_1.chain(keys_2), 10).await?; } clear_tikv().await; diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 82442c4b..96092fa8 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -5,7 +5,6 @@ //! Test names should begin with one of the following: //! 1. txn_ //! 2. raw_ -//! 3. misc_ //! //! We make use of the convention to control the order of tests in CI, to allow //! transactional and raw tests to coexist, since transactional requests have @@ -192,12 +191,16 @@ async fn txn_split_batch() -> Result<()> { let mut txn = client.begin_optimistic().await?; let mut rng = thread_rng(); - // testing with raft-entry-max-size = "1MB" + // testing with raft-entry-max-size = "256KiB" let keys_count: usize = 1000; - let val_len = 15000; + let val_len = 1024; let values: Vec<_> = (0..keys_count) - .map(|_| (0..val_len).map(|_| rng.gen::()).collect::>()) + .map(|_| { + let mut buf = vec![0; val_len]; + rng.fill(&mut buf[..]); + buf + }) .collect(); for (i, value) in values.iter().enumerate() {