Skip to content

Commit

Permalink
fetch_server: support digest function and blake3
Browse files Browse the repository at this point in the history
Implement support for digest function in FetchBlobRequest as well as
usage of Blake3 as a digest func.
  • Loading branch information
sluongng committed Apr 16, 2024
1 parent 1cab5cb commit bb4c9d9
Show file tree
Hide file tree
Showing 3 changed files with 328 additions and 57 deletions.
20 changes: 19 additions & 1 deletion server/remote_asset/fetch_server/BUILD
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "fetch_server",
Expand All @@ -23,3 +23,21 @@ go_library(
"@org_golang_google_protobuf//types/known/durationpb",
],
)

go_test(
name = "fetch_server_test",
srcs = ["fetch_server_test.go"],
deps = [
":fetch_server",
"//proto:remote_asset_go_proto",
"//proto:remote_execution_go_proto",
"//server/remote_cache/byte_stream_server",
"//server/remote_cache/digest",
"//server/testutil/testenv",
"//server/util/scratchspace",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
"@org_golang_google_grpc//:go_default_library",
],
)
139 changes: 83 additions & 56 deletions server/remote_asset/fetch_server/fetch_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
)

const (
checksumQualifier = "checksum.sri"
ChecksumQualifier = "checksum.sri"
sha256Prefix = "sha256-"
blake3Prefix = "blake3-"
maxHTTPTimeout = 60 * time.Minute
)

Expand Down Expand Up @@ -64,9 +65,6 @@ func checkPreconditions(env environment.Env) error {
if env.GetCache() == nil {
return status.FailedPreconditionError("missing Cache")
}
if env.GetByteStreamClient() == nil {
return status.FailedPreconditionError("missing ByteStreamClient")
}
return nil
}

Expand Down Expand Up @@ -109,52 +107,41 @@ func (p *FetchServer) FetchBlob(ctx context.Context, req *rapb.FetchBlobRequest)
return nil, err
}

var expectedSHA256 string

digestFunction := req.GetDigestFunction()
if digestFunction == repb.DigestFunction_UNKNOWN {
digestFunction = repb.DigestFunction_SHA256
}
var hash string
for _, qualifier := range req.GetQualifiers() {
if qualifier.GetName() == checksumQualifier && strings.HasPrefix(qualifier.GetValue(), sha256Prefix) {
b64sha256 := strings.TrimPrefix(qualifier.GetValue(), sha256Prefix)
sha256, err := base64.StdEncoding.DecodeString(b64sha256)
if err != nil {
return nil, status.FailedPreconditionErrorf("Error decoding qualifier %q: %s", qualifier.GetName(), err.Error())
}
blobDigest := &repb.Digest{
Hash: fmt.Sprintf("%x", sha256),
// The digest size is unknown since the client only sends up
// the hash. We can look up the size using the Metadata API,
// which looks up only using the hash, so the size we pass here
// doesn't matter.
SizeBytes: 1,
var prefix string
if qualifier.GetName() == ChecksumQualifier {
if strings.HasPrefix(qualifier.GetValue(), sha256Prefix) {
if digestFunction != repb.DigestFunction_SHA256 {
log.Warningf("FetchBlob request came with %s digest function but SHA256 checksum.sri: %s", digestFunction, qualifier.GetValue())
}
digestFunction = repb.DigestFunction_SHA256
prefix = sha256Prefix
} else if strings.HasPrefix(qualifier.GetValue(), blake3Prefix) {
if digestFunction != repb.DigestFunction_BLAKE3 {
log.Warningf("FetchBlob request came with %s digest function but BLAKE3 checksum.sri: %s", digestFunction, qualifier.GetValue())
}
digestFunction = repb.DigestFunction_BLAKE3
prefix = blake3Prefix
}
expectedSHA256 = blobDigest.Hash
cacheRN := digest.NewResourceName(blobDigest, req.GetInstanceName(), rspb.CacheType_CAS, repb.DigestFunction_SHA256)

log.CtxInfof(ctx, "Looking up %s in cache", blobDigest.Hash)

// Lookup metadata to get the correct digest size to be returned to
// the client.
cache := p.env.GetCache()
md, err := cache.Metadata(ctx, cacheRN.ToProto())
if err != nil {
log.CtxInfof(ctx, "FetchServer failed to get metadata for %s: %s", expectedSHA256, err)
continue
}
blobDigest.SizeBytes = md.DigestSizeBytes

// Even though we successfully fetched metadata, we need to renew
// the cache entry (using Contains()) to ensure that it doesn't
// expire by the time the client requests it from cache.
cacheRN = digest.NewResourceName(blobDigest, req.GetInstanceName(), rspb.CacheType_CAS, repb.DigestFunction_SHA256)
exists, err := cache.Contains(ctx, cacheRN.ToProto())
}
if prefix != "" {
b64hash := strings.TrimPrefix(qualifier.GetValue(), prefix)
decodedHash, err := base64.StdEncoding.DecodeString(b64hash)
if err != nil {
log.CtxErrorf(ctx, "Failed to renew %s: %s", digest.String(blobDigest), err)
continue
}
if !exists {
log.CtxInfof(ctx, "Blob %s expired before we could renew it", digest.String(blobDigest))
continue
return nil, status.FailedPreconditionErrorf("Error decoding qualifier %q: %s", qualifier.GetName(), err.Error())
}
log.CtxInfof(ctx, "FetchServer found %s in cache", digest.String(blobDigest))
hash = fmt.Sprintf("%x", decodedHash)
break
}
}
if len(hash) != 0 {
blobDigest := p.findBlobInCache(ctx, req.GetInstanceName(), digestFunction, hash)
if blobDigest != nil {
return &rapb.FetchBlobResponse{
Status: &statuspb.Status{Code: int32(gcodes.OK)},
BlobDigest: blobDigest,
Expand All @@ -172,7 +159,7 @@ func (p *FetchServer) FetchBlob(ctx context.Context, req *rapb.FetchBlobRequest)
if err != nil {
return nil, status.InvalidArgumentErrorf("unparsable URI: %q", uri)
}
blobDigest, err := mirrorToCache(ctx, p.env.GetByteStreamClient(), req.GetInstanceName(), httpClient, uri, expectedSHA256)
blobDigest, err := mirrorToCache(ctx, p.env.GetByteStreamClient(), req.GetInstanceName(), digestFunction, httpClient, uri, hash)
if err != nil {
lastFetchErr = err
log.CtxWarningf(ctx, "Failed to mirror %q to cache: %s", uri, err)
Expand Down Expand Up @@ -203,11 +190,51 @@ func (p *FetchServer) FetchDirectory(ctx context.Context, req *rapb.FetchDirecto
return nil, status.UnimplementedError("FetchDirectory is not yet implemented")
}

func (p *FetchServer) findBlobInCache(ctx context.Context, instanceName string, digestFunction repb.DigestFunction_Value, hash string) *repb.Digest {
blobDigest := &repb.Digest{
Hash: hash,
// The digest size is unknown since the client only sends up
// the hash. We can look up the size using the Metadata API,
// which looks up only using the hash, so the size we pass here
// doesn't matter.
SizeBytes: 1,
}
expectedHash := blobDigest.Hash
cacheRN := digest.NewResourceName(blobDigest, instanceName, rspb.CacheType_CAS, digestFunction)
log.CtxInfof(ctx, "Looking up %s in cache", blobDigest.Hash)

// Lookup metadata to get the correct digest size to be returned to
// the client.
cache := p.env.GetCache()
md, err := cache.Metadata(ctx, cacheRN.ToProto())
if err != nil {
log.CtxInfof(ctx, "FetchServer failed to get metadata for %s: %s", expectedHash, err)
return nil
}
blobDigest.SizeBytes = md.DigestSizeBytes

// Even though we successfully fetched metadata, we need to renew
// the cache entry (using Contains()) to ensure that it doesn't
// expire by the time the client requests it from cache.
cacheRN = digest.NewResourceName(blobDigest, instanceName, rspb.CacheType_CAS, digestFunction)
exists, err := cache.Contains(ctx, cacheRN.ToProto())
if err != nil {
log.CtxErrorf(ctx, "Failed to renew %s: %s", digest.String(blobDigest), err)
return nil
}
if !exists {
log.CtxInfof(ctx, "Blob %s expired before we could renew it", digest.String(blobDigest))
return nil
}
log.CtxInfof(ctx, "FetchServer found %s in cache", digest.String(blobDigest))
return blobDigest
}

// mirrorToCache uploads the contents at the given URI to the given cache,
// returning the digest. The fetched contents are checked against the given
// expectedSHA256 (if non-empty), and if there is a mismatch then an error is
// expectedHash (if non-empty), and if there is a mismatch then an error is
// returned.
func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteInstanceName string, httpClient *http.Client, uri, expectedSHA256 string) (*repb.Digest, error) {
func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteInstanceName string, digestFunc repb.DigestFunction_Value, httpClient *http.Client, uri, expectedHash string) (*repb.Digest, error) {
log.CtxInfof(ctx, "Fetching %s", uri)
rsp, err := httpClient.Get(uri)
if err != nil {
Expand All @@ -218,12 +245,12 @@ func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteIn
return nil, status.UnavailableErrorf("failed to fetch %q: HTTP %s", uri, err)
}

// If we know what the SHA256 should be and the content length is known,
// If we know what the hash should be and the content length is known,
// then we know the full digest, and can pipe directly from the HTTP
// response to cache.
if expectedSHA256 != "" && rsp.ContentLength >= 0 {
d := &repb.Digest{Hash: expectedSHA256, SizeBytes: rsp.ContentLength}
rn := digest.NewResourceName(d, remoteInstanceName, rspb.CacheType_CAS, repb.DigestFunction_SHA256)
if expectedHash != "" && rsp.ContentLength >= 0 {
d := &repb.Digest{Hash: expectedHash, SizeBytes: rsp.ContentLength}
rn := digest.NewResourceName(d, remoteInstanceName, rspb.CacheType_CAS, digestFunc)
if _, err := cachetools.UploadFromReader(ctx, bsClient, rn, rsp.Body); err != nil {
return nil, status.UnavailableErrorf("failed to upload %s to cache: %s", digest.String(d), err)
}
Expand All @@ -246,12 +273,12 @@ func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteIn
log.Errorf("Failed to remove temp file: %s", err)
}
}()
blobDigest, err := cachetools.UploadFile(ctx, bsClient, remoteInstanceName, repb.DigestFunction_SHA256, tmpFilePath)
blobDigest, err := cachetools.UploadFile(ctx, bsClient, remoteInstanceName, digestFunc, tmpFilePath)
if err != nil {
return nil, status.UnavailableErrorf("failed to add object to cache: %s", err)
}
if expectedSHA256 != "" && blobDigest.Hash != expectedSHA256 {
return nil, status.InvalidArgumentErrorf("response body checksum for %q was %q but wanted %q", uri, blobDigest.Hash, expectedSHA256)
if expectedHash != "" && blobDigest.Hash != expectedHash {
return nil, status.InvalidArgumentErrorf("response body checksum for %q was %q but wanted %q", uri, blobDigest.Hash, expectedHash)
}
log.CtxInfof(ctx, "Mirrored %s to cache (digest: %s)", uri, digest.String(blobDigest))
return blobDigest, nil
Expand Down
Loading

0 comments on commit bb4c9d9

Please sign in to comment.