From 47635c67c81b534d75711db0f5e70a2ef5d29c24 Mon Sep 17 00:00:00 2001
From: Michael Gibney
Date: Wed, 3 Mar 2021 16:45:30 -0500
Subject: [PATCH 1/2] SOLR-15045: Execute local leader commit in parallel with
 distributed commits in DistributedZkUpdateProcessor

---
 .../processor/DistributedZkUpdateProcessor.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index ece4f53381c1..0fc38962fe27 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -194,6 +194,7 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
       // zk
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));

+      boolean issuedDistribCommit = false;
       List<SolrCmdDistributor.Node> useNodes = null;
       if (req.getParams().get(COMMIT_END_POINT) == null) {
         useNodes = nodes;
@@ -203,11 +204,16 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
           params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
               zkController.getBaseUrl(), req.getCore().getName()));
           cmdDistrib.distribCommit(cmd, useNodes, params);
-          cmdDistrib.blockAndDoRetries();
+          issuedDistribCommit = true;
         }
       }

       if (isLeader) {
+        if (issuedDistribCommit) {
+          // defensive copy of params, which was passed into distribCommit(...) above; will unconditionally replace
+          // DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the new `params` val will actually be used
+          params = new ModifiableSolrParams(params);
+        }
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());

         params.set(COMMIT_END_POINT, "replicas");
@@ -218,14 +224,15 @@ public void processCommit(CommitUpdateCommand cmd) throws IOException {
           params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
               zkController.getBaseUrl(), req.getCore().getName()));

+          // NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, flushing any TOLEADER distrib commits
           cmdDistrib.distribCommit(cmd, useNodes, params);
+          issuedDistribCommit = true;
         }

         doLocalCommit(cmd);
-
-        if (useNodes != null) {
-          cmdDistrib.blockAndDoRetries();
-        }
+      }
+      if (issuedDistribCommit) {
+        cmdDistrib.blockAndDoRetries();
       }
     }
   }
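
Not part of either patch: for readers skimming the diff above, the change re-orders the leader's
commit path so that the distributed commit requests are dispatched first, the leader's own (local)
commit runs while those requests are in flight, and blockAndDoRetries() is called only once,
afterwards. Below is a minimal, self-contained sketch of that ordering in plain Java -- not Solr
code; remoteCommit()/localCommit() are hypothetical stand-ins for cmdDistrib.distribCommit(...) and
doLocalCommit(...), and the final join() plays the role of the trailing blockAndDoRetries().

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelCommitSketch {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> replicaUrls = List.of("shard1_replica", "shard2_replica", "shard3_replica");

    // Old flow (removed code): distribCommit(...) followed immediately by blockAndDoRetries(),
    // i.e. wait for all remote commits *before* starting the local commit.
    // New flow (this patch): dispatch the remote commits, run the local commit while they are
    // in flight, and only then block for the remote responses.
    List<CompletableFuture<Void>> inFlight = replicaUrls.stream()
        .map(url -> CompletableFuture.runAsync(() -> remoteCommit(url), pool))
        .collect(Collectors.toList());

    localCommit(); // overlaps with the in-flight remote commits

    // analogous to the single trailing cmdDistrib.blockAndDoRetries() in the patched code
    CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
    pool.shutdown();
  }

  private static void remoteCommit(String replicaUrl) {
    sleep(200); // stand-in for the network round trip of a distributed commit request
    System.out.println("remote commit acknowledged by " + replicaUrl);
  }

  private static void localCommit() {
    sleep(200); // stand-in for the leader's own (potentially expensive) local commit
    System.out.println("local leader commit finished");
  }

  private static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

Since a hard commit on the leader can itself be slow, overlapping it with the replicas' network
round trips is what the commit message means by executing the local commit "in parallel with" the
distributed commits.
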
From 2ee16aa5c70753d366d95c1171e8d9e009761b13 Mon Sep 17 00:00:00 2001
From: Michael Gibney
Date: Thu, 4 Mar 2021 03:52:31 -0500
Subject: [PATCH 2/2] add test

NOTE: this test jumps through hoops to support "failure" expressed as success,
for compatibility with the way distrib shard commit errors are (not) currently
propagated back to the client. This behavior is likely a bug in its own right
that should be addressed separately, probably before this fix is committed.
---
 .../conf/solrconfig-parallel-commit.xml         |  52 ++++++
 .../cloud/ParallelCommitExecutionTest.java      | 172 ++++++++++++++++++
 2 files changed, 224 insertions(+)
 create mode 100644 solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
 create mode 100644 solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java

diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
new file mode 100644
index 000000000000..3e619948e189
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<config>
+
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog>
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+  </updateHandler>

+  <requestHandler name="/update" class="solr.UpdateRequestHandler">
+    <lst name="defaults">
+      <str name="update.chain">ensure-parallel-commit</str>
+    </lst>
+  </requestHandler>
+
+  <updateRequestProcessorChain name="ensure-parallel-commit">
+    <processor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory"/>
+    <processor class="solr.RunUpdateProcessorFactory"/>
+  </updateRequestProcessorChain>
+
+</config>
diff --git a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
new file mode 100644
index 000000000000..205baf4a16bf
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ParallelCommitExecutionTest extends SolrCloudTestCase {
+
+  private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
+  private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";
+
+  /** A basic client for operations at the cloud level, default collection will be set */
+  private static CloudSolrClient CLOUD_CLIENT;
+  private static int expectCount;
+  private static volatile int tooHigh;
+
+  private static volatile CountDownLatch countdown;
+  private static final AtomicInteger countup = new AtomicInteger();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // whether there are multiple replicas matters; for the initial parallel commit execution tests, only consider repFactor=1
+    final int repFactor = 1; //random().nextBoolean() ? 1 : 2;
+    final int numShards = TestUtil.nextInt(random(), 1, 4);
+    final int numNodes = (numShards * repFactor);
+    expectCount = numNodes;
+
+    final String configName = DEBUG_LABEL + "_config-set";
+    final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
+
+    configureCluster(numNodes).addConfig(configName, configDir).configure();
+
+    Map<String, String> collectionProperties = new LinkedHashMap<>();
+    collectionProperties.put("config", "solrconfig-parallel-commit.xml");
+    collectionProperties.put("schema", "schema_latest.xml");
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .setProperties(collectionProperties)
+        .process(cluster.getSolrClient());
+
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+    waitForRecoveriesToFinish(CLOUD_CLIENT);
+  }
+
+  @AfterClass
+  private static void afterClass() throws Exception {
+    if (null != CLOUD_CLIENT) {
+      CLOUD_CLIENT.close();
+      CLOUD_CLIENT = null;
+    }
+  }
+
+  private static void initSyncVars(boolean tooHigh) {
+    final int ct;
+    if (tooHigh) {
+      ParallelCommitExecutionTest.tooHigh = TOO_HIGH_INCREMENT;
+      ct = expectCount + TOO_HIGH_INCREMENT;
+    } else {
+      ParallelCommitExecutionTest.tooHigh = 0;
+      ct = expectCount;
+    }
+    countdown = new CountDownLatch(ct);
+    countup.set(0);
+  }
+
+  @Test
+  public void testParallelOk() throws Exception {
+    initSyncVars(false);
+    UpdateResponse rsp = CLOUD_CLIENT.commit(true, true);
+    // nocommit: why are we getting status:0 in a response that clearly failed? It looks like no matter
+    // how many distrib commits fail, as long as the local commit succeeds, it reports success.
+    // (if the local commit fails, it apparently does report failure).
+    // that's why we're doing this "countup" thing -- because there's apparently no other way to know
+    // whether the commit succeeded or not?
+    // we expect `countup` to be incremented by exactly `expectCount` number of parallel commits
+    assertEquals(expectCount, countup.get());
+  }
+
+  private static final int TOO_HIGH_INCREMENT = 1;
+
+  @Test
+  public void testParallelFail() throws Exception {
+    // artificially set countdown too high; we're really testing the test code, since there's no way
+    // to mock the "incorrect" behavior (i.e., "serial" requests)
+    initSyncVars(true);
+    UpdateResponse rsp = CLOUD_CLIENT.commit(true, true);
+
+    // If we could mock a "real" scenario, we'd expect the distrib requests to time out waiting
+    // on the countdown latch (and thus not increment `countup`), but the "local" shard would
+    // then succeed, resulting in `countup` being incremented exactly once. Here we've set `countdown`
+    // artificially high, but to approximately mock this, we get the "local" shard to recognize
+    // the "tooHigh" scenario, and succeed anyway, incrementing `countup`.
+    assertEquals(1, countup.get());
+    // sanity check: because we actually expect things to be working properly, we expect `countdown`
+    // to be exactly "TOO_HIGH_INCREMENT" too high.
+    assertEquals(TOO_HIGH_INCREMENT, countdown.getCount());
+  }
+
+  public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
+    assert null != client.getDefaultCollection();
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(),
+        client.getZkStateReader(),
+        true, true, 330);
+  }
+
+  public static class CheckFactory extends UpdateRequestProcessorFactory {
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return new Check(next);
+    }
+  }
+
+  public static class Check extends UpdateRequestProcessor {
+
+    public Check(UpdateRequestProcessor next) {
+      super(next);
+    }
+
+    @Override
+    public void processCommit(CommitUpdateCommand cmd) throws IOException {
+      super.processCommit(cmd);
+      countdown.countDown();
+      // In the "tooHigh" scenario, for the one-and-only local (non-distrib) shard, force success (after waiting) to
+      // mock the "serial blocking" behavior.
+      final boolean mustSucceed = tooHigh == TOO_HIGH_INCREMENT && cmd.getReq().getParams().get("update.distrib") == null;
+      try {
+        if (!countdown.await(5, TimeUnit.SECONDS) && !mustSucceed) {
+          throw new RuntimeException("done waiting");
+        }
+        countup.incrementAndGet();
+      } catch (InterruptedException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+}
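
Not part of either patch: a note on how the new test detects parallelism. The Check processor
counts down a shared CountDownLatch and then waits on it, so the latch only opens if every shard's
commit reaches processCommit() concurrently; if the commits ran one shard at a time, the earlier
participants would time out and only the last one would see the latch open, which is the
countup == 1 shape that testParallelFail reasons about. The following standalone sketch of that
rendezvous pattern is plain Java, not part of the test; the class and method names are made up for
illustration.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchRendezvousSketch {
  public static void main(String[] args) throws Exception {
    demo(true);   // concurrent participants: everyone sees the latch open
    demo(false);  // serialized participants: only the last one sees it open
  }

  private static void demo(boolean concurrent) throws Exception {
    final int participants = 4;                                     // analogous to the test's expectCount
    final CountDownLatch latch = new CountDownLatch(participants);  // analogous to the test's `countdown`
    final AtomicInteger sawLatchOpen = new AtomicInteger();         // analogous to the test's `countup`
    ExecutorService pool = Executors.newFixedThreadPool(concurrent ? participants : 1);

    Runnable commitArrives = () -> {
      latch.countDown();
      try {
        if (latch.await(1, TimeUnit.SECONDS)) {
          sawLatchOpen.incrementAndGet();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    for (int i = 0; i < participants; i++) {
      pool.execute(commitArrives);
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println((concurrent ? "concurrent: " : "serialized: ")
        + sawLatchOpen.get() + " of " + participants + " participants saw the latch open");
  }
}

Run as-is, it prints 4 of 4 participants for the concurrent case and 1 of 4 for the serialized
case, mirroring the expectCount vs. 1 assertions in testParallelOk and testParallelFail.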