Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
Expand Down Expand Up @@ -53,57 +52,105 @@ public class Checksum {
*/
private final ChecksumCache checksumCache;

private static Function<ByteBuffer, ByteString> newMessageDigestFunction(
String algorithm) {
final MessageDigest md;
private static MessageDigest newMessageDigest(String algorithm) {
try {
md = MessageDigest.getInstance(algorithm);
return MessageDigest.getInstance(algorithm);
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(
"Failed to get MessageDigest for " + algorithm, e);
}
return data -> {
md.reset();
md.update(data);
return ByteString.copyFrom(md.digest());
};
}

public static ByteString int2ByteString(int n) {
return UnsafeByteOperations.unsafeWrap(IntegerCodec.get().toByteArray(n));
}

private static Function<ByteBuffer, ByteString> newChecksumByteBufferFunction(
Supplier<ChecksumByteBuffer> constructor) {
final ChecksumByteBuffer algorithm = constructor.get();
return data -> {
algorithm.reset();
algorithm.update(data);
return int2ByteString((int)algorithm.getValue());
/**
* Streaming checksum strategy: feed multiple ByteBuffer slices via
* {@link #update}, then read the result via {@link #finish}, then
* {@link #reset} to start a new window. Used by both the no-cache and
* cache compute paths to avoid the {@code byte[bytesPerChecksum]}
* allocation that {@code ChunkBuffer.iterate} performs whenever a
* checksum window straddles multiple underlying buffers - the
* {@link ChecksumByteBuffer#update(ByteBuffer)} and
* {@link MessageDigest#update(ByteBuffer)} contracts both define
* incremental updates as byte-equivalent to a single update over the
* concatenation.
*/
interface StreamingChecksum {
void reset();

void update(ByteBuffer slice);

ByteString finish();
}

private static StreamingChecksum streamingCrc(
Supplier<ChecksumByteBuffer> ctor) {
final ChecksumByteBuffer cb = ctor.get();
return new StreamingChecksum() {
@Override
public void reset() {
cb.reset();
}

@Override
public void update(ByteBuffer slice) {
cb.update(slice);
}

@Override
public ByteString finish() {
return int2ByteString((int) cb.getValue());
}
};
}

private static StreamingChecksum streamingDigest(String algorithm) {
final MessageDigest md = newMessageDigest(algorithm);
return new StreamingChecksum() {
@Override
public void reset() {
md.reset();
}

@Override
public void update(ByteBuffer slice) {
md.update(slice);
}

@Override
public ByteString finish() {
return UnsafeByteOperations.unsafeWrap(md.digest());
}
};
}

/** The algorithms for {@link ChecksumType}. */
enum Algorithm {
NONE(() -> data -> ByteString.EMPTY),
CRC32(() ->
newChecksumByteBufferFunction(ChecksumByteBufferFactory::crc32Impl)),
CRC32C(() ->
newChecksumByteBufferFunction(ChecksumByteBufferFactory::crc32CImpl)),
SHA256(() -> newMessageDigestFunction("SHA-256")),
MD5(() -> newMessageDigestFunction("MD5"));

private final Supplier<Function<ByteBuffer, ByteString>> constructor;
// NONE is reachable via Algorithm.valueOf(ChecksumType.NONE) only if
// computeChecksum's NONE short-circuit is bypassed; throw to surface
// such a misuse rather than silently producing empty checksums.
NONE(() -> {
throw new UnsupportedOperationException(
"ChecksumType.NONE has no StreamingChecksum");
}),
CRC32(() -> streamingCrc(ChecksumByteBufferFactory::crc32Impl)),
CRC32C(() -> streamingCrc(ChecksumByteBufferFactory::crc32CImpl)),
SHA256(() -> streamingDigest("SHA-256")),
MD5(() -> streamingDigest("MD5"));

private final Supplier<StreamingChecksum> constructor;

static Algorithm valueOf(ChecksumType type) {
return valueOf(type.name());
}

Algorithm(Supplier<Function<ByteBuffer, ByteString>> constructor) {
Algorithm(Supplier<StreamingChecksum> constructor) {
this.constructor = constructor;
}

Function<ByteBuffer, ByteString> newChecksumFunction() {
StreamingChecksum newStreamingChecksum() {
return constructor.get();
}
}
Expand Down Expand Up @@ -218,65 +265,71 @@ public ChecksumData computeChecksum(ChunkBuffer data)
return computeChecksum(data, false);
}

/**
* @implNote The position of {@code data}'s underlying buffers is not
* advanced by this method - both the no-cache and cache paths slice via
* {@link ByteBuffer#duplicate()}.
*/
public ChecksumData computeChecksum(ChunkBuffer data, boolean useCache)
throws OzoneChecksumException {
if (checksumType == ChecksumType.NONE) {
// Since type is set to NONE, we do not need to compute the checksums
return new ChecksumData(checksumType, bytesPerChecksum);
}

final Function<ByteBuffer, ByteString> function;
final StreamingChecksum algo;
try {
function = Algorithm.valueOf(checksumType).newChecksumFunction();
algo = Algorithm.valueOf(checksumType).newStreamingChecksum();
} catch (Exception e) {
throw new OzoneChecksumException("Failed to get the checksum function for " + checksumType, e);
throw new OzoneChecksumException(
"Failed to get the checksum function for " + checksumType, e);
}

final List<ByteString> checksumList;
if (checksumCache == null || !useCache) {
// When checksumCache is not enabled:
// Checksum is computed for each bytesPerChecksum number of bytes of data
// starting at offset 0. The last checksum might be computed for the
// remaining data with length less than bytesPerChecksum.
checksumList = new ArrayList<>();
for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
checksumList.add(computeChecksum(b, function, bytesPerChecksum)); // merge this?
}
} else {
// When checksumCache is enabled:
// We only need to update the last checksum in the cache, then pass it along.
checksumList = checksumCache.computeChecksum(data, function);
}
final List<ByteString> checksumList = (checksumCache == null || !useCache)
? computeChecksumDirect(data, algo)
: checksumCache.computeChecksum(data, algo, bytesPerChecksum);
return new ChecksumData(checksumType, bytesPerChecksum, checksumList);
}

/**
* Compute checksum using the algorithm for the data upto the max length.
* @param data input data
* @param function the checksum function
* @param maxLength the max length of data
* @return computed checksum ByteString
* Walk {@code data}'s underlying ByteBuffer list, slicing each window of
* {@link #bytesPerChecksum} bytes via {@link ByteBuffer#duplicate()} and
* feeding slices to {@code algo}. No linearization byte[] is allocated
* when a window straddles multiple buffers.
*/
protected static ByteString computeChecksum(ByteBuffer data,
Function<ByteBuffer, ByteString> function, int maxLength) {
final int limit = data.limit();
try {
final int maxIndex = data.position() + maxLength;
if (limit > maxIndex) {
data.limit(maxIndex);
private List<ByteString> computeChecksumDirect(ChunkBuffer data,
StreamingChecksum algo) {
final int dataLimit = data.limit();
final List<ByteString> result = new ArrayList<>(
(dataLimit + bytesPerChecksum - 1) / bytesPerChecksum);
int windowRemaining = bytesPerChecksum;
algo.reset();

for (ByteBuffer src : data.asByteBufferList()) {
int srcPos = src.position();
final int srcLim = src.limit();
while (srcPos < srcLim) {
final int n = Math.min(srcLim - srcPos, windowRemaining);
algo.update(BufferUtils.slice(src, srcPos, n));
srcPos += n;
windowRemaining -= n;
if (windowRemaining == 0) {
result.add(algo.finish());
algo.reset();
windowRemaining = bytesPerChecksum;
}
}
return function.apply(data);
} finally {
data.limit(limit);
}
if (windowRemaining < bytesPerChecksum) {
// Unaligned trailing window.
result.add(algo.finish());
}
return result;
}

public static void verifySingleChecksum(ByteBuffer buffer, int offset, int bytesPerChecksum,
ByteString checksum, ChecksumType checksumType) throws OzoneChecksumException {
final ByteBuffer duplicated = buffer.duplicate();
duplicated.position(offset).limit(offset + bytesPerChecksum);
final ChecksumData cd = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum));
verifyChecksum(duplicated, cd, 0);
verifyChecksum(BufferUtils.slice(buffer, offset, bytesPerChecksum), cd, 0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import org.apache.hadoop.ozone.common.Checksum.StreamingChecksum;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,58 +66,102 @@ public List<ByteString> getChecksums() {
return checksums;
}

public List<ByteString> computeChecksum(ChunkBuffer data, Function<ByteBuffer, ByteString> function) {
// Indicates how much data the current chunk buffer holds
/**
* Recompute checksums for the windows that have changed since the last
* call: bytes {@code [ciStart * bytesPerChecksum, currChunkLength)} where
* {@code ciStart = prevChunkLength / bytesPerChecksum} (the index of the
* first window whose result may have changed - either the previously-
* partial last window now has more bytes, or new full windows have been
* appended).
*
* <p>Walks {@code data}'s underlying buffer list directly (no
* {@code iterate()} byte[] linearization) and feeds slices to {@code algo}
* incrementally; the cached prefix is skipped via index arithmetic so
* those bytes are never re-fed.
*/
public List<ByteString> computeChecksum(ChunkBuffer data,
StreamingChecksum algo, int chksumSize) {
if (chksumSize != bytesPerChecksum) {
throw new IllegalArgumentException("bytesPerChecksum mismatch: cache="
+ bytesPerChecksum + " call=" + chksumSize);
}
final int currChunkLength = data.limit();

if (currChunkLength == prevChunkLength) {
LOG.debug("ChunkBuffer data limit same as last time ({}). No new checksums need to be computed", prevChunkLength);
return checksums;
}

// Sanity check
if (currChunkLength < prevChunkLength) {
// If currChunkLength <= lastChunkLength, it indicates a bug that needs to be addressed.
// It means BOS has not properly clear()ed the cache when a new chunk is started in that code path.
throw new IllegalArgumentException("ChunkBuffer data limit (" + currChunkLength + ")" +
" must not be smaller than last time (" + prevChunkLength + ")");
// Indicates a bug: BOS did not clear() the cache before starting a new chunk.
throw new IllegalArgumentException("ChunkBuffer data limit (" + currChunkLength + ")"
+ " must not be smaller than last time (" + prevChunkLength + ")");
}

// One or more checksums need to be computed

// Start of the checksum index that need to be (re)computed
// Index of the first window that needs (re)computing.
final int ciStart = prevChunkLength / bytesPerChecksum;
final int ciEnd = currChunkLength / bytesPerChecksum + (currChunkLength % bytesPerChecksum == 0 ? 0 : 1);
int i = 0;
for (ByteBuffer b : data.iterate(bytesPerChecksum)) {
if (i < ciStart) {
i++;
final int ciEnd = currChunkLength / bytesPerChecksum
+ (currChunkLength % bytesPerChecksum == 0 ? 0 : 1);
// Bytes to skip (the cached full windows preceding ciStart).
long bytesToSkip = (long) ciStart * bytesPerChecksum;

int i = ciStart;
int windowRemaining = bytesPerChecksum;
algo.reset();
long position = 0;

for (ByteBuffer src : data.asByteBufferList()) {
int srcPos = src.position();
final int srcLim = src.limit();
final int srcLen = srcLim - srcPos;

// Fast-forward through buffers that lie entirely within the cached
// prefix.
if (position + srcLen <= bytesToSkip) {
position += srcLen;
continue;
}

// variable i can either point to:
// 1. the last element in the list -- in which case the checksum needs to be updated
// 2. one after the last element -- in which case a new checksum needs to be added
assert i == checksums.size() - 1 || i == checksums.size();

// TODO: Furthermore for CRC32/CRC32C, it can be even more efficient by updating the last checksum byte-by-byte.
final ByteString checksum = Checksum.computeChecksum(b, function, bytesPerChecksum);
if (i == checksums.size()) {
checksums.add(checksum);
} else {
checksums.set(i, checksum);
// First buffer that crosses into the not-yet-cached region: advance
// srcPos to the boundary.
if (position < bytesToSkip) {
srcPos += (int) (bytesToSkip - position);
position = bytesToSkip;
}

i++;
while (srcPos < srcLim) {
final int n = Math.min(srcLim - srcPos, windowRemaining);
algo.update(BufferUtils.slice(src, srcPos, n));
srcPos += n;
position += n;
windowRemaining -= n;
if (windowRemaining == 0) {
storeChecksum(i++, algo.finish());
algo.reset();
windowRemaining = bytesPerChecksum;
}
}
}
if (windowRemaining < bytesPerChecksum) {
// Unaligned trailing window.
storeChecksum(i++, algo.finish());
}

// Sanity check
if (i != ciEnd) {
throw new IllegalStateException("ChecksumCache: Checksum index end does not match expectation");
}

// Update last written index
prevChunkLength = currChunkLength;
return checksums;
}

private void storeChecksum(int i, ByteString cs) {
// i can either point to the last cached element (recompute - the
// previously-partial trailing window) or one past it (append a new
// checksum for newly-arrived bytes).
assert i == checksums.size() - 1 || i == checksums.size();
if (i == checksums.size()) {
checksums.add(cs);
} else {
checksums.set(i, cs);
}
}
}
Loading