Skip to content

Commit c6da3cf

Browse files
authored
HDDS-13963. Atomic Create-If-Not-Exists (#9332)
1 parent 99f4132 commit c6da3cf

12 files changed

Lines changed: 244 additions & 28 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,8 @@ public final class OzoneConsts {
318318
public static final String TENANT = "tenant";
319319
public static final String USER_PREFIX = "userPrefix";
320320
public static final String REWRITE_GENERATION = "rewriteGeneration";
321+
/** Sentinel generation used to request atomic create-if-not-exists(put if absent) semantics. */
322+
public static final long EXPECTED_GEN_CREATE_IF_NOT_EXISTS = -1L;
321323
public static final String FROM_SNAPSHOT = "fromSnapshot";
322324
public static final String TO_SNAPSHOT = "toSnapshot";
323325
public static final String TOKEN = "token";

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public enum OzoneManagerVersion implements ComponentVersion {
5454

5555
S3_LIST_MULTIPART_UPLOADS_PAGINATION(11,
5656
"OzoneManager version that supports S3 list multipart uploads API with pagination"),
57+
58+
ATOMIC_CREATE_IF_NOT_EXISTS(12,
59+
"OzoneManager version that supports explicit create-if-not-exists key semantics"),
5760

5861
FUTURE_VERSION(-1, "Used internally in the client when the server side is "
5962
+ " newer and an unknown server version has arrived to the client.");

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ public OzoneOutputStream createKey(String key, long size,
508508
*
509509
* @param keyName Existing key to rewrite. This must exist in the bucket.
510510
* @param size The size of the new key
511-
* @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
511+
* @param existingKeyGeneration The positive generation of the existing key which is checked for changes at key create
512512
* and commit time.
513513
* @param replicationConfig The replication configuration for the key to be rewritten.
514514
* @param metadata custom key value metadata
@@ -1047,8 +1047,8 @@ public List<OzoneFileStatus> listStatus(String keyName, boolean recursive,
10471047
*
10481048
* @param prefix Optional string to filter for the selected keys.
10491049
*/
1050-
public OzoneMultipartUploadList listMultipartUploads(String prefix,
1051-
String keyMarker, String uploadIdMarker, int maxUploads)
1050+
public OzoneMultipartUploadList listMultipartUploads(String prefix,
1051+
String keyMarker, String uploadIdMarker, int maxUploads)
10521052
throws IOException {
10531053
return proxy.listMultipartUploads(volumeName, getName(), prefix, keyMarker, uploadIdMarker, maxUploads);
10541054
}

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ OzoneOutputStream createKey(String volumeName, String bucketName,
368368
* @param bucketName Name of the Bucket
369369
* @param keyName Existing key to rewrite. This must exist in the bucket.
370370
* @param size The size of the new key
371-
* @param existingKeyGeneration The generation of the existing key which is checked for changes at key create
371+
* @param existingKeyGeneration The positive generation of the existing key which is checked for changes at key create
372372
* and commit time.
373373
* @param replicationConfig The replication configuration for the key to be rewritten.
374374
* @param metadata custom key value metadata

hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ public void createBucket(
677677
builder.setDefaultReplicationConfig(defaultReplicationConfig);
678678
}
679679

680-
String replicationType = defaultReplicationConfig == null
680+
String replicationType = defaultReplicationConfig == null
681681
? "server-side default replication type"
682682
: defaultReplicationConfig.getType().toString();
683683

@@ -1317,7 +1317,7 @@ public List<OzoneBucket> listBuckets(String volumeName, String bucketPrefix,
13171317
List<OmBucketInfo> buckets = ozoneManagerClient.listBuckets(
13181318
volumeName, prevBucket, bucketPrefix, maxListResult, hasSnapshot);
13191319

1320-
return buckets.stream().map(bucket ->
1320+
return buckets.stream().map(bucket ->
13211321
OzoneBucket.newBuilder(conf, this)
13221322
.setVolumeName(bucket.getVolumeName())
13231323
.setName(bucket.getBucketName())
@@ -1408,6 +1408,9 @@ public OzoneOutputStream rewriteKey(String volumeName, String bucketName, String
14081408
if (omVersion.compareTo(OzoneManagerVersion.ATOMIC_REWRITE_KEY) < 0) {
14091409
throw new IOException("OzoneManager does not support atomic key rewrite.");
14101410
}
1411+
Preconditions.checkArgument(existingKeyGeneration > 0,
1412+
"existingKeyGeneration must be positive, but was %s",
1413+
existingKeyGeneration);
14111414

14121415
createKeyPreChecks(volumeName, bucketName, keyName, replicationConfig);
14131416

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ public enum ResultCodes {
208208
USER_MISMATCH, // Error code when requested user name passed is different
209209
// from remote user.
210210

211-
INVALID_PART, // When part name is not found or not matching with partname
211+
INVALID_PART, // When part name is not found or not matching with partname
212212
// in OM MPU partInfo.
213213

214214
INVALID_PART_ORDER, // When list of parts mentioned to complete MPU are not
@@ -267,7 +267,7 @@ public enum ResultCodes {
267267
UNAUTHORIZED,
268268

269269
S3_SECRET_ALREADY_EXISTS,
270-
270+
271271
INVALID_PATH,
272272
TOO_MANY_BUCKETS,
273273
KEY_UNDER_LEASE_RECOVERY,

hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,7 @@ public String createSnapshot(String volumeName,
12551255
if (!StringUtils.isBlank(snapshotName)) {
12561256
requestBuilder.setSnapshotName(snapshotName);
12571257
}
1258-
1258+
12591259
final OMRequest omRequest = createOMRequest(Type.CreateSnapshot)
12601260
.setCreateSnapshotRequest(requestBuilder)
12611261
.build();

hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/OzoneRpcClientTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL;
3737
import static org.apache.hadoop.ozone.OzoneConsts.DEFAULT_OM_UPDATE_ID;
3838
import static org.apache.hadoop.ozone.OzoneConsts.ETAG;
39+
import static org.apache.hadoop.ozone.OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS;
3940
import static org.apache.hadoop.ozone.OzoneConsts.GB;
4041
import static org.apache.hadoop.ozone.OzoneConsts.MD5_HASH;
4142
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -1435,6 +1436,26 @@ void rewriteFailsDueToOutdatedGenerationAtCommit(BucketLayout layout) throws IOE
14351436
assertUnchanged(keyInfo, ozoneManager.lookupKey(keyArgs));
14361437
}
14371438

1439+
@ParameterizedTest
1440+
@EnumSource
1441+
void rewriteRejectsNonPositiveGeneration(BucketLayout layout)
1442+
throws IOException {
1443+
checkFeatureEnable(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
1444+
OzoneBucket bucket = createBucket(layout);
1445+
OzoneKeyDetails key1Details = createTestKey(bucket, "key1", "value".getBytes(UTF_8));
1446+
IllegalArgumentException e = assertThrows(IllegalArgumentException.class,
1447+
() -> {
1448+
bucket.rewriteKey("key2",
1449+
1024,
1450+
EXPECTED_GEN_CREATE_IF_NOT_EXISTS,
1451+
RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE),
1452+
singletonMap("key", "value"));
1453+
});
1454+
1455+
assertThat(e).hasMessageContaining("existingKeyGeneration must be positive");
1456+
assertKeyContent(bucket, key1Details.getName(), "value".getBytes(UTF_8));
1457+
}
1458+
14381459
@ParameterizedTest
14391460
@EnumSource
14401461
void cannotRewriteDeletedKey(BucketLayout layout) throws IOException {

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequest.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,13 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
9494
KeyArgs keyArgs = commitKeyRequest.getKeyArgs();
9595

9696
if (keyArgs.hasExpectedDataGeneration()) {
97-
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
97+
if (keyArgs.getExpectedDataGeneration()
98+
== OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
99+
ozoneManager.checkFeatureEnabled(
100+
OzoneManagerVersion.ATOMIC_CREATE_IF_NOT_EXISTS);
101+
} else {
102+
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
103+
}
98104
}
99105

100106
if (ozoneManager.getConfig().isKeyNameCharacterCheckEnabled()) {
@@ -616,14 +622,23 @@ protected void validateAtomicRewrite(OmKeyInfo existing, OmKeyInfo toCommit, Map
616622
if (toCommit.getExpectedDataGeneration() != null) {
617623
// These values are not passed in the request keyArgs, so add them into the auditMap if they are present
618624
// in the open key entry.
619-
auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(toCommit.getExpectedDataGeneration()));
620-
if (existing == null) {
621-
throw new OMException("Atomic rewrite is not allowed for a new key", KEY_NOT_FOUND);
622-
}
623-
if (!toCommit.getExpectedDataGeneration().equals(existing.getUpdateID())) {
624-
throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() +
625-
") does not match the expected generation to rewrite (" + toCommit.getExpectedDataGeneration() + ")",
626-
KEY_NOT_FOUND);
625+
Long expectedGen = toCommit.getExpectedDataGeneration();
626+
auditMap.put(OzoneConsts.REWRITE_GENERATION, String.valueOf(expectedGen));
627+
628+
if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
629+
if (existing != null) {
630+
throw new OMException("Key already exists",
631+
OMException.ResultCodes.KEY_ALREADY_EXISTS);
632+
}
633+
} else {
634+
if (existing == null) {
635+
throw new OMException("Atomic rewrite is not allowed for a new key", KEY_NOT_FOUND);
636+
}
637+
if (expectedGen != existing.getUpdateID()) {
638+
throw new OMException("Cannot commit as current generation (" + existing.getUpdateID() +
639+
") does not match the expected generation to rewrite (" + expectedGen + ")",
640+
KEY_NOT_FOUND);
641+
}
627642
}
628643
}
629644
}

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCreateRequest.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
3737
import org.apache.hadoop.hdds.utils.UniqueId;
3838
import org.apache.hadoop.ozone.OmUtils;
39+
import org.apache.hadoop.ozone.OzoneConsts;
3940
import org.apache.hadoop.ozone.OzoneManagerVersion;
4041
import org.apache.hadoop.ozone.audit.OMAction;
4142
import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -95,7 +96,13 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
9596
final OMPerformanceMetrics perfMetrics = ozoneManager.getPerfMetrics();
9697

9798
if (keyArgs.hasExpectedDataGeneration()) {
98-
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
99+
if (keyArgs.getExpectedDataGeneration()
100+
== OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
101+
ozoneManager.checkFeatureEnabled(
102+
OzoneManagerVersion.ATOMIC_CREATE_IF_NOT_EXISTS);
103+
} else {
104+
ozoneManager.checkFeatureEnabled(OzoneManagerVersion.ATOMIC_REWRITE_KEY);
105+
}
99106
}
100107

101108
OmUtils.verifyKeyNameWithSnapshotReservedWord(keyArgs.getKeyName());
@@ -189,7 +196,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
189196

190197
KeyArgs.Builder finalNewKeyArgs = newKeyArgs;
191198
KeyArgs resolvedKeyArgs =
192-
captureLatencyNs(perfMetrics.getCreateKeyResolveBucketAndAclCheckLatencyNs(),
199+
captureLatencyNs(perfMetrics.getCreateKeyResolveBucketAndAclCheckLatencyNs(),
193200
() -> resolveBucketAndCheckKeyAcls(finalNewKeyArgs.build(), ozoneManager,
194201
IAccessAuthorizer.ACLType.CREATE));
195202
newCreateKeyRequest =
@@ -369,7 +376,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
369376
} else {
370377
perfMetrics.addCreateKeyFailureLatencyNs(createKeyLatency);
371378
}
372-
379+
373380
if (acquireLock) {
374381
mergeOmLockDetails(ozoneLockStrategy
375382
.releaseWriteLock(omMetadataManager, volumeName,
@@ -471,12 +478,22 @@ public static OMRequest blockCreateKeyWithBucketLayoutFromOldClient(
471478
protected void validateAtomicRewrite(OmKeyInfo dbKeyInfo, KeyArgs keyArgs)
472479
throws OMException {
473480
if (keyArgs.hasExpectedDataGeneration()) {
474-
// If a key does not exist, or if it exists but the updateID do not match, then fail this request.
475-
if (dbKeyInfo == null) {
476-
throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
477-
}
478-
if (dbKeyInfo.getUpdateID() != keyArgs.getExpectedDataGeneration()) {
479-
throw new OMException("Generation mismatch during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
481+
long expectedGen = keyArgs.getExpectedDataGeneration();
482+
// If expectedGen is EXPECTED_GEN_CREATE_IF_NOT_EXISTS, it means the key MUST NOT exist (If-None-Match)
483+
if (expectedGen == OzoneConsts.EXPECTED_GEN_CREATE_IF_NOT_EXISTS) {
484+
if (dbKeyInfo != null) {
485+
throw new OMException("Key already exists",
486+
OMException.ResultCodes.KEY_ALREADY_EXISTS);
487+
}
488+
} else {
489+
// If a key does not exist, or if it exists but the updateID do not match, then fail this request.
490+
if (dbKeyInfo == null) {
491+
throw new OMException("Key not found during expected rewrite", OMException.ResultCodes.KEY_NOT_FOUND);
492+
}
493+
if (dbKeyInfo.getUpdateID() != expectedGen) {
494+
throw new OMException("Generation mismatch during expected rewrite",
495+
OMException.ResultCodes.KEY_NOT_FOUND);
496+
}
480497
}
481498
}
482499
}

0 commit comments

Comments
 (0)