java backoff_Java BackOff类代码示例

2022-09-07 13:40:17 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

import org.apache.beam.sdk.util.BackOff; //导入依赖的package包/类

/**

* Writes a batch of mutations to Cloud Datastore.

*

*

If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All

* mutations in the batch will be committed again, even if the commit was partially

* successful. If the retry limit is exceeded, the last exception from Cloud Datastore will be

* thrown.

*

* @throws DatastoreException if the commit fails or IOException or InterruptedException if

* backing off between retries fails.

*/

private void flushBatch() throws DatastoreException, IOException, InterruptedException {

LOG.debug(“Writing batch of {} mutations”, mutations.size());

Sleeper sleeper = Sleeper.DEFAULT;

BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();

while (true) {

// Batch upsert entities.

CommitRequest.Builder commitRequest = CommitRequest.newBuilder();

commitRequest.addAllMutations(mutations);

commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);

long startTime = System.currentTimeMillis(), endTime;

if (throttler.throttleRequest(startTime)) {

LOG.info(“Delaying request due to previous failures”);

throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);

sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);

continue;

}

try {

datastore.commit(commitRequest.build());

endTime = System.currentTimeMillis();

writeBatcher.addRequestLatency(endTime, endTime – startTime, mutations.size());

throttler.successfulRequest(startTime);

rpcSuccesses.inc();

// Break if the commit threw no exception.

break;

} catch (DatastoreException exception) {

if (exception.getCode() == Code.DEADLINE_EXCEEDED) {

/* Most errors are not related to request size, and should not change our expectation of

* the latency of successful requests. DEADLINE_EXCEEDED can be taken into

* consideration, though. */

endTime = System.currentTimeMillis();

writeBatcher.addRequestLatency(endTime, endTime – startTime, mutations.size());

}

// Only log the code and message for potentially-transient errors. The entire exception

// will be propagated upon the last retry.

LOG.error(“Error writing batch of {} mutations to Datastore ({}): {}”, mutations.size(),

exception.getCode(), exception.getMessage());

rpcErrors.inc();

if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {

throw exception;

}

if (!BackOffUtils.next(sleeper, backoff)) {

LOG.error(“Aborting after {} retries.”, MAX_RETRIES);

throw exception;

}

}

}

LOG.debug(“Successfully wrote {} mutations”, mutations.size());

mutations.clear();

mutationsSize = 0;

}

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/147509.html原文链接:https://javaforall.cn

0 人点赞