From d5325631a9bc707c848da863366461a190b20e04 Mon Sep 17 00:00:00 2001 From: Son Luong Ngoc Date: Tue, 23 Apr 2024 07:17:37 +0200 Subject: [PATCH] fetch_server: support digest function and blake3 (#6382) Co-authored-by: Brandon Duffany --- server/remote_asset/fetch_server/BUILD | 22 +- .../remote_asset/fetch_server/fetch_server.go | 187 +++++++--- .../fetch_server/fetch_server_test.go | 326 ++++++++++++++++++ 3 files changed, 477 insertions(+), 58 deletions(-) create mode 100644 server/remote_asset/fetch_server/fetch_server_test.go diff --git a/server/remote_asset/fetch_server/BUILD b/server/remote_asset/fetch_server/BUILD index ae20da79ed3..42f126faf88 100644 --- a/server/remote_asset/fetch_server/BUILD +++ b/server/remote_asset/fetch_server/BUILD @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "fetch_server", @@ -23,3 +23,23 @@ go_library( "@org_golang_google_protobuf//types/known/durationpb", ], ) + +go_test( + name = "fetch_server_test", + srcs = ["fetch_server_test.go"], + deps = [ + ":fetch_server", + "//proto:remote_asset_go_proto", + "//proto:remote_execution_go_proto", + "//proto:resource_go_proto", + "//server/remote_cache/byte_stream_server", + "//server/remote_cache/digest", + "//server/testutil/testenv", + "//server/util/prefix", + "//server/util/scratchspace", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_golang_google_genproto_googleapis_bytestream//:bytestream", + "@org_golang_google_grpc//:go_default_library", + ], +) diff --git a/server/remote_asset/fetch_server/fetch_server.go b/server/remote_asset/fetch_server/fetch_server.go index e0f2d3044c8..96cfb4398c2 100644 --- a/server/remote_asset/fetch_server/fetch_server.go +++ b/server/remote_asset/fetch_server/fetch_server.go @@ -31,8 +31,9 @@ import ( ) const ( - checksumQualifier = "checksum.sri" + ChecksumQualifier = "checksum.sri" sha256Prefix = "sha256-" + blake3Prefix = "blake3-" maxHTTPTimeout = 60 * time.Minute ) @@ -64,9 +65,6 @@ func checkPreconditions(env environment.Env) error { if env.GetCache() == nil { return status.FailedPreconditionError("missing Cache") } - if env.GetByteStreamClient() == nil { - return status.FailedPreconditionError("missing ByteStreamClient") - } return nil } @@ -109,52 +107,43 @@ func (p *FetchServer) FetchBlob(ctx context.Context, req *rapb.FetchBlobRequest) return nil, err } - var expectedSHA256 string - + storageFunc := req.GetDigestFunction() + if storageFunc == repb.DigestFunction_UNKNOWN { + storageFunc = repb.DigestFunction_SHA256 + } + var checksumFunc repb.DigestFunction_Value + var expectedChecksum string for _, qualifier := range req.GetQualifiers() { - if qualifier.GetName() == checksumQualifier && strings.HasPrefix(qualifier.GetValue(), sha256Prefix) { - b64sha256 := strings.TrimPrefix(qualifier.GetValue(), sha256Prefix) - sha256, err := base64.StdEncoding.DecodeString(b64sha256) - if err != nil { - return nil, status.FailedPreconditionErrorf("Error decoding qualifier %q: %s", qualifier.GetName(), err.Error()) - } - blobDigest := &repb.Digest{ - Hash: fmt.Sprintf("%x", sha256), - // The digest size is unknown since the client only sends up - // the hash. We can look up the size using the Metadata API, - // which looks up only using the hash, so the size we pass here - // doesn't matter. - SizeBytes: 1, + var prefix string + if qualifier.GetName() == ChecksumQualifier { + if strings.HasPrefix(qualifier.GetValue(), sha256Prefix) { + checksumFunc = repb.DigestFunction_SHA256 + prefix = sha256Prefix + } else if strings.HasPrefix(qualifier.GetValue(), blake3Prefix) { + checksumFunc = repb.DigestFunction_BLAKE3 + prefix = blake3Prefix } - expectedSHA256 = blobDigest.Hash - cacheRN := digest.NewResourceName(blobDigest, req.GetInstanceName(), rspb.CacheType_CAS, repb.DigestFunction_SHA256) - - log.CtxInfof(ctx, "Looking up %s in cache", blobDigest.Hash) - - // Lookup metadata to get the correct digest size to be returned to - // the client. - cache := p.env.GetCache() - md, err := cache.Metadata(ctx, cacheRN.ToProto()) + } + if prefix != "" { + b64hash := strings.TrimPrefix(qualifier.GetValue(), prefix) + decodedHash, err := base64.StdEncoding.DecodeString(b64hash) if err != nil { - log.CtxInfof(ctx, "FetchServer failed to get metadata for %s: %s", expectedSHA256, err) - continue + return nil, status.FailedPreconditionErrorf("Error decoding qualifier %q: %s", qualifier.GetName(), err.Error()) } - blobDigest.SizeBytes = md.DigestSizeBytes + expectedChecksum = fmt.Sprintf("%x", decodedHash) + break + } + } + if len(expectedChecksum) != 0 { + blobDigest := p.findBlobInCache(ctx, req.GetInstanceName(), checksumFunc, expectedChecksum) + // If the digestFunc is supplied and differ from the checksum sri, + // after looking up the cached blob using checksum sri, re-upload + // that blob using the requested digestFunc. + if blobDigest != nil && checksumFunc != storageFunc { + blobDigest = p.rewriteToCache(ctx, blobDigest, req.GetInstanceName(), checksumFunc, storageFunc) + } - // Even though we successfully fetched metadata, we need to renew - // the cache entry (using Contains()) to ensure that it doesn't - // expire by the time the client requests it from cache. - cacheRN = digest.NewResourceName(blobDigest, req.GetInstanceName(), rspb.CacheType_CAS, repb.DigestFunction_SHA256) - exists, err := cache.Contains(ctx, cacheRN.ToProto()) - if err != nil { - log.CtxErrorf(ctx, "Failed to renew %s: %s", digest.String(blobDigest), err) - continue - } - if !exists { - log.CtxInfof(ctx, "Blob %s expired before we could renew it", digest.String(blobDigest)) - continue - } - log.CtxInfof(ctx, "FetchServer found %s in cache", digest.String(blobDigest)) + if blobDigest != nil { return &rapb.FetchBlobResponse{ Status: &statuspb.Status{Code: int32(gcodes.OK)}, BlobDigest: blobDigest, @@ -172,7 +161,7 @@ func (p *FetchServer) FetchBlob(ctx context.Context, req *rapb.FetchBlobRequest) if err != nil { return nil, status.InvalidArgumentErrorf("unparsable URI: %q", uri) } - blobDigest, err := mirrorToCache(ctx, p.env.GetByteStreamClient(), req.GetInstanceName(), httpClient, uri, expectedSHA256) + blobDigest, err := mirrorToCache(ctx, p.env.GetByteStreamClient(), req.GetInstanceName(), httpClient, uri, storageFunc, checksumFunc, expectedChecksum) if err != nil { lastFetchErr = err log.CtxWarningf(ctx, "Failed to mirror %q to cache: %s", uri, err) @@ -203,12 +192,81 @@ func (p *FetchServer) FetchDirectory(ctx context.Context, req *rapb.FetchDirecto return nil, status.UnimplementedError("FetchDirectory is not yet implemented") } +func (p *FetchServer) rewriteToCache(ctx context.Context, blobDigest *repb.Digest, instanceName string, fromFunc, toFunc repb.DigestFunction_Value) *repb.Digest { + cacheRN := digest.NewResourceName(blobDigest, instanceName, rspb.CacheType_CAS, fromFunc) + cache := p.env.GetCache() + reader, err := cache.Reader(ctx, cacheRN.ToProto(), 0, 0) + if err != nil { + log.CtxErrorf(ctx, "Failed to get cache reader for %s: %s", digest.String(blobDigest), err) + return nil + } + + tmpFilePath, err := tempCopy(reader) + if err != nil { + log.CtxErrorf(ctx, "Failed to copy from reader to temp for %s: %s", digest.String(blobDigest), err) + return nil + } + defer func() { + if err := os.Remove(tmpFilePath); err != nil { + log.Errorf("Failed to remove temp file: %s", err) + } + }() + + bsClient := p.env.GetByteStreamClient() + storageDigest, err := cachetools.UploadFile(ctx, bsClient, instanceName, toFunc, tmpFilePath) + if err != nil { + log.CtxErrorf(ctx, "Failed to re-upload blob with new digestFunc %s for %s: %s", toFunc, digest.String(blobDigest), err) + return nil + } + return storageDigest +} + +func (p *FetchServer) findBlobInCache(ctx context.Context, instanceName string, checksumFunc repb.DigestFunction_Value, expectedChecksum string) *repb.Digest { + blobDigest := &repb.Digest{ + Hash: expectedChecksum, + // The digest size is unknown since the client only sends up + // the hash. We can look up the size using the Metadata API, + // which looks up only using the hash, so the size we pass here + // doesn't matter. + SizeBytes: 1, + } + cacheRN := digest.NewResourceName(blobDigest, instanceName, rspb.CacheType_CAS, checksumFunc) + log.CtxDebugf(ctx, "Looking up %s in cache", blobDigest.Hash) + + // Lookup metadata to get the correct digest size to be returned to + // the client. + cache := p.env.GetCache() + md, err := cache.Metadata(ctx, cacheRN.ToProto()) + if err != nil { + log.CtxInfof(ctx, "FetchServer failed to get metadata for %s: %s", expectedChecksum, err) + return nil + } + blobDigest.SizeBytes = md.DigestSizeBytes + + // Even though we successfully fetched metadata, we need to renew + // the cache entry (using Contains()) to ensure that it doesn't + // expire by the time the client requests it from cache. + cacheRN = digest.NewResourceName(blobDigest, instanceName, rspb.CacheType_CAS, checksumFunc) + exists, err := cache.Contains(ctx, cacheRN.ToProto()) + if err != nil { + log.CtxErrorf(ctx, "Failed to renew %s: %s", digest.String(blobDigest), err) + return nil + } + if !exists { + log.CtxInfof(ctx, "Blob %s expired before we could renew it", digest.String(blobDigest)) + return nil + } + + log.CtxDebugf(ctx, "FetchServer found %s in cache", digest.String(blobDigest)) + return blobDigest +} + // mirrorToCache uploads the contents at the given URI to the given cache, // returning the digest. The fetched contents are checked against the given -// expectedSHA256 (if non-empty), and if there is a mismatch then an error is +// expectedChecksum (if non-empty), and if there is a mismatch then an error is // returned. -func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteInstanceName string, httpClient *http.Client, uri, expectedSHA256 string) (*repb.Digest, error) { - log.CtxInfof(ctx, "Fetching %s", uri) +func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteInstanceName string, httpClient *http.Client, uri string, storageFunc repb.DigestFunction_Value, checksumFunc repb.DigestFunction_Value, expectedChecksum string) (*repb.Digest, error) { + log.CtxDebugf(ctx, "Fetching %s", uri) rsp, err := httpClient.Get(uri) if err != nil { return nil, status.UnavailableErrorf("failed to fetch %q: HTTP GET failed: %s", uri, err) @@ -218,12 +276,12 @@ func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteIn return nil, status.UnavailableErrorf("failed to fetch %q: HTTP %s", uri, err) } - // If we know what the SHA256 should be and the content length is known, + // If we know what the hash should be and the content length is known, // then we know the full digest, and can pipe directly from the HTTP // response to cache. - if expectedSHA256 != "" && rsp.ContentLength >= 0 { - d := &repb.Digest{Hash: expectedSHA256, SizeBytes: rsp.ContentLength} - rn := digest.NewResourceName(d, remoteInstanceName, rspb.CacheType_CAS, repb.DigestFunction_SHA256) + if checksumFunc == storageFunc && expectedChecksum != "" && rsp.ContentLength >= 0 { + d := &repb.Digest{Hash: expectedChecksum, SizeBytes: rsp.ContentLength} + rn := digest.NewResourceName(d, remoteInstanceName, rspb.CacheType_CAS, storageFunc) if _, err := cachetools.UploadFromReader(ctx, bsClient, rn, rsp.Body); err != nil { return nil, status.UnavailableErrorf("failed to upload %s to cache: %s", digest.String(d), err) } @@ -246,14 +304,29 @@ func mirrorToCache(ctx context.Context, bsClient bspb.ByteStreamClient, remoteIn log.Errorf("Failed to remove temp file: %s", err) } }() - blobDigest, err := cachetools.UploadFile(ctx, bsClient, remoteInstanceName, repb.DigestFunction_SHA256, tmpFilePath) + + // If the requested digestFunc is supplied and differ from the checksum sri, + // verify the downloaded file with the checksum sri before storing it to + // our cache. + if checksumFunc != storageFunc { + checksumDigestRN, err := cachetools.ComputeFileDigest(tmpFilePath, remoteInstanceName, checksumFunc) + if err != nil { + return nil, status.UnavailableErrorf("failed to compute checksum digest: %s", err) + } + if expectedChecksum != "" && checksumDigestRN.GetDigest().GetHash() != expectedChecksum { + return nil, status.InvalidArgumentErrorf("response body checksum for %q was %q but wanted %q", uri, checksumDigestRN.GetDigest().Hash, expectedChecksum) + } + } + blobDigest, err := cachetools.UploadFile(ctx, bsClient, remoteInstanceName, storageFunc, tmpFilePath) if err != nil { return nil, status.UnavailableErrorf("failed to add object to cache: %s", err) } - if expectedSHA256 != "" && blobDigest.Hash != expectedSHA256 { - return nil, status.InvalidArgumentErrorf("response body checksum for %q was %q but wanted %q", uri, blobDigest.Hash, expectedSHA256) + // If the requested digestFunc is supplied is the same with the checksum sri, + // verify the expected checksum of the downloaded file after storing it in our cache. + if checksumFunc == storageFunc && expectedChecksum != "" && blobDigest.Hash != expectedChecksum { + return nil, status.InvalidArgumentErrorf("response body checksum for %q was %q but wanted %q", uri, blobDigest.Hash, expectedChecksum) } - log.CtxInfof(ctx, "Mirrored %s to cache (digest: %s)", uri, digest.String(blobDigest)) + log.CtxDebugf(ctx, "Mirrored %s to cache (digest: %s)", uri, digest.String(blobDigest)) return blobDigest, nil } diff --git a/server/remote_asset/fetch_server/fetch_server_test.go b/server/remote_asset/fetch_server/fetch_server_test.go new file mode 100644 index 00000000000..15fd9d4ef0b --- /dev/null +++ b/server/remote_asset/fetch_server/fetch_server_test.go @@ -0,0 +1,326 @@ +package fetch_server_test + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/hex" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/buildbuddy-io/buildbuddy/proto/resource" + "github.com/buildbuddy-io/buildbuddy/server/remote_asset/fetch_server" + "github.com/buildbuddy-io/buildbuddy/server/remote_cache/byte_stream_server" + "github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest" + "github.com/buildbuddy-io/buildbuddy/server/testutil/testenv" + "github.com/buildbuddy-io/buildbuddy/server/util/prefix" + "github.com/buildbuddy-io/buildbuddy/server/util/scratchspace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + + rapb "github.com/buildbuddy-io/buildbuddy/proto/remote_asset" + repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution" + bspb "google.golang.org/genproto/googleapis/bytestream" +) + +func runFetchServer(ctx context.Context, env *testenv.TestEnv, t *testing.T) *grpc.ClientConn { + byteStreamServer, err := byte_stream_server.NewByteStreamServer(env) + require.NoError(t, err) + fetchServer, err := fetch_server.NewFetchServer(env) + require.NoError(t, err) + + grpcServer, runFunc := testenv.RegisterLocalGRPCServer(env) + bspb.RegisterByteStreamServer(grpcServer, byteStreamServer) + rapb.RegisterFetchServer(grpcServer, fetchServer) + + go runFunc() + t.Cleanup(func() { grpcServer.GracefulStop() }) + + clientConn, err := testenv.LocalGRPCConn(ctx, env) + require.NoError(t, err) + + env.SetByteStreamClient(bspb.NewByteStreamClient(clientConn)) + return clientConn +} + +func checksumQualifierFromContent(t *testing.T, contentHash string, digestFunc repb.DigestFunction_Value) string { + h, err := hex.DecodeString(contentHash) + require.NoError(t, err) + base64hash := base64.StdEncoding.EncodeToString(h) + + var prefix string + if digestFunc == repb.DigestFunction_UNKNOWN { + prefix = "sha256" + } else { + prefix = strings.ToLower(digestFunc.String()) + } + + return fmt.Sprintf("%s-%s", prefix, base64hash) +} + +func TestFetchBlob(t *testing.T) { + for _, tc := range []struct { + name string + content string + digestFunc repb.DigestFunction_Value + }{ + { + name: "default_digest_func", + content: "default", + digestFunc: repb.DigestFunction_UNKNOWN, + }, + { + name: "sha256_content", + content: "sha256", + digestFunc: repb.DigestFunction_SHA256, + }, + { + name: "blake3_content", + content: "blake3", + digestFunc: repb.DigestFunction_BLAKE3, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + te := testenv.GetTestEnv(t) + clientConn := runFetchServer(ctx, te, t) + fetchClient := rapb.NewFetchClient(clientConn) + + contentDigest, err := digest.Compute(bytes.NewReader([]byte(tc.content)), tc.digestFunc) + require.NoError(t, err) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, tc.content) + })) + defer ts.Close() + + resp, err := fetchClient.FetchBlob(ctx, &rapb.FetchBlobRequest{ + Uris: []string{ts.URL}, + Qualifiers: []*rapb.Qualifier{ + { + Name: fetch_server.ChecksumQualifier, + Value: checksumQualifierFromContent(t, contentDigest.GetHash(), tc.digestFunc), + }, + }, + DigestFunction: tc.digestFunc, + }) + assert.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, int32(0), resp.GetStatus().Code) + assert.Equal(t, "", resp.GetStatus().Message) + assert.Contains(t, resp.GetUri(), ts.URL) + assert.Equal(t, contentDigest.GetHash(), resp.GetBlobDigest().GetHash()) + assert.Equal(t, contentDigest.GetSizeBytes(), resp.GetBlobDigest().GetSizeBytes()) + }) + } +} + +// Precompute content to be used in the following tests +const ( + content = "content" + contentSize = int64(7) + + contentSHA256 = "ed7002b439e9ac845f22357d822bac1444730fbdb6016d3ec9432297b9ec9f73" + contentBLAKE3 = "3fba5250be9ac259c56e7250c526bc83bacb4be825f2799d3d59e5b4878dd74e" + + // see checksumQualifierFromContent for logic on recreating these values + sha256CRI = "sha256-7XACtDnprIRfIjV9giusFERzD722AW0+yUMil7nsn3M=" + blake3CRI = "blake3-P7pSUL6awlnFbnJQxSa8g7rLS+gl8nmdPVnltIeN104=" +) + +// TestFetchBlobWithCache verifies that the blob in cache is prioritized +// over the externaly fetching from HTTP upstream. +// Also test that if the checksum qualifier and request use different +// hash algorithms, properly replicate the cached blob from the checksum +// digest function to the request's digest function. +func TestFetchBlobWithCache(t *testing.T) { + for _, tc := range []struct { + name string + contentHash string + checksumFunc repb.DigestFunction_Value + checksumCRI string + storageFunc repb.DigestFunction_Value + }{ + { + name: "checksum_SHA256__storage_SHA256", + contentHash: contentSHA256, + checksumFunc: repb.DigestFunction_SHA256, + checksumCRI: sha256CRI, + storageFunc: repb.DigestFunction_SHA256, + }, + { + name: "checksum_BLAKE3__storage_BLAKE3", + contentHash: contentBLAKE3, + checksumFunc: repb.DigestFunction_BLAKE3, + checksumCRI: blake3CRI, + storageFunc: repb.DigestFunction_BLAKE3, + }, + { + name: "checksum_SHA256__storage_BLAKE3", + contentHash: contentSHA256, + checksumFunc: repb.DigestFunction_SHA256, + checksumCRI: sha256CRI, + storageFunc: repb.DigestFunction_BLAKE3, + }, + { + name: "checksum_BLAKE3__storage_SHA256", + contentHash: contentBLAKE3, + checksumFunc: repb.DigestFunction_BLAKE3, + checksumCRI: blake3CRI, + storageFunc: repb.DigestFunction_SHA256, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + te := testenv.GetTestEnv(t) + require.NoError(t, scratchspace.Init()) + clientConn := runFetchServer(ctx, te, t) + fetchClient := rapb.NewFetchClient(clientConn) + + ctx, err := prefix.AttachUserPrefixToContext(ctx, te) + require.NoError(t, err) + + err = te.GetCache().Set(ctx, digest.NewResourceName(&repb.Digest{ + Hash: tc.contentHash, + SizeBytes: contentSize, + }, "", resource.CacheType_CAS, tc.checksumFunc).ToProto(), []byte("content")) + require.NoError(t, err) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "should not request this", http.StatusForbidden) + })) + defer ts.Close() + + resp, err := fetchClient.FetchBlob(ctx, &rapb.FetchBlobRequest{ + Uris: []string{ts.URL}, + Qualifiers: []*rapb.Qualifier{ + { + Name: fetch_server.ChecksumQualifier, + Value: tc.checksumCRI, + }, + }, + DigestFunction: tc.storageFunc, + }) + require.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, int32(0), resp.GetStatus().Code) + + exist, err := te.GetCache().Contains(ctx, digest.NewResourceName(&repb.Digest{ + Hash: resp.GetBlobDigest().GetHash(), + SizeBytes: resp.GetBlobDigest().GetSizeBytes(), + }, "", resource.CacheType_CAS, tc.storageFunc).ToProto()) + require.NoError(t, err) + require.True(t, exist) + }) + } +} + +func TestFetchBlobMismatch(t *testing.T) { + content := "content" + for _, tc := range []struct { + name string + checksumQualifier string + requestedDigestFunc repb.DigestFunction_Value + expectedDigestFunc repb.DigestFunction_Value + expectedHash string + }{ + { + name: "default_digest_func__sri_sha256", + checksumQualifier: sha256CRI, + requestedDigestFunc: repb.DigestFunction_UNKNOWN, + expectedDigestFunc: repb.DigestFunction_SHA256, + expectedHash: contentSHA256, + }, + { + name: "default_digest_func__sri_blake3", + checksumQualifier: blake3CRI, + requestedDigestFunc: repb.DigestFunction_UNKNOWN, + expectedDigestFunc: repb.DigestFunction_SHA256, + expectedHash: contentSHA256, + }, + { + name: "default_digest_func__no_sri", + checksumQualifier: "", + requestedDigestFunc: repb.DigestFunction_UNKNOWN, + expectedDigestFunc: repb.DigestFunction_SHA256, + expectedHash: contentSHA256, + }, + { + name: "sha256_digest_func__sri_blake3", + checksumQualifier: blake3CRI, + requestedDigestFunc: repb.DigestFunction_SHA256, + expectedDigestFunc: repb.DigestFunction_SHA256, + expectedHash: contentSHA256, + }, + { + name: "sha256_digest_func__no_sri", + checksumQualifier: "", + requestedDigestFunc: repb.DigestFunction_SHA256, + expectedDigestFunc: repb.DigestFunction_SHA256, + expectedHash: contentSHA256, + }, + { + name: "blake3_digest_func__sri_sha256", + checksumQualifier: sha256CRI, + requestedDigestFunc: repb.DigestFunction_BLAKE3, + expectedDigestFunc: repb.DigestFunction_BLAKE3, + expectedHash: contentBLAKE3, + }, + { + name: "blake3_digest_func__no_sri", + checksumQualifier: "", + requestedDigestFunc: repb.DigestFunction_BLAKE3, + expectedDigestFunc: repb.DigestFunction_BLAKE3, + expectedHash: contentBLAKE3, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + te := testenv.GetTestEnv(t) + require.NoError(t, scratchspace.Init()) + clientConn := runFetchServer(ctx, te, t) + fetchClient := rapb.NewFetchClient(clientConn) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, content) + })) + defer ts.Close() + + request := &rapb.FetchBlobRequest{ + Uris: []string{ts.URL}, + DigestFunction: tc.requestedDigestFunc, + } + if tc.checksumQualifier != "" { + request.Qualifiers = []*rapb.Qualifier{ + { + Name: fetch_server.ChecksumQualifier, + Value: tc.checksumQualifier, + }, + } + } + resp, err := fetchClient.FetchBlob(ctx, request) + + assert.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, int32(0), resp.GetStatus().Code) + assert.Equal(t, "", resp.GetStatus().Message) + assert.Contains(t, resp.GetUri(), ts.URL) + assert.Equal(t, tc.expectedHash, resp.GetBlobDigest().GetHash()) + }) + } +} + +func TestFetchDirectory(t *testing.T) { + ctx := context.Background() + te := testenv.GetTestEnv(t) + clientConn := runFetchServer(ctx, te, t) + fetchClient := rapb.NewFetchClient(clientConn) + + resp, err := fetchClient.FetchDirectory(ctx, &rapb.FetchDirectoryRequest{}) + assert.EqualError(t, err, "rpc error: code = Unimplemented desc = FetchDirectory is not yet implemented") + assert.Nil(t, resp) +}