Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.ObjectInputStream;
Expand All @@ -42,6 +44,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collector;
import javax.annotation.concurrent.Immutable;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
Expand All @@ -63,6 +66,7 @@
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
public final class BufferToDiskThenUpload extends BlobWriteSessionConfig
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {

private static final long serialVersionUID = 9059242302276891867L;

/**
Expand Down Expand Up @@ -211,41 +215,78 @@ public ApiFuture<BlobInfo> getResult() {
return result;
}

/**
 * {@link WritableByteChannel} wrapper around {@code delegate}. Closing it closes the delegate,
 * uploads the enclosing session's recovery file through {@code storage.internalCreateFrom}, and
 * publishes the outcome on {@code result}.
 *
 * <p>NOTE(review): this listing appears to be a rendered diff without +/- markers; in {@code
 * write}, {@code isOpen} and {@code close} the pre-change statement(s) are immediately followed by
 * the post-change, lock-guarded body. Confirm against the real file before relying on it.
 */
@SuppressWarnings("UnstableApiUsage")
private final class Flusher implements WritableByteChannel {

// Channel that receives every write; it is closed first in close().
private final WritableByteChannel delegate;
// Running crc32c of the buffers passed to write(); null when opts.getHasher().initialValue() is
// null (see constructor).
private final Hasher cumulativeCrc32c;
// Held for the duration of the lock-guarded write(), isOpen() and close() bodies.
private final ReentrantLock lock;

/**
 * Wraps {@code delegate} and decides whether a cumulative crc32c is tracked.
 *
 * @param delegate the channel all writes are forwarded to
 */
private Flusher(WritableByteChannel delegate) {
this.delegate = delegate;
// No cumulative hasher is created when the configured hasher reports a null initial value.
this.cumulativeCrc32c =
opts.getHasher().initialValue() == null ? null : Hashing.crc32c().newHasher();
this.lock = new ReentrantLock();
}

/**
 * Forwards {@code src} to the delegate; the lock-guarded body first feeds a duplicate of {@code
 * src} into the cumulative crc32c when one is tracked.
 *
 * @param src the bytes to write
 * @return the value returned by {@code delegate.write(src)}
 * @throws IOException propagated from the delegate
 */
@Override
public int write(ByteBuffer src) throws IOException {
return delegate.write(src);
lock.lock();
try {
if (cumulativeCrc32c != null) {
// Hash a duplicate rather than src itself, presumably so src's own position is left untouched
// for the delegate write below.
// NOTE(review): the duplicate is hashed in full before the write; this looks like it assumes the
// delegate consumes the whole buffer in one call — confirm partial writes cannot occur.
cumulativeCrc32c.putBytes(src.duplicate());
}
return delegate.write(src);
} finally {
lock.unlock();
}
}

/**
 * Reports whether the delegate channel is open.
 *
 * @return the value of {@code delegate.isOpen()}
 */
@Override
public boolean isOpen() {
return delegate.isOpen();
lock.lock();
try {
return delegate.isOpen();
} finally {
lock.unlock();
}
}

/**
 * Closes the delegate, then uploads the recovery file and completes {@code result} with the
 * created {@code BlobInfo}. A {@code StorageException} or {@code IOException} is set on {@code
 * result} and rethrown.
 *
 * @throws IOException if closing the delegate, sizing the file, or the upload fails with one
 */
@Override
public void close() throws IOException {
delegate.close();
try (RecoveryFile rf = Factory.WriteToFileThenUpload.this.rf) {
Path path = rf.getPath();
long size = Files.size(path);
ThroughputSink.computeThroughput(
clock,
gcs,
size,
() -> {
BlobInfo blob = storage.internalCreateFrom(path, info, opts);
result.set(blob);
});
} catch (StorageException | IOException e) {
result.setException(e);
throw e;
lock.lock();
try {

delegate.close();
// try-with-resources closes the enclosing session's RecoveryFile after the upload attempt.
try (RecoveryFile rf = Factory.WriteToFileThenUpload.this.rf) {
Path path = rf.getPath();
long size = Files.size(path);
ThroughputSink.computeThroughput(
clock,
gcs,
size,
() -> {
BlobInfo pendingInfo = info;
Opts<ObjectTargetOpt> pendingOpts = opts;
if (cumulativeCrc32c != null) {
// A cumulative crc32c was tracked: drop any md5/crc32c already on the info, set the computed
// crc32c instead, and prepend a matching crc32cMatch opt.
int hashCodeInt = cumulativeCrc32c.hash().asInt();
pendingInfo =
pendingInfo.toBuilder()
.clearMd5()
.clearCrc32c()
.setCrc32c(Utils.crc32cCodec.encode(hashCodeInt))
.build();
pendingOpts = opts.prepend(Opts.from(UnifiedOpts.crc32cMatch(hashCodeInt)));
}
BlobInfo blob = storage.internalCreateFrom(path, pendingInfo, pendingOpts);
result.set(blob);
});
} catch (StorageException | IOException e) {
// Surface the failure on the result future as well as to the caller.
result.setException(e);
throw e;
}
} finally {
lock.unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.storage;

import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.Objects;

Expand Down Expand Up @@ -56,6 +57,10 @@ public boolean eqValue(Crc32cValue<?> other) {
return this.getValue() == other.getValue();
}

/**
 * Returns the shared {@code Crc32cLengthKnown.ZERO} constant.
 *
 * @return the {@code ZERO} instance declared on {@link Crc32cLengthKnown}
 */
static Crc32cLengthKnown zero() {
  final Crc32cLengthKnown empty = Crc32cLengthKnown.ZERO;
  return empty;
}

/**
 * Creates a {@link Crc32cLengthUnknown} holding the given value.
 *
 * @param value the int value to wrap
 * @return a new {@code Crc32cLengthUnknown} constructed from {@code value}
 */
static Crc32cLengthUnknown of(int value) {
  final Crc32cLengthUnknown wrapped = new Crc32cLengthUnknown(value);
  return wrapped;
}
Expand All @@ -81,6 +86,9 @@ public int getValue() {

/**
 * Combines this value with {@code other} via {@code Crc32cUtility.concatCrc32c}.
 *
 * <p>When {@code other} is the {@code Crc32cLengthKnown.ZERO} instance (identity comparison), no
 * combination is performed and this instance is returned as-is.
 *
 * @param other the value to append to this one
 * @return this instance when {@code other} is {@code ZERO}, otherwise a new combined value
 */
@Override
public Crc32cLengthUnknown concat(Crc32cLengthKnown other) {
  if (other != Crc32cLengthKnown.ZERO) {
    return new Crc32cLengthUnknown(
        Crc32cUtility.concatCrc32c(value, other.value, other.length));
  }
  return this;
}
Expand Down Expand Up @@ -118,6 +126,7 @@ public int hashCode() {
}

static final class Crc32cLengthKnown extends Crc32cValue<Crc32cLengthKnown> {
private static final Crc32cLengthKnown ZERO = Hasher.enabled().hash(ByteBuffer.allocate(0));
private final int value;
private final long length;

Expand All @@ -137,6 +146,11 @@ public long getLength() {

/**
 * Combines this value with {@code other} via {@code Crc32cUtility.concatCrc32c}; the result's
 * length is the sum of both lengths.
 *
 * <p>If either side is the {@code ZERO} instance (identity comparison), the opposite side is
 * returned unchanged; the check on {@code other} takes precedence.
 *
 * @param other the value to append to this one
 * @return this instance, {@code other}, or a new combined value as described above
 */
@Override
public Crc32cLengthKnown concat(Crc32cLengthKnown other) {
  if (other == ZERO || this == ZERO) {
    // `other == ZERO` wins when both sides are ZERO, matching the original branch order.
    return other == ZERO ? this : other;
  }
  final long totalLength = length + other.length;
  return new Crc32cLengthKnown(
      Crc32cUtility.concatCrc32c(value, other.value, other.length), totalLength);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
grpc.storageClient
.writeObjectCallable()
.withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.noop())
.setHasher(opts.getHasher())
.setByteStringStrategy(ByteStringStrategy.copy())
.resumable()
.withRetryConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ final class GapicWritableByteChannelSessionBuilder {
/**
 * Creates a builder bound to the given write callable, with the byte string strategy initialised
 * to {@code ByteStringStrategy.copy()}.
 *
 * <p>NOTE(review): two {@code hasher} assignments are shown back to back; this looks like the
 * before/after pair of a rendered diff, the later one being {@code Hasher.defaultHasher()} —
 * confirm against the real file.
 *
 * @param write the callable stored on this builder
 */
GapicWritableByteChannelSessionBuilder(
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write) {
this.write = write;
this.hasher = Hasher.noop();
this.hasher = Hasher.defaultHasher();
this.byteStringStrategy = ByteStringStrategy.copy();
}

/**
* Set the {@link Hasher} to apply to the bytes passing through the built session's channel.
*
* <p>Default: {@link Hasher#noop()}
* <p>Default: {@link Hasher#defaultHasher()}
*
* @see Hasher#enabled()
* @see Hasher#noop()
Expand Down Expand Up @@ -179,14 +179,17 @@ UnbufferedDirectUploadBuilder setRequest(WriteObjectRequest req) {
}

/**
 * Builds an {@code UnbufferedWriteSession} whose channel is a {@code
 * GapicUnbufferedDirectWritableByteChannel} passed through {@code
 * StorageByteChannels.writable()::createSynchronized}.
 *
 * <p>{@code req} is checked with {@code requireNonNull} ("req must be non null") when this method
 * runs.
 *
 * <p>NOTE(review): the channel's argument list shows {@code getChunkSegmenter()} next to {@code
 * chunkSegmenter} and {@code new WriteCtx<>(...)} next to {@code WriteCtx.of(...)}; these look like
 * before/after pairs of a rendered diff — confirm against the real file.
 *
 * @return the configured unbuffered session
 */
UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
// Resolved once, outside the lambda, and shared by the channel and its WriteCtx hasher.
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
return new UnbufferedWriteSession<>(
ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")),
lift((WriteObjectRequest start, SettableApiFuture<WriteObjectResponse> resultFuture) ->
new GapicUnbufferedDirectWritableByteChannel(
resultFuture,
getChunkSegmenter(),
chunkSegmenter,
write,
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
WriteCtx.of(
WriteObjectRequestBuilderFactory.simple(start),
chunkSegmenter.getHasher())))
.andThen(StorageByteChannels.writable()::createSynchronized));
}
}
Expand All @@ -207,14 +210,17 @@ BufferedDirectUploadBuilder setRequest(WriteObjectRequest req) {
}

/**
 * Builds a {@code BufferedWriteSession} whose channel is a {@code
 * GapicUnbufferedDirectWritableByteChannel} wrapped in a {@code
 * DefaultBufferedWritableByteChannel} over {@code bufferHandle}, then passed through {@code
 * StorageByteChannels.writable()::createSynchronized}.
 *
 * <p>{@code req} is checked with {@code requireNonNull} ("req must be non null") when this method
 * runs.
 *
 * <p>NOTE(review): the channel's argument list shows {@code getChunkSegmenter()} next to {@code
 * chunkSegmenter} and {@code new WriteCtx<>(...)} next to {@code WriteCtx.of(...)}; these look like
 * before/after pairs of a rendered diff — confirm against the real file.
 *
 * @return the configured buffered session
 */
BufferedWritableByteChannelSession<WriteObjectResponse> build() {
// Resolved once, outside the lambda, and shared by the channel and its WriteCtx hasher.
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
return new BufferedWriteSession<>(
ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")),
lift((WriteObjectRequest start, SettableApiFuture<WriteObjectResponse> resultFuture) ->
new GapicUnbufferedDirectWritableByteChannel(
resultFuture,
getChunkSegmenter(),
chunkSegmenter,
write,
new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
WriteCtx.of(
WriteObjectRequestBuilderFactory.simple(start),
chunkSegmenter.getHasher())))
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
.andThen(StorageByteChannels.writable()::createSynchronized));
}
Expand Down Expand Up @@ -290,20 +296,24 @@ UnbufferedResumableUploadBuilder setStartAsync(ApiFuture<ResumableWrite> start)

UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
RetrierWithAlg boundRetrier = retrier;
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
return new UnbufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
if (fsyncEvery) {
return new GapicUnbufferedChunkedResumableWritableByteChannel(
result,
getChunkSegmenter(),
chunkSegmenter,
write,
new WriteCtx<>(start),
WriteCtx.of(start, chunkSegmenter.getHasher()),
boundRetrier,
Retrying::newCallContext);
} else {
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
result, getChunkSegmenter(), write, new WriteCtx<>(start));
result,
chunkSegmenter,
write,
WriteCtx.of(start, chunkSegmenter.getHasher()));
}
})
.andThen(StorageByteChannels.writable()::createSynchronized));
Expand All @@ -330,20 +340,24 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture<ResumableWrite> start) {
}

BufferedWritableByteChannelSession<WriteObjectResponse> build() {
ChunkSegmenter chunkSegmenter = getChunkSegmenter();
return new BufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
if (fsyncEvery) {
return new GapicUnbufferedChunkedResumableWritableByteChannel(
result,
getChunkSegmenter(),
chunkSegmenter,
write,
new WriteCtx<>(start),
WriteCtx.of(start, chunkSegmenter.getHasher()),
retrier,
Retrying::newCallContext);
} else {
return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
result, getChunkSegmenter(), write, new WriteCtx<>(start));
result,
chunkSegmenter,
write,
WriteCtx.of(start, chunkSegmenter.getHasher()));
}
})
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op
GrpcCallContext grpcCallContext =
optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults);
Hasher hasher = Hasher.enabled();
Hasher hasher = optsWithDefaults.getHasher();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
Expand Down Expand Up @@ -324,7 +324,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
write,
storageClient.queryWriteStatusCallable(),
rw,
Hasher.noop()),
opts.getHasher()),
MoreExecutors.directExecutor());
try {
GrpcResumableSession got = session2.get();
Expand Down Expand Up @@ -365,7 +365,7 @@ public Blob createFrom(
.write()
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.noop())
.setHasher(opts.getHasher())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.resumable()
.withRetryConfig(retrier.withAlg(retryAlgorithmManager.idempotent()))
Expand Down Expand Up @@ -779,7 +779,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
Hasher hasher = Hasher.noop();
Hasher hasher = opts.getHasher();
// in JSON, the starting of the resumable session happens before the invocation of write can
// happen. Emulate the same thing here.
// 1. create the future
Expand All @@ -804,7 +804,7 @@ public BlobInfo internalDirectUpload(
GrpcCallContext grpcCallContext =
optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults);
Hasher hasher = Hasher.enabled();
Hasher hasher = opts.getHasher();
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
RewindableContent content = RewindableContent.of(buf);
return retrier.run(
Expand Down
Loading