Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[k2] add file functions in runtime light #1161

Merged
merged 27 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 17 additions & 20 deletions builtin-functions/kphp-light/file.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
<?php

// === SUPPORTED ===

define('FILE_APPEND', 1);

// === SUPPORTED ===
define('STDIN', 'php://stdin');
define('STDOUT', 'php://stdout');
define('STDERR', 'php://stderr');

function basename ($name ::: string, $suffix ::: string = ''): string;


define('STREAM_CLIENT_CONNECT', 1);
define('DEFAULT_SOCKET_TIMEOUT', 60);

function stream_socket_client ($url ::: string, &$error_number ::: mixed = TODO, &$error_description ::: mixed = TODO, $timeout ::: float = DEFAULT_SOCKET_TIMEOUT, $flags ::: int = STREAM_CLIENT_CONNECT, $context ::: mixed = null) ::: mixed;
function fopen ($filename ::: string, $mode ::: string) ::: mixed;
/** @kphp-extern-func-info interruptible */
function fwrite ($stream ::: mixed, $text ::: string) ::: int | false;
function fflush ($stream ::: mixed) ::: bool;
function fclose ($stream ::: mixed) ::: bool;

function file_get_contents ($name ::: string) ::: string | false;

// === UNSUPPORTED ===

/** @kphp-extern-func-info generate-stub */
Expand All @@ -18,8 +34,6 @@ function dirname ($name ::: string) ::: string;
/** @kphp-extern-func-info generate-stub */
function file ($name ::: string) ::: string[] | false;
/** @kphp-extern-func-info generate-stub */
function file_get_contents ($name ::: string) ::: string | false;
/** @kphp-extern-func-info generate-stub */
function file_put_contents ($name ::: string, $content ::: mixed, $flags ::: int = 0) ::: int | false;
/** @kphp-extern-func-info generate-stub */
function file_exists ($name ::: string) ::: bool;
Expand Down Expand Up @@ -54,19 +68,10 @@ function unlink ($name ::: string) ::: bool;
/** @kphp-extern-func-info generate-stub */
function fgetcsv ($stream ::: mixed, $length ::: int = 0, $delimiter ::: string = ",", $enclosure ::: string = "\"", $escape ::: string = "\\") ::: mixed[] | false;

define('STDIN', 'php://stdin');
define('STDOUT', 'php://stdout');
define('STDERR', 'php://stderr');


define('SEEK_SET', 0);
define('SEEK_END', 1);
define('SEEK_CUR', 2);

/** @kphp-extern-func-info generate-stub */
function fopen ($filename ::: string, $mode ::: string) ::: mixed;
/** @kphp-extern-func-info generate-stub */
function fwrite ($stream ::: mixed, $text ::: string) ::: int | false;
/** @kphp-extern-func-info generate-stub */
function fseek ($stream ::: mixed, $offset ::: int, $whence ::: int = SEEK_SET) ::: int;
/** @kphp-extern-func-info generate-stub */
Expand All @@ -82,19 +87,11 @@ function fpassthru ($stream ::: mixed) ::: int | false;
/** @kphp-extern-func-info generate-stub */
function fgets ($stream ::: mixed, $length ::: int = -1) ::: string | false;
/** @kphp-extern-func-info generate-stub */
function fflush ($stream ::: mixed) ::: bool;
/** @kphp-extern-func-info generate-stub */
function feof ($stream ::: mixed) ::: bool;
/** @kphp-extern-func-info generate-stub */
function fclose ($stream ::: mixed) ::: bool;

define('STREAM_CLIENT_CONNECT', 1);
define('DEFAULT_SOCKET_TIMEOUT', 60);

/** @kphp-extern-func-info generate-stub */
function stream_context_create ($options ::: mixed = array()) ::: mixed;

function stream_socket_client ($url ::: string, &$error_number ::: mixed = TODO, &$error_description ::: mixed = TODO, $timeout ::: float = DEFAULT_SOCKET_TIMEOUT, $flags ::: int = STREAM_CLIENT_CONNECT, $context ::: mixed = null) ::: mixed;

function fprintf ($stream, $format ::: string, ...$args) ::: int;
function fputcsv ($stream, $fields ::: array, $delimiter = ",", $enclosure = "\"", $escape = "\\") ::: int | false;
Expand Down
17 changes: 10 additions & 7 deletions runtime-light/coroutine/awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
#include <cstdint>
#include <memory>
#include <optional>
#include <tuple>
#include <type_traits>
#include <utility>

#include "runtime-common/core/utils/kphp-assert-core.h"
#include "runtime-light/coroutine/task.h"
#include "runtime-light/k2-platform/k2-api.h"
#include "runtime-light/scheduler/scheduler.h"
#include "runtime-light/state/instance-state.h"
#include "runtime-light/stdlib/fork/fork-state.h"
Expand Down Expand Up @@ -66,7 +68,7 @@ class wait_for_update_t : public awaitable_impl_::fork_id_watcher_t {
, suspend_token(std::noop_coroutine(), WaitEvent::UpdateOnStream{.stream_d = stream_d}) {}

wait_for_update_t(wait_for_update_t &&other) noexcept
: stream_d(std::exchange(other.stream_d, INVALID_PLATFORM_DESCRIPTOR))
: stream_d(std::exchange(other.stream_d, k2::INVALID_PLATFORM_DESCRIPTOR))
, suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{})))
, state(std::exchange(other.state, awaitable_impl_::State::End)) {}

Expand Down Expand Up @@ -146,7 +148,7 @@ class wait_for_incoming_stream_t : awaitable_impl_::fork_id_watcher_t {
state = awaitable_impl_::State::End;
fork_id_watcher_t::await_resume();
const auto incoming_stream_d{InstanceState::get().take_incoming_stream()};
php_assert(incoming_stream_d != INVALID_PLATFORM_DESCRIPTOR);
php_assert(incoming_stream_d != k2::INVALID_PLATFORM_DESCRIPTOR);
return incoming_stream_d;
}

Expand Down Expand Up @@ -213,7 +215,7 @@ class wait_for_reschedule_t : awaitable_impl_::fork_id_watcher_t {

class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {
std::chrono::nanoseconds duration;
uint64_t timer_d{INVALID_PLATFORM_DESCRIPTOR};
uint64_t timer_d{k2::INVALID_PLATFORM_DESCRIPTOR};
SuspendToken suspend_token{std::noop_coroutine(), WaitEvent::Rechedule{}};
awaitable_impl_::State state{awaitable_impl_::State::Init};

Expand All @@ -223,7 +225,7 @@ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {

wait_for_timer_t(wait_for_timer_t &&other) noexcept
: duration(std::exchange(other.duration, std::chrono::nanoseconds{0}))
, timer_d(std::exchange(other.timer_d, INVALID_PLATFORM_DESCRIPTOR))
, timer_d(std::exchange(other.timer_d, k2::INVALID_PLATFORM_DESCRIPTOR))
, suspend_token(std::exchange(other.suspend_token, std::make_pair(std::noop_coroutine(), WaitEvent::Rechedule{})))
, state(std::exchange(other.state, awaitable_impl_::State::End)) {}

Expand All @@ -235,7 +237,7 @@ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {
if (state == awaitable_impl_::State::Suspend) {
cancel();
}
if (timer_d != INVALID_PLATFORM_DESCRIPTOR) {
if (timer_d != k2::INVALID_PLATFORM_DESCRIPTOR) {
InstanceState::get().release_stream(timer_d);
}
}
Expand All @@ -247,8 +249,9 @@ class wait_for_timer_t : awaitable_impl_::fork_id_watcher_t {

void await_suspend(std::coroutine_handle<> coro) noexcept {
state = awaitable_impl_::State::Suspend;
timer_d = InstanceState::get().set_timer(duration);
if (timer_d != INVALID_PLATFORM_DESCRIPTOR) {
int32_t errc{};
std::tie(timer_d, errc) = InstanceState::get().set_timer(duration);
if (errc == k2::errno_ok) {
suspend_token = std::make_pair(coro, WaitEvent::UpdateOnTimer{.timer_d = timer_d});
}
CoroutineScheduler::get().suspend(suspend_token);
Expand Down
18 changes: 16 additions & 2 deletions runtime-light/k2-platform/k2-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#pragma once

#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <memory>
Expand All @@ -22,6 +23,11 @@ inline constexpr size_t DEFAULT_MEMORY_ALIGN = 16;
} // namespace k2_impl_

inline constexpr int32_t errno_ok = 0;
inline constexpr int32_t errno_einval = EINVAL;

inline constexpr uint64_t INVALID_PLATFORM_DESCRIPTOR = 0;

enum class StreamKind : uint8_t { Component, UDP, TCP };

using IOStatus = IOStatus;

Expand Down Expand Up @@ -161,8 +167,16 @@ inline auto arg_fetch(uint32_t arg_num) noexcept {
key_buffer[key_len] = '\0';
value_buffer[value_len] = '\0';

return std::make_pair(std::unique_ptr<char, decltype(std::addressof(k2::free))>{key_buffer, std::addressof(k2::free)},
std::unique_ptr<char, decltype(std::addressof(k2::free))>{value_buffer, std::addressof(k2::free)});
return std::make_pair(std::unique_ptr<char, decltype(std::addressof(k2::free))>{key_buffer, k2::free},
std::unique_ptr<char, decltype(std::addressof(k2::free))>{value_buffer, k2::free});
}

inline int32_t udp_connect(uint64_t *socket_d, const char *host, size_t host_len) noexcept {
return k2_udp_connect(socket_d, host, host_len);
}

inline int32_t tcp_connect(uint64_t *socket_d, const char *host, size_t host_len) noexcept {
return k2_tcp_connect(socket_d, host, host_len);
}

} // namespace k2
7 changes: 4 additions & 3 deletions runtime-light/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <utility>
#include <variant>

#include "runtime-light/k2-platform/k2-api.h"
#include "runtime-light/state/instance-state.h"

// === SimpleCoroutineScheduler ===================================================================
Expand Down Expand Up @@ -42,7 +43,7 @@ ScheduleStatus SimpleCoroutineScheduler::scheduleOnIncomingStream() noexcept {
}

ScheduleStatus SimpleCoroutineScheduler::scheduleOnStreamUpdate(uint64_t stream_d) noexcept {
if (stream_d == INVALID_PLATFORM_DESCRIPTOR) {
if (stream_d == k2::INVALID_PLATFORM_DESCRIPTOR) {
return ScheduleStatus::Error;
} else if (const auto it_token{awaiting_for_update_tokens.find(stream_d)}; it_token != awaiting_for_update_tokens.cend()) {
const auto token{it_token->second};
Expand Down Expand Up @@ -90,12 +91,12 @@ void SimpleCoroutineScheduler::suspend(SuspendToken token) noexcept {
} else if constexpr (std::is_same_v<event_t, WaitEvent::IncomingStream>) {
awaiting_for_stream_tokens.push_back(token);
} else if constexpr (std::is_same_v<event_t, WaitEvent::UpdateOnStream>) {
if (event.stream_d == INVALID_PLATFORM_DESCRIPTOR) {
if (event.stream_d == k2::INVALID_PLATFORM_DESCRIPTOR) {
return;
}
awaiting_for_update_tokens.emplace(event.stream_d, token);
} else if constexpr (std::is_same_v<event_t, WaitEvent::UpdateOnTimer>) {
if (event.timer_d == INVALID_PLATFORM_DESCRIPTOR) {
if (event.timer_d == k2::INVALID_PLATFORM_DESCRIPTOR) {
return;
}
awaiting_for_update_tokens.emplace(event.timer_d, token);
Expand Down
2 changes: 1 addition & 1 deletion runtime-light/state/init-functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ task_t<uint64_t> init_kphp_server_component() noexcept {
// release standard stream in case of a no reply job worker since we don't need that stream anymore
if (JobWorkerServerInstanceState::get().kind == JobWorkerServerInstanceState::Kind::NoReply) {
InstanceState::get().release_stream(stream_d);
stream_d = INVALID_PLATFORM_DESCRIPTOR;
stream_d = k2::INVALID_PLATFORM_DESCRIPTOR;
}
break;
}
Expand Down
50 changes: 34 additions & 16 deletions runtime-light/state/instance-state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,39 +167,57 @@ void InstanceState::process_platform_updates() noexcept {
uint64_t InstanceState::take_incoming_stream() noexcept {
if (incoming_streams_.empty()) {
php_warning("can't take incoming stream cause we don't have them");
return INVALID_PLATFORM_DESCRIPTOR;
return k2::INVALID_PLATFORM_DESCRIPTOR;
}
const auto stream_d{incoming_streams_.front()};
incoming_streams_.pop_front();
php_debug("take incoming stream %" PRIu64, stream_d);
return stream_d;
}

uint64_t InstanceState::open_stream(std::string_view component_name_view) noexcept {
std::pair<uint64_t, int32_t> InstanceState::open_stream(std::string_view component_name, k2::StreamKind stream_kind) noexcept {
uint64_t stream_d{};
if (const auto open_stream_res{k2::open(std::addressof(stream_d), component_name_view.size(), component_name_view.data())}; open_stream_res != k2::errno_ok) {
php_warning("can't open stream to %s", component_name_view.data());
return INVALID_PLATFORM_DESCRIPTOR;
int32_t error_code{};
switch (stream_kind) {
case k2::StreamKind::Component:
error_code = k2::open(std::addressof(stream_d), component_name.size(), component_name.data());
break;
case k2::StreamKind::TCP:
error_code = k2::tcp_connect(std::addressof(stream_d), component_name.data(), component_name.size());
break;
case k2::StreamKind::UDP:
error_code = k2::udp_connect(std::addressof(stream_d), component_name.data(), component_name.size());
break;
default:
error_code = k2::errno_einval;
break;
}
opened_streams_.insert(stream_d);
php_debug("opened a stream %" PRIu64 " to %s", stream_d, component_name_view.data());
return stream_d;

if (error_code == k2::errno_ok) [[likely]] {
opened_streams_.insert(stream_d);
php_debug("opened a stream %" PRIu64 " to %s", stream_d, component_name.data());
} else {
php_warning("can't open stream to %s", component_name.data());
}
return {stream_d, error_code};
}

uint64_t InstanceState::set_timer(std::chrono::nanoseconds duration) noexcept {
std::pair<uint64_t, int32_t> InstanceState::set_timer(std::chrono::nanoseconds duration) noexcept {
uint64_t timer_d{};
if (const auto set_timer_res{k2::new_timer(std::addressof(timer_d), static_cast<uint64_t>(duration.count()))}; set_timer_res != k2::errno_ok) {
int32_t error_code{k2::new_timer(std::addressof(timer_d), static_cast<uint64_t>(duration.count()))};

if (error_code == k2::errno_ok) [[likely]] {
opened_streams_.insert(timer_d);
php_debug("set timer %" PRIu64 " for %.9f sec", timer_d, std::chrono::duration<double>(duration).count());
} else {
php_warning("can't set timer for %.9f sec", std::chrono::duration<double>(duration).count());
return INVALID_PLATFORM_DESCRIPTOR;
}
opened_streams_.insert(timer_d);
php_debug("set timer %" PRIu64 " for %.9f sec", timer_d, std::chrono::duration<double>(duration).count());
return timer_d;
return {timer_d, error_code};
}

void InstanceState::release_stream(uint64_t stream_d) noexcept {
if (stream_d == standard_stream_) {
standard_stream_ = INVALID_PLATFORM_DESCRIPTOR;
standard_stream_ = k2::INVALID_PLATFORM_DESCRIPTOR;
}
opened_streams_.erase(stream_d);
pending_updates_.erase(stream_d); // also erase pending updates if exists
Expand All @@ -208,7 +226,7 @@ void InstanceState::release_stream(uint64_t stream_d) noexcept {
}

void InstanceState::release_all_streams() noexcept {
standard_stream_ = INVALID_PLATFORM_DESCRIPTOR;
standard_stream_ = k2::INVALID_PLATFORM_DESCRIPTOR;
for (const auto stream_d : opened_streams_) {
k2::free_descriptor(stream_d);
php_debug("released a stream %" PRIu64, stream_d);
Expand Down
16 changes: 6 additions & 10 deletions runtime-light/state/instance-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstddef>
#include <cstdint>
#include <string_view>
#include <utility>

#include "common/mixin/not_copyable.h"
#include "runtime-common/core/memory-resource/resource_allocator.h"
Expand All @@ -21,7 +22,7 @@
#include "runtime-light/server/job-worker/job-worker-server-state.h"
#include "runtime-light/stdlib/crypto/crypto-state.h"
#include "runtime-light/stdlib/curl/curl-state.h"
#include "runtime-light/stdlib/file/file-stream-state.h"
#include "runtime-light/stdlib/file/file-system-state.h"
#include "runtime-light/stdlib/fork/fork-state.h"
#include "runtime-light/stdlib/job-worker/job-worker-client-state.h"
#include "runtime-light/stdlib/math/random-state.h"
Expand All @@ -32,8 +33,6 @@
#include "runtime-light/stdlib/string/string-state.h"
#include "runtime-light/stdlib/system/system-state.h"

inline constexpr uint64_t INVALID_PLATFORM_DESCRIPTOR = 0;

// Coroutine scheduler type. Change it here if you want to use another scheduler
using CoroutineScheduler = SimpleCoroutineScheduler;
static_assert(CoroutineSchedulerConcept<CoroutineScheduler>);
Expand Down Expand Up @@ -103,12 +102,9 @@ struct InstanceState final : vk::not_copyable {
}
uint64_t take_incoming_stream() noexcept;

uint64_t open_stream(std::string_view) noexcept;
uint64_t open_stream(const string &component_name) noexcept {
return open_stream(std::string_view{component_name.c_str(), static_cast<size_t>(component_name.size())});
}
std::pair<uint64_t, int32_t> open_stream(std::string_view, k2::StreamKind) noexcept;
std::pair<uint64_t, int32_t> set_timer(std::chrono::nanoseconds) noexcept;

uint64_t set_timer(std::chrono::nanoseconds) noexcept;
void release_stream(uint64_t) noexcept;
void release_all_streams() noexcept;

Expand All @@ -134,7 +130,7 @@ struct InstanceState final : vk::not_copyable {
CryptoInstanceState crypto_instance_state{};
StringInstanceState string_instance_state{};
SystemInstanceState system_instance_state{};
FileStreamInstanceState file_stream_instance_state{};
FileSystemInstanceState file_system_instance_state{};

list<task_t<void>> shutdown_functions;

Expand All @@ -145,7 +141,7 @@ struct InstanceState final : vk::not_copyable {
shutdown_state shutdown_state_{shutdown_state::not_started};

ImageKind image_kind_{ImageKind::Invalid};
uint64_t standard_stream_{INVALID_PLATFORM_DESCRIPTOR};
uint64_t standard_stream_{k2::INVALID_PLATFORM_DESCRIPTOR};
deque<uint64_t> incoming_streams_;
unordered_set<uint64_t> opened_streams_;
unordered_set<uint64_t> pending_updates_;
Expand Down
Loading
Loading