Commit ea9dfdd

WIP

cirospaciari committed Dec 14, 2024
1 parent fe15667 commit ea9dfdd
Showing 9 changed files with 269 additions and 104 deletions.
151 changes: 122 additions & 29 deletions src/bun.js/webcore/blob.zig
@@ -43,7 +43,9 @@ const Request = JSC.WebCore.Request;

const libuv = bun.windows.libuv;

const AWS = @import("../../s3.zig").AWSCredentials;
const AWSCredentials = @import("../../s3.zig").AWSCredentials;
const AWS = AWSCredentials;

const PathOrBlob = union(enum) {
path: JSC.Node.PathOrFileDescriptor,
blob: Blob,
@@ -271,6 +273,10 @@ pub const Blob = struct {
blob.resolveSize();
}
switch (store.data) {
.s3 => |_| {
// TODO: s3
// we need to make this async and use s3Download/s3DownloadSlice
},
.file => |file| {

// TODO: make this async + lazy
@@ -690,6 +696,15 @@ pub const Blob = struct {
{
const store = this.store.?;
switch (store.data) {
.s3 => |s3| {
try writer.writeAll(comptime Output.prettyFmt("<r>S3Ref<r>", enable_ansi_colors));
try writer.print(
comptime Output.prettyFmt(" (<green>\"{s}\"<r>)<r>", enable_ansi_colors),
.{
s3.pathlike.slice(),
},
);
},
.file => |file| {
try writer.writeAll(comptime Output.prettyFmt("<r>FileRef<r>", enable_ansi_colors));
switch (file.pathlike) {
@@ -1439,6 +1454,7 @@ pub const Blob = struct {
return globalThis.throwInvalidArguments("new Blob() expects an Array", .{});
},
};
// TODO: S3: add more options here for credentials

if (blob.store) |store_| {
switch (store_.data) {
@@ -1447,7 +1463,7 @@
(name_value_str.toUTF8WithoutRef(bun.default_allocator).clone(bun.default_allocator) catch unreachable).slice(),
);
},
.file => {
.s3, .file => {
blob.name = name_value_str.dupeRef();
},
}
@@ -1524,6 +1540,7 @@ pub const Blob = struct {
store.data.bytes.len;
},
.file => size += store.data.file.pathlike.estimatedSize(),
.s3 => size += store.data.s3.estimatedSize(),
}
}

@@ -1650,6 +1667,13 @@ pub const Blob = struct {
}
};

if (path == .path) {
if (strings.startsWith(path.path.slice(), "s3://")) {
const credentials = globalThis.bunVM().bundler.env.getAWSCredentials();
return Blob.initWithStore(Blob.Store.initS3(path.path, null, credentials, allocator) catch bun.outOfMemory(), globalThis);
}
}

return Blob.initWithStore(Blob.Store.initFile(path, null, allocator) catch bun.outOfMemory(), globalThis);
}
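
Note: the constructFromPath hunk above routes any path beginning with "s3://" to an S3-backed store, with credentials resolved from the environment. Below is a standalone sketch of that scheme check plus a bucket/key split; splitS3Url is illustrative and not Bun's API (the real parsing goes through bun.URL.parse(...).s3Path()).

const std = @import("std");

// Illustrative bucket/key split for an s3:// URL; not Bun's parser.
fn splitS3Url(url: []const u8) ?struct { bucket: []const u8, key: []const u8 } {
    const prefix = "s3://";
    if (!std.mem.startsWith(u8, url, prefix)) return null;
    const rest = url[prefix.len..];
    const slash = std.mem.indexOfScalar(u8, rest, '/') orelse
        return .{ .bucket = rest, .key = "" };
    return .{ .bucket = rest[0..slash], .key = rest[slash + 1 ..] };
}

test "splitS3Url" {
    const parts = splitS3Url("s3://my-bucket/path/to/object").?;
    try std.testing.expectEqualStrings("my-bucket", parts.bucket);
    try std.testing.expectEqualStrings("path/to/object", parts.key);
    try std.testing.expect(splitS3Url("/tmp/file.txt") == null);
}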

@@ -1666,7 +1690,7 @@
pub fn size(this: *const Store) SizeType {
return switch (this.data) {
.bytes => this.data.bytes.len,
.file => Blob.max_size,
.s3, .file => Blob.max_size,
};
}

@@ -1675,6 +1699,7 @@
pub const Data = union(enum) {
bytes: ByteStore,
file: FileStore,
s3: S3Store,
};

pub fn ref(this: *Store) void {
@@ -1703,6 +1728,30 @@ pub const Blob = struct {
this.deref();
}

pub fn initS3(pathlike: JSC.Node.PathLike, mime_type: ?http.MimeType, credentials: AWSCredentials, allocator: std.mem.Allocator) !*Store {
const store = Blob.Store.new(.{
.data = .{
.s3 = S3Store.init(
pathlike,
mime_type orelse brk: {
const sliced = pathlike.slice();
if (sliced.len > 0) {
var extname = std.fs.path.extension(sliced);
extname = std.mem.trim(u8, extname, ".");
if (http.MimeType.byExtensionNoDefault(extname)) |mime| {
break :brk mime;
}
}
break :brk null;
},
credentials,
),
},
.allocator = allocator,
.ref_count = std.atomic.Value(u32).init(1),
});
return store;
}
pub fn initFile(pathlike: JSC.Node.PathOrFileDescriptor, mime_type: ?http.MimeType, allocator: std.mem.Allocator) !*Store {
const store = Blob.Store.new(.{
.data = .{
@@ -1772,6 +1821,9 @@ pub const Blob = struct {
}
}
},
.s3 => |s3| {
s3.deinit(allocator);
},
}

this.destroy();
@@ -1800,6 +1852,14 @@ pub const Blob = struct {
},
}
},
.s3 => |s3| {
const pathlike_tag: JSC.Node.PathOrFileDescriptor.SerializeTag = .path;
try writer.writeInt(u8, @intFromEnum(pathlike_tag), .little);

const path_slice = s3.pathlike.slice();
try writer.writeInt(u32, @as(u32, @truncate(path_slice.len)), .little);
try writer.writeAll(path_slice);
},
.bytes => |bytes| {
const slice = bytes.slice();
try writer.writeInt(u32, @truncate(slice.len), .little);
@@ -3171,6 +3231,44 @@ pub const Blob = struct {
}
};

pub const S3Store = struct {
pathlike: JSC.Node.PathLike,
mime_type: http.MimeType = http.MimeType.other,
credentials: AWSCredentials,
pub fn isSeekable(_: *const @This()) ?bool {
return true;
}

pub fn getCredentials(this: *@This()) AWSCredentials {
return this.credentials;
}

pub fn path(this: *@This()) []const u8 {
return bun.URL.parse(this.pathlike.slice()).s3Path();
}

pub fn init(pathlike: JSC.Node.PathLike, mime_type: ?http.MimeType, credentials: AWSCredentials) S3Store {
return .{
.credentials = credentials,
.pathlike = pathlike,
.mime_type = mime_type orelse http.MimeType.other,
};
}
pub fn estimatedSize(this: *@This()) usize {
// TODO: include the credentials size here
return this.pathlike.estimatedSize();
}

pub fn deinit(this: *const @This(), allocator: std.mem.Allocator) void {
if (this.pathlike == .string) {
allocator.free(@constCast(this.pathlike.slice()));
} else {
this.pathlike.deinit();
}
// TODO: clear credentials
}
};
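
Note: initS3 above falls back to guessing the MIME type from the key's file extension when none is supplied. A self-contained sketch of that fallback using only std; the mappings and mimeFromPath are illustrative stand-ins for Bun's http.MimeType.byExtensionNoDefault table.

const std = @import("std");

// Illustrative stand-in for Bun's extension -> MIME lookup.
fn mimeFromPath(path: []const u8) ?[]const u8 {
    var ext = std.fs.path.extension(path); // e.g. ".json", or "" if none
    ext = std.mem.trim(u8, ext, ".");
    if (ext.len == 0) return null;
    if (std.mem.eql(u8, ext, "json")) return "application/json";
    if (std.mem.eql(u8, ext, "txt")) return "text/plain";
    if (std.mem.eql(u8, ext, "png")) return "image/png";
    return null;
}

test "mimeFromPath" {
    try std.testing.expectEqualStrings("application/json", mimeFromPath("s3://bucket/data.json").?);
    try std.testing.expect(mimeFromPath("s3://bucket/no-extension") == null);
}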

pub const ByteStore = struct {
ptr: [*]u8 = undefined,
len: SizeType = 0,
@@ -3433,12 +3531,7 @@ pub const Blob = struct {

fn isS3(this: *Blob) bool {
if (this.store) |store| {
if (store.data == .file) {
if (store.data.file.pathlike == .path) {
const slice = store.data.file.pathlike.path.slice();
return strings.startsWith(slice, "s3://");
}
}
return store.data == .s3;
}
return false;
}
@@ -3490,20 +3583,20 @@ pub const Blob = struct {
});
const promise = this.promise.value();
const env = this.globalThis.bunVM().bundler.env;
const credentials = env.getAWSCredentials();
const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice());
const credentials = this.blob.store.?.data.s3.getCredentials();
const path = this.blob.store.?.data.s3.path();

this.poll_ref.ref(globalThis.bunVM());
const path = url.s3Path();
if (blob.offset > 0) {
const len: ?usize = if (blob.size != Blob.max_size) @intCast(blob.size) else null;
const offset: usize = @intCast(blob.offset);
credentials.s3DownloadSlice(path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
credentials.s3DownloadSlice(path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(true, null)) |proxy| proxy.href else null);
} else if (blob.size == Blob.max_size) {
credentials.s3Download(path, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
credentials.s3Download(path, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(true, null)) |proxy| proxy.href else null);
} else {
const len: usize = @intCast(blob.size);
const offset: usize = @intCast(blob.offset);
credentials.s3DownloadSlice(path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
credentials.s3DownloadSlice(path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(true, null)) |proxy| proxy.href else null);
}
return promise;
}
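
Note: in the download task above, Blob.max_size doubles as an "unknown size" sentinel, and the three branches reduce to one decision: fetch the whole object, or fetch a slice with an offset and an optional length. A sketch of that selection logic under those assumptions (the names here are illustrative, not Bun internals):

const std = @import("std");

const DownloadPlan = union(enum) {
    whole,
    slice: struct { offset: usize, len: ?usize },
};

// Mirrors the branch structure above: max_size means "size not yet resolved".
fn planDownload(offset: u64, size: u64, max_size: u64) DownloadPlan {
    if (offset > 0) {
        const len: ?usize = if (size != max_size) @intCast(size) else null;
        return .{ .slice = .{ .offset = @intCast(offset), .len = len } };
    }
    if (size == max_size) return .whole;
    return .{ .slice = .{ .offset = 0, .len = @intCast(size) } };
}

test "planDownload" {
    const sentinel = std.math.maxInt(u64);
    try std.testing.expect(planDownload(0, sentinel, sentinel) == .whole);
    const p = planDownload(16, 64, sentinel);
    try std.testing.expectEqual(@as(usize, 16), p.slice.offset);
}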
@@ -3554,12 +3647,12 @@ pub const Blob = struct {
.strong_ref = JSC.Strong.create(js_blob, globalThis),
});
const promise = this.promise.value();
const env = this.globalThis.bunVM().bundler.env;
const credentials = env.getAWSCredentials();
const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice());
this.poll_ref.ref(globalThis.bunVM());
const credentials = this.blob.store.?.data.s3.getCredentials();
const path = this.blob.store.?.data.s3.path();
const env = globalThis.bunVM().bundler.env;

credentials.s3Stat(url.s3Path(), @ptrCast(&S3BlobStatTask.onS3StatResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
credentials.s3Stat(path, @ptrCast(&S3BlobStatTask.onS3StatResolved), this, if (env.getHttpProxy(true, null)) |proxy| proxy.href else null);
return promise;
}

@@ -3590,10 +3683,10 @@ pub const Blob = struct {
}
}
}
const url = bun.URL.parse(this.store.?.data.file.pathlike.path.slice());
const env = this.globalThis.bunVM().bundler.env;
const credentials = env.getAWSCredentials();
const result = credentials.signRequest(url.s3Path(), method, null, .{ .expires = expires }) catch |sign_err| {
const credentials = this.store.?.data.s3.getCredentials();
const path = this.store.?.data.s3.path();

const result = credentials.signRequest(path, method, null, .{ .expires = expires }, false) catch |sign_err| {
return switch (sign_err) {
error.MissingCredentials => globalThis.throwError(sign_err, "missing s3 credentials"),
error.InvalidMethod => globalThis.throwError(sign_err, "method must be GET, PUT, DELETE or HEAD when using s3 protocol"),
@@ -3636,16 +3729,16 @@ pub const Blob = struct {
var store = this.store orelse {
return globalThis.throwInvalidArguments("Blob is detached", .{});
};
if (this.isS3()) {
const env = globalThis.bunVM().bundler.env;
const credentials = this.store.?.data.s3.getCredentials();
const path = this.store.?.data.s3.path();

return try credentials.s3WritableStream(path, globalThis, if (env.getHttpProxy(true, null)) |proxy| proxy.href else null);
}
if (store.data != .file) {
return globalThis.throwInvalidArguments("Blob is read-only", .{});
}
if (this.isS3()) {
const env = globalThis.bunVM().bundler.env;
const credentials = env.getAWSCredentials();
const url = bun.URL.parse(store.data.file.pathlike.path.slice());
return try credentials.s3WritableStream(url.s3Path(), globalThis, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
}

if (Environment.isWindows) {
const pathlike = store.data.file.pathlike;
6 changes: 3 additions & 3 deletions src/bun.js/webcore/response.zig
@@ -1890,10 +1890,10 @@ pub const Fetch = struct {
var proxy: ?ZigURL = null;
if (fetch_options.proxy) |proxy_opt| {
if (!proxy_opt.isEmpty()) { // if empty, just ignore the proxy
proxy = fetch_options.proxy orelse jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
proxy = fetch_options.proxy orelse jsc_vm.bundler.env.getHttpProxyFor(fetch_options.url);
}
} else {
proxy = jsc_vm.bundler.env.getHttpProxy(fetch_options.url);
proxy = jsc_vm.bundler.env.getHttpProxyFor(fetch_options.url);
}

if (fetch_tasklet.check_server_identity.has() and fetch_tasklet.reject_unauthorized) {
@@ -3270,7 +3270,7 @@ pub const Fetch = struct {
}

// TODO: should we generate the content hash? Presigned URLs never use a content hash; maybe generate it only if an extra option is passed, to avoid the cost.
var result = credentials.signRequest(url.s3Path(), method, null, null) catch |sign_err| {
var result = credentials.signRequest(url.s3Path(), method, null, null, false) catch |sign_err| {
switch (sign_err) {
error.MissingCredentials => {
const err = JSC.toTypeError(.ERR_INVALID_ARG_VALUE, "missing s3 credentials", .{}, ctx);
39 changes: 35 additions & 4 deletions src/bun.js/webcore/streams.zig
@@ -370,6 +370,10 @@ pub const ReadableStream = struct {

return reader.toReadableStream(globalThis);
},
.s3 => {
// TODO: S3 readableStream
return .undefined;
},
}
}

@@ -2611,8 +2615,12 @@ pub const FetchTaskletChunkedRequestSink = struct {
buffer: bun.io.StreamBuffer,
ended: bool = false,
done: bool = false,
aws_check_sum: bool = false,

auto_flusher: AutoFlusher = AutoFlusher{},

hasher: std.hash.Crc32 = std.hash.Crc32.init(),

pub usingnamespace bun.New(FetchTaskletChunkedRequestSink);
const HTTPWritableStream = union(enum) {
fetch: *JSC.WebCore.Fetch.FetchTasklet,
@@ -2646,6 +2654,7 @@
if (this.ended) {
return .{ .result = {} };
}

switch (stream_start) {
.chunk_size => |chunk_size| {
if (chunk_size > 0) {
@@ -2702,15 +2711,37 @@
if (is_last) this.done = true;

if (data.len == 0) {
sendRequestData(task, bun.http.end_of_chunked_http1_1_encoding_response_body, true);
if (this.aws_check_sum) {
const final = this.hasher.final();
var encoded_buf: [8]u8 = undefined;
const checksum = std.base64.standard.Encoder.encode(&encoded_buf, std.mem.asBytes(&final));
// data is empty in this branch, so emit only the terminating zero-length chunk with the checksum trailer; a leading "{x}\r\n{s}\r\n" here would emit a stray "0\r\n\r\n" terminator before the trailer
const chunk = std.fmt.allocPrint(bun.default_allocator, "0\r\nx-amz-checksum-crc32:{s}\r\n\r\n", .{checksum}) catch return error.OOM;

sendRequestData(task, chunk, true);
} else {
sendRequestData(task, bun.http.end_of_chunked_http1_1_encoding_response_body, true);
}
return;
}

// chunk encoding is really simple
if (is_last) {
const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n0\r\n\r\n", .{ data.len, data }) catch return error.OOM;
sendRequestData(task, chunk, true);
if (this.aws_check_sum) {
this.hasher.update(data);
const final = this.hasher.final();
var encoded_buf: [8]u8 = undefined;
const checksum = std.base64.standard.Encoder.encode(&encoded_buf, std.mem.asBytes(&final));

const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n0\r\nx-amz-checksum-crc32:{s}\r\n\r\n", .{ data.len, data, checksum }) catch return error.OOM;
sendRequestData(task, chunk, true);
} else {
const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n0\r\n\r\n", .{ data.len, data }) catch return error.OOM;
sendRequestData(task, chunk, true);
}
} else {
if (this.aws_check_sum) {
this.hasher.update(data);
}
const chunk = std.fmt.allocPrint(bun.default_allocator, "{x}\r\n{s}\r\n", .{ data.len, data }) catch return error.OOM;
sendRequestData(task, chunk, false);
}
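
Note: the sink above ends an S3 streaming upload with a zero-length chunk carrying an x-amz-checksum-crc32 trailer. Below is a self-contained sketch of just that final trailer; AWS documents the value as the base64 encoding of the big-endian checksum bytes, so this sketch converts byte order explicitly rather than encoding native-order bytes.

const std = @import("std");

// Builds the terminating chunk of a chunked body with an AWS CRC32 trailer,
// assuming AWS expects base64 of the big-endian CRC32 bytes.
fn finalTrailer(allocator: std.mem.Allocator, crc: u32) ![]u8 {
    var encoded_buf: [8]u8 = undefined; // base64 of 4 bytes -> 8 chars
    const be_bytes = std.mem.toBytes(std.mem.nativeToBig(u32, crc));
    const checksum = std.base64.standard.Encoder.encode(&encoded_buf, &be_bytes);
    return std.fmt.allocPrint(allocator, "0\r\nx-amz-checksum-crc32:{s}\r\n\r\n", .{checksum});
}

test "finalTrailer" {
    var hasher = std.hash.Crc32.init();
    hasher.update("hello world");
    const trailer = try finalTrailer(std.testing.allocator, hasher.final());
    defer std.testing.allocator.free(trailer);
    try std.testing.expect(std.mem.startsWith(u8, trailer, "0\r\nx-amz-checksum-crc32:"));
    try std.testing.expect(std.mem.endsWith(u8, trailer, "\r\n\r\n"));
}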
@@ -3989,7 +4020,7 @@ pub const FileReader = struct {
var file_type: bun.io.FileType = .file;
if (this.lazy == .blob) {
switch (this.lazy.blob.data) {
.bytes => @panic("Invalid state in FileReader: expected file "),
.s3, .bytes => @panic("Invalid state in FileReader: expected file"),
.file => |*file| {
defer {
this.lazy.blob.deref();