大家好,又见面了,我是你们的朋友全栈君。
import org.apache.beam.sdk.util.BackOff; //导入依赖的package包/类
/**
* Writes a batch of mutations to Cloud Datastore.
*
*
If a commit fails, it will be retried up to {@link #MAX_RETRIES} times. All
* mutations in the batch will be committed again, even if the commit was partially
* successful. If the retry limit is exceeded, the last exception from Cloud Datastore will be
* thrown.
*
* @throws DatastoreException if the commit fails or IOException or InterruptedException if
* backing off between retries fails.
*/
private void flushBatch() throws DatastoreException, IOException, InterruptedException {
LOG.debug(“Writing batch of {} mutations”, mutations.size());
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
while (true) {
// Batch upsert entities.
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
commitRequest.addAllMutations(mutations);
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
long startTime = System.currentTimeMillis(), endTime;
if (throttler.throttleRequest(startTime)) {
LOG.info(“Delaying request due to previous failures”);
throttledSeconds.inc(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS / 1000);
sleeper.sleep(WriteBatcherImpl.DATASTORE_BATCH_TARGET_LATENCY_MS);
continue;
}
try {
datastore.commit(commitRequest.build());
endTime = System.currentTimeMillis();
writeBatcher.addRequestLatency(endTime, endTime – startTime, mutations.size());
throttler.successfulRequest(startTime);
rpcSuccesses.inc();
// Break if the commit threw no exception.
break;
} catch (DatastoreException exception) {
if (exception.getCode() == Code.DEADLINE_EXCEEDED) {
/* Most errors are not related to request size, and should not change our expectation of
* the latency of successful requests. DEADLINE_EXCEEDED can be taken into
* consideration, though. */
endTime = System.currentTimeMillis();
writeBatcher.addRequestLatency(endTime, endTime – startTime, mutations.size());
}
// Only log the code and message for potentially-transient errors. The entire exception
// will be propagated upon the last retry.
LOG.error(“Error writing batch of {} mutations to Datastore ({}): {}”, mutations.size(),
exception.getCode(), exception.getMessage());
rpcErrors.inc();
if (NON_RETRYABLE_ERRORS.contains(exception.getCode())) {
throw exception;
}
if (!BackOffUtils.next(sleeper, backoff)) {
LOG.error(“Aborting after {} retries.”, MAX_RETRIES);
throw exception;
}
}
}
LOG.debug(“Successfully wrote {} mutations”, mutations.size());
mutations.clear();
mutationsSize = 0;
}
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/147509.html原文链接:https://javaforall.cn