Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3) s3 client #15740

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
34 changes: 34 additions & 0 deletions src/bun.js/bindings/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5819,6 +5819,40 @@ extern "C" int JSC__JSValue__toISOString(JSC::JSGlobalObject* globalObject, Enco
return charactersWritten;
}

extern "C" int JSC__JSValue__DateNowISOString(JSC::JSGlobalObject* globalObject, char* buf)
{
char buffer[29];
JSC::DateInstance* thisDateObj = JSC::DateInstance::create(globalObject->vm(), globalObject->dateStructure(), globalObject->jsDateNow());

if (!std::isfinite(thisDateObj->internalNumber()))
return -1;

auto& vm = globalObject->vm();

const GregorianDateTime* gregorianDateTime = thisDateObj->gregorianDateTimeUTC(vm.dateCache);
if (!gregorianDateTime)
return -1;

// If the year is outside the bounds of 0 and 9999 inclusive we want to use the extended year format (ES 15.9.1.15.1).
int ms = static_cast<int>(fmod(thisDateObj->internalNumber(), msPerSecond));
if (ms < 0)
ms += msPerSecond;

int charactersWritten;
if (gregorianDateTime->year() > 9999 || gregorianDateTime->year() < 0)
charactersWritten = snprintf(buffer, sizeof(buffer), "%+07d-%02d-%02dT%02d:%02d:%02d.%03dZ", gregorianDateTime->year(), gregorianDateTime->month() + 1, gregorianDateTime->monthDay(), gregorianDateTime->hour(), gregorianDateTime->minute(), gregorianDateTime->second(), ms);
else
charactersWritten = snprintf(buffer, sizeof(buffer), "%04d-%02d-%02dT%02d:%02d:%02d.%03dZ", gregorianDateTime->year(), gregorianDateTime->month() + 1, gregorianDateTime->monthDay(), gregorianDateTime->hour(), gregorianDateTime->minute(), gregorianDateTime->second(), ms);

memcpy(buf, buffer, charactersWritten);

ASSERT(charactersWritten > 0 && static_cast<unsigned>(charactersWritten) < sizeof(buffer));
if (static_cast<unsigned>(charactersWritten) >= sizeof(buffer))
return -1;

return charactersWritten;
}

#pragma mark - WebCore::DOMFormData

CPP_DECL void WebCore__DOMFormData__append(WebCore__DOMFormData* arg0, ZigString* arg1, ZigString* arg2)
Expand Down
10 changes: 10 additions & 0 deletions src/bun.js/bindings/bindings.zig
Original file line number Diff line number Diff line change
Expand Up @@ -4305,6 +4305,15 @@ pub const JSValue = enum(i64) {

return buf[0..@as(usize, @intCast(count))];
}
extern fn JSC__JSValue__DateNowISOString(*JSGlobalObject, f64) JSValue;
pub fn getDateNowISOString(globalObject: *JSC.JSGlobalObject, buf: *[28]u8) []const u8 {
const count = JSC__JSValue__DateNowISOString(globalObject, buf);
if (count < 0) {
return "";
}

return buf[0..@as(usize, @intCast(count))];
}

/// Return the pointer to the wrapped object only if it is a direct instance of the type.
/// If the object does not match the type, return null.
Expand Down Expand Up @@ -4364,6 +4373,7 @@ pub const JSValue = enum(i64) {
}

extern fn JSC__JSValue__dateInstanceFromNumber(*JSGlobalObject, f64) JSValue;

pub fn fromDateNumber(globalObject: *JSGlobalObject, value: f64) JSValue {
JSC.markBinding(@src());
return JSC__JSValue__dateInstanceFromNumber(globalObject, value);
Expand Down
6 changes: 6 additions & 0 deletions src/bun.js/event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const ReadFileTask = WebCore.Blob.ReadFile.ReadFileTask;
const WriteFileTask = WebCore.Blob.WriteFile.WriteFileTask;
const napi_async_work = JSC.napi.napi_async_work;
const FetchTasklet = Fetch.FetchTasklet;
const S3HttpSimpleTask = @import("../s3.zig").AWSCredentials.S3HttpSimpleTask;
const JSValue = JSC.JSValue;
const js = JSC.C;
const Waker = bun.Async.Waker;
Expand Down Expand Up @@ -407,6 +408,7 @@ const ServerAllConnectionsClosedTask = @import("./api/server.zig").ServerAllConn
// Task.get(ReadFileTask) -> ?ReadFileTask
pub const Task = TaggedPointerUnion(.{
FetchTasklet,
S3HttpSimpleTask,
AsyncGlobWalkTask,
AsyncTransformTask,
ReadFileTask,
Expand Down Expand Up @@ -991,6 +993,10 @@ pub const EventLoop = struct {
var fetch_task: *Fetch.FetchTasklet = task.get(Fetch.FetchTasklet).?;
fetch_task.onProgressUpdate();
},
.S3HttpSimpleTask => {
var s3_task: *S3HttpSimpleTask = task.get(S3HttpSimpleTask).?;
s3_task.onResponse();
},
@field(Task.Tag, @typeName(AsyncGlobWalkTask)) => {
var globWalkTask: *AsyncGlobWalkTask = task.get(AsyncGlobWalkTask).?;
globWalkTask.*.runFromJS();
Expand Down
192 changes: 190 additions & 2 deletions src/bun.js/webcore/blob.zig
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const default_allocator = bun.default_allocator;
const FeatureFlags = bun.FeatureFlags;
const ArrayBuffer = @import("../base.zig").ArrayBuffer;
const Properties = @import("../base.zig").Properties;

const getAllocator = @import("../base.zig").getAllocator;

const Environment = @import("../../env.zig");
Expand All @@ -44,6 +43,7 @@ const Request = JSC.WebCore.Request;

const libuv = bun.windows.libuv;

const AWS = @import("../../s3.zig").AWSCredentials;
const PathOrBlob = union(enum) {
path: JSC.Node.PathOrFileDescriptor,
blob: Blob,
Expand Down Expand Up @@ -147,6 +147,14 @@ pub const Blob = struct {

pub fn doReadFile(this: *Blob, comptime Function: anytype, global: *JSGlobalObject) JSValue {
bloblog("doReadFile", .{});
if (this.isS3()) {
const WrappedFn = struct {
pub fn wrapped(b: *Blob, g: *JSGlobalObject, by: []u8) JSC.JSValue {
return JSC.toJSHostValue(g, Function(b, g, by, .clone));
}
};
return S3BlobDownloadTask.init(global, this, WrappedFn.wrapped);
}

const Handler = NewReadFileHandler(Function);

Expand Down Expand Up @@ -3423,12 +3431,192 @@ pub const Blob = struct {
return JSValue.jsBoolean(bun.isRegularFile(store.data.file.mode) or bun.C.S.ISFIFO(store.data.file.mode));
}

fn isS3(this: *Blob) bool {
if (this.store) |store| {
if (store.data == .file) {
cirospaciari marked this conversation as resolved.
Show resolved Hide resolved
if (store.data.file.pathlike == .path) {
const slice = store.data.file.pathlike.path.slice();
return strings.startsWith(slice, "s3://");
}
}
}
return false;
}

const S3BlobDownloadTask = struct {
blob: Blob,
globalThis: *JSC.JSGlobalObject,
promise: JSC.JSPromise.Strong,
poll_ref: bun.Async.KeepAlive = .{},

handler: S3ReadHandler,
usingnamespace bun.New(S3BlobDownloadTask);
pub const S3ReadHandler = *const fn (this: *Blob, globalthis: *JSGlobalObject, raw_bytes: []u8) JSValue;

pub fn callHandler(this: *S3BlobDownloadTask, raw_bytes: []u8) JSValue {
return this.handler(&this.blob, this.globalThis, raw_bytes);
}
pub fn onS3DownloadResolved(result: AWS.S3DownloadResult, this: *S3BlobDownloadTask) void {
defer this.deinit();
switch (result) {
.not_found => {
const js_err = this.globalThis.createErrorInstance("File not found", .{});
js_err.put(this.globalThis, ZigString.static("code"), ZigString.init("FileNotFound").toJS(this.globalThis));
this.promise.reject(this.globalThis, js_err);
},
.success => |response| {
const bytes = response.body.list.items;
if (this.blob.size == Blob.max_size) {
this.blob.size = @truncate(bytes.len);
}
JSC.AnyPromise.wrap(.{ .normal = this.promise.get() }, this.globalThis, S3BlobDownloadTask.callHandler, .{ this, bytes });
},
.failure => |err| {
const js_err = this.globalThis.createErrorInstance("{s}", .{err.message});
js_err.put(this.globalThis, ZigString.static("code"), ZigString.init(err.code).toJS(this.globalThis));
this.promise.rejectOnNextTick(this.globalThis, js_err);
},
}
}

pub fn init(globalThis: *JSC.JSGlobalObject, blob: *Blob, handler: S3BlobDownloadTask.S3ReadHandler) JSValue {
blob.store.?.ref();

const this = S3BlobDownloadTask.new(.{
.globalThis = globalThis,
.blob = blob.*,
.promise = JSC.JSPromise.Strong.init(globalThis),
.handler = handler,
});
const promise = this.promise.value();
const env = this.globalThis.bunVM().bundler.env;
const credentials = env.getAWSCredentials();
const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice());
this.poll_ref.ref(globalThis.bunVM());
const path = url.s3Path();
if (blob.offset > 0) {
const len: ?usize = if (blob.size != Blob.max_size) @intCast(blob.size) else null;
const offset: usize = @intCast(blob.offset);
credentials.s3DownloadSlice(path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
} else if (blob.size == Blob.max_size) {
credentials.s3Download(path, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
} else {
const len: usize = @intCast(blob.size);
const offset: usize = @intCast(blob.offset);
credentials.s3DownloadSlice(path, offset, len, @ptrCast(&S3BlobDownloadTask.onS3DownloadResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
}
return promise;
}

pub fn deinit(this: *S3BlobDownloadTask) void {
this.blob.store.?.deref();
this.poll_ref.unrefOnNextTick(this.globalThis.bunVM());
this.promise.deinit();
this.destroy();
}
};

const S3BlobStatTask = struct {
blob: *Blob,
globalThis: *JSC.JSGlobalObject,
promise: JSC.JSPromise.Strong,
strong_ref: JSC.Strong,
poll_ref: bun.Async.KeepAlive = .{},
usingnamespace bun.New(S3BlobStatTask);

pub fn onS3StatResolved(result: AWS.S3StatResult, this: *S3BlobStatTask) void {
defer this.deinit();
switch (result) {
.not_found => {
this.promise.resolve(this.globalThis, .false);
},
.success => |stat| {
if (this.blob.size == Blob.max_size) {
this.blob.size = @truncate(stat.size);
}
this.promise.resolve(this.globalThis, .true);
},
.failure => |err| {
const js_err = this.globalThis.createErrorInstance("{s}", .{err.message});
js_err.put(this.globalThis, ZigString.static("code"), ZigString.init(err.code).toJS(this.globalThis));
this.promise.rejectOnNextTick(this.globalThis, js_err);
},
}
}

pub fn init(globalThis: *JSC.JSGlobalObject, blob: *Blob, js_blob: JSValue) JSValue {
const this = S3BlobStatTask.new(.{
.globalThis = globalThis,
.blob = blob,
.promise = JSC.JSPromise.Strong.init(globalThis),
.strong_ref = JSC.Strong.create(js_blob, globalThis),
});
const promise = this.promise.value();
const env = this.globalThis.bunVM().bundler.env;
const credentials = env.getAWSCredentials();
const url = bun.URL.parse(this.blob.store.?.data.file.pathlike.path.slice());
this.poll_ref.ref(globalThis.bunVM());

credentials.s3Stat(url.s3Path(), @ptrCast(&S3BlobStatTask.onS3StatResolved), this, if (env.getHttpProxy(url)) |proxy| proxy.href else null);
return promise;
}

pub fn deinit(this: *S3BlobStatTask) void {
this.poll_ref.unrefOnNextTick(this.globalThis.bunVM());
this.strong_ref.deinit();
this.promise.deinit();
this.destroy();
}
};

pub fn getPresignUrl(this: *Blob, globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSValue {
if (this.isS3()) {
const args = callframe.arguments_old(1);
var method: bun.http.Method = .GET;
var expires: usize = 86400; // 1 day
if (args.len == 1) {
cirospaciari marked this conversation as resolved.
Show resolved Hide resolved
const options = args.ptr[0];
if (options.isObject()) {
if (try options.getTruthyComptime(globalThis, "method")) |method_| {
method = Method.fromJS(globalThis, method_) orelse {
return globalThis.throwInvalidArguments("method must be GET, PUT, DELETE or HEAD when using s3 protocol", .{});
};
}
if (try options.getTruthyComptime(globalThis, "expiresIn")) |expires_| {
cirospaciari marked this conversation as resolved.
Show resolved Hide resolved
const coerced = expires_.coerce(i32, globalThis);
if (coerced <= 0) return globalThis.throwInvalidArguments("expiresIn must be greather than 0", .{});
expires = @intCast(coerced);
}
}
}
const url = bun.URL.parse(this.store.?.data.file.pathlike.path.slice());
const env = this.globalThis.bunVM().bundler.env;
const credentials = env.getAWSCredentials();
const result = credentials.signRequest(url.s3Path(), method, null, .{ .expires = expires }) catch |sign_err| {
return switch (sign_err) {
error.MissingCredentials => globalThis.throwError(sign_err, "missing s3 credentials"),
error.InvalidMethod => globalThis.throwError(sign_err, "method must be GET, PUT, DELETE or HEAD when using s3 protocol"),
error.InvalidPath => globalThis.throwError(sign_err, "invalid s3 bucket, key combination"),
else => globalThis.throwError(error.SignError, "failed to retrieve s3 content check your credentials"),
};
};
defer result.deinit();
var str = bun.String.fromUTF8(result.url);
return str.transferToJS(this.globalThis);
}
return globalThis.throwError(error.NotSupported, "is only possible to presign s3:// files");
}

// This mostly means 'can it be read?'
pub fn getExists(
this: *Blob,
globalThis: *JSC.JSGlobalObject,
_: *JSC.CallFrame,
this_value: JSC.JSValue,
) bun.JSError!JSValue {
if (this.isS3()) {
return S3BlobStatTask.init(globalThis, this, this_value);
}
return JSC.JSPromise.resolvedPromiseValue(globalThis, this.getExistsSync());
}

Expand Down Expand Up @@ -3783,7 +3971,7 @@ pub const Blob = struct {
if (this.store) |store| {
if (store.data == .file) {
// last_modified can be already set during read.
if (store.data.file.last_modified == JSC.init_timestamp) {
if (store.data.file.last_modified == JSC.init_timestamp and !this.isS3()) {
resolveFileStat(store);
}
return JSValue.jsNumber(store.data.file.last_modified);
Expand Down
4 changes: 3 additions & 1 deletion src/bun.js/webcore/response.classes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ export default [
slice: { fn: "getSlice", length: 2 },
stream: { fn: "getStream", length: 1 },
formData: { fn: "getFormData" },
exists: { fn: "getExists", length: 0 },
exists: { fn: "getExists", length: 0, passThis: true },
// Non-standard, s3 support (migrate to a new place if needed)
presign: { fn: "getPresignUrl", length: 1 },

// Non-standard, but consistent!
bytes: { fn: "getBytes" },
Expand Down
Loading
Loading