diff --git a/base/client.jl b/base/client.jl index 867821d6c13f0..6ef8d41055cd8 100644 --- a/base/client.jl +++ b/base/client.jl @@ -30,7 +30,7 @@ quit() = exit() function repl_callback(ast::ANY, show_value) # use root task to execute user input global _repl_enough_stdin = true - #stop_reading(STDIN) + stop_reading(STDIN) STDIN.readcb = false put(_jl_repl_channel, (ast, show_value)) end diff --git a/base/multi.jl b/base/multi.jl index 2492a9fd70909..c5cef7d93083b 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -206,7 +206,7 @@ function add_workers(PGRP::ProcessGroup, w::Array{Any,1}) push(PGRP.workers, w[i]) w[i].id = PGRP.np+i send_msg_now(w[i], w[i].id, newlocs) - Deserializer(message_handler_loop,w[i].socket) + create_message_handler_loop(w[i].socket) end PGRP.locs = newlocs PGRP.np += n @@ -720,6 +720,7 @@ function perform_work(job::WorkItem) global Waiting, Workqueue local result try + ccall(:jl_register_toplevel_eh, Void, ()) if isa(job.task,Task) # continuing interrupted work item arg = job.argument @@ -732,11 +733,10 @@ function perform_work(job::WorkItem) result = yieldto(job.task) end catch e - #show(e) print("exception on ", myid(), ": ") show(e) println() - result = e + result = e.e end # restart job by yielding back to whatever task just switched to us job.task = current_task().last @@ -799,8 +799,6 @@ function deliver_result(sock::Stream, msg, oid, value) end end -deliver_result(sock::Deserializer,msg,oid,value) = deliver_result(sock.stream,msg,oid,value) - const _jl_empty_cell_ = {} function deliver_result(sock::(), msg, oid, value_thunk) global Waiting @@ -864,96 +862,99 @@ function accept_handler(server::TcpSocket, status::Int32) if err!=0 print("accept error: ", _uv_lasterror(globalEventLoop()), "\n") else - Deserializer(message_handler_loop,client) + create_message_handler_loop(client) end end type DisconnectException <: Exception end -function message_handler_loop(this::Deserializer) - global PGRP - #println("message_handler_loop") - refs = (PGRP::ProcessGroup).refs - if PGRP.np == 0 - # first connection; get process group info from client - PGRP.myid = force(deserialize(this)) - PGRP.locs = locs = force(deserialize(this)) - #print("\nLocation: ",locs,"\nId:",PGRP.myid,"\n") - # joining existing process group - PGRP.np = length(PGRP.locs) - PGRP.workers = w = cell(PGRP.np) - w[1] = Worker("", 0, this.stream, 1) - for i = 2:(PGRP.myid-1) - w[i] = Worker(locs[i].host, locs[i].port) - w[i].id = i - Deserializer(message_handler_loop,w[i].socket) - send_msg_now(w[i], :identify_socket, PGRP.myid) - end - w[PGRP.myid] = LocalProcess() - for i = (PGRP.myid+1):PGRP.np - w[i] = nothing - end - end - #println("loop") - while true - #try - msg = force(deserialize(this)) - #println("got msg: ",msg) - # handle message - if is(msg, :call) || is(msg, :call_fetch) || is(msg, :call_wait) - id = force(deserialize(this)) - f = deserialize(this) - args = deserialize(this) - #print("$(myid()) got call $id\n") - wi = schedule_call(id, f, args) - if is(msg, :call_fetch) - wi.notify = (this, :call_fetch, id, wi.notify) - elseif is(msg, :call_wait) - wi.notify = (this, :wait, id, wi.notify) +function create_message_handler_loop(this::AsyncStream) #returns immediately + enq_work(@task begin + global PGRP + #println("message_handler_loop") + refs = (PGRP::ProcessGroup).refs + start_reading(this) + if PGRP.np == 0 + # first connection; get process group info from client + PGRP.myid = force(deserialize(this)) + PGRP.locs = locs = force(deserialize(this)) + #print("\nLocation: ",locs,"\nId:",PGRP.myid,"\n") + # joining existing process group + PGRP.np = length(PGRP.locs) + PGRP.workers = w = cell(PGRP.np) + w[1] = Worker("", 0, this, 1) + for i = 2:(PGRP.myid-1) + w[i] = Worker(locs[i].host, locs[i].port) + w[i].id = i + create_message_handler_loop(w[i].socket) + send_msg_now(w[i], :identify_socket, PGRP.myid) end - elseif is(msg, :do) - f = deserialize(this) - args = deserialize(this) - #print("got args: $args\n") - let func=f, ar=args - enq_work(WorkItem(()->apply(force(func),force(ar)))) + w[PGRP.myid] = LocalProcess() + for i = (PGRP.myid+1):PGRP.np + w[i] = nothing end - elseif is(msg, :result) - # used to deliver result of wait or fetch - mkind = force(deserialize(this)) - oid = force(deserialize(this)) - val = deserialize(this) - deliver_result((), mkind, oid, val) - elseif is(msg, :identify_socket) - otherid = force(deserialize(this)) - _jl_identify_socket(otherid, this.stream) - else - # the synchronization messages - oid = force(deserialize(this))::(Int,Int) - wi = lookup_ref(oid) - if wi.done - deliver_result(this.stream, msg, oid, work_result(wi)) + end + #println("loop") + while true + #try + msg = force(deserialize(this)) + #println("got msg: ",msg) + # handle message + if is(msg, :call) || is(msg, :call_fetch) || is(msg, :call_wait) + id = force(deserialize(this)) + f = deserialize(this) + args = deserialize(this) + #print("$(myid()) got call $id\n") + wi = schedule_call(id, f, args) + if is(msg, :call_fetch) + wi.notify = (this, :call_fetch, id, wi.notify) + elseif is(msg, :call_wait) + wi.notify = (this, :wait, id, wi.notify) + end + elseif is(msg, :do) + f = deserialize(this) + args = deserialize(this) + #print("got args: $args\n") + let func=f, ar=args + enq_work(WorkItem(()->apply(force(func),force(ar)))) + end + elseif is(msg, :result) + # used to deliver result of wait or fetch + mkind = force(deserialize(this)) + oid = force(deserialize(this)) + val = deserialize(this) + deliver_result((), mkind, oid, val) + elseif is(msg, :identify_socket) + otherid = force(deserialize(this)) + _jl_identify_socket(otherid, this) else - # add to WorkItem's notify list - # TODO: should store the worker here, not the socket, - # so we don't need to look up the worker later - wi.notify = (this, msg, oid, wi.notify) + # the synchronization messages + oid = force(deserialize(this))::(Int,Int) + wi = lookup_ref(oid) + if wi.done + deliver_result(this, msg, oid, work_result(wi)) + else + # add to WorkItem's notify list + # TODO: should store the worker here, not the socket, + # so we don't need to look up the worker later + wi.notify = (this, msg, oid, wi.notify) + end end - end - #catch e - # if isa(e,EOFError) - # print("eof. $(myid()) exiting\n") - # stop_reading(this.stream) - # # TODO: remove machine from group - # throw(DisconnectException()) - # else - # print("deserialization error: ", e, "\n") - # #while nb_available(sock) > 0 #|| select(sock) - # # read(sock, Uint8) - # #end - # end - #end - end + #catch e + # if isa(e,EOFError) + # print("eof. $(myid()) exiting\n") + # stop_reading(this) + # # TODO: remove machine from group + # throw(DisconnectException()) + # else + # print("deserialization error: ", e, "\n") + # #while nb_available(sock) > 0 #|| select(sock) + # # read(sock, Uint8) + # #end + # end + #end + end + end) end ## worker creation and setup ## @@ -1017,21 +1018,22 @@ function start_remote_workers(machines, cmds) break end end - w[i] = wrker = Worker(hostname, port) - - # redirect console output from workers to the client's stdout: - start_reading(stream,function(stream::AsyncStream,nread::Int) - if(nread>0) - try - line = readbytes(stream.buffer, nread) - print("\tFrom worker $(wrker.id):\t",line) - catch e - println("\tError parsing reply from worker $(wrker.id):\t",e) - return false + let wrker = Worker(hostname, port) + w[i] = wrker + # redirect console output from workers to the client's stdout: + start_reading(stream,function(stream::AsyncStream,nread::Int) + if(nread>0) + try + line = readbytes(stream.buffer, nread) + print("\tFrom worker $(wrker.id):\t",line) + catch e + println("\tError parsing reply from worker $(wrker.id):\t",e) + return false + end end - end - true - end) + true + end) + end end w end @@ -1613,13 +1615,12 @@ function event_loop(isclient) multi_cb_handles.work_cb = SingleAsyncWork(globalEventLoop(),_jl_work_cb) multi_cb_handles.fgcm = SingleAsyncWork(globalEventLoop(),(args...)->flush_gc_msgs()); timer = TimeoutAsyncWork(globalEventLoop(),(args...)->queueAsync(multi_cb_handles.work_cb)) - startTimer(timer,int64(1),int64(10000)) #do work every 10s + startTimer(timer,int64(1),int64(1000)) #do work every 10s iserr, lasterr = false, () while true try ccall(:jl_register_toplevel_eh, Void, ()) if iserr - println(typeof(lasterr)) show(lasterr) iserr, lasterr = false, () else diff --git a/base/serialize.jl b/base/serialize.jl index a2671d32238ed..d0480fe7c9720 100644 --- a/base/serialize.jl +++ b/base/serialize.jl @@ -243,49 +243,6 @@ force(x::Function) = x() # we actually make objects. this allows for constructing objects that might # interfere with I/O by reading, writing, blocking, etc. -type Deserializer <: Stream #TODO: rename to SyncStream - stream::AsyncStream - function Deserializer(loop::Function,stream::AsyncStream) - this = new(stream) - enq_work(() -> loop(this)) - start_reading(stream) - this - end -end -show(io::IO,d::Deserializer) = print(io,"Deserializer()") - -function read{T}(this::Deserializer, a::Array{T}) - if isa(T, BitsKind) - nb = numel(a)*sizeof(T) - buf = this.stream.buffer - assert(buf.seekable == false) - assert(buf.maxsize >= nb) - wait_readnb(this.stream,nb) - read(this.stream.buffer, a) - return a - else - #error("Read from Buffer only supports bits types or arrays of bits types; got $T.") - error("Read from Buffer only supports bits types or arrays of bits types") - end -end - -function read(this::Deserializer,::Type{Uint8}) - buf = this.stream.buffer - assert(buf.seekable == false) - wait_readnb(this.stream,1) - read(buf,Uint8) -end - -function readline(this::Deserializer) - buf = this.stream.buffer - assert(buf.seekable == false) - wait_readline(this.stream) - readline(buf) -end - -write(::Deserializer,args...) = error("write not implemented for deserializer") -position(d::Deserializer) = d.pos - function deserialize(s) b = int32(read(s, Uint8)) if b == 0 diff --git a/base/stream.jl b/base/stream.jl index bf553b5664669..28f98210dc7c6 100644 --- a/base/stream.jl +++ b/base/stream.jl @@ -1,7 +1,5 @@ -#TODO: function writeall(Cmd, String) #TODO: Move stdio detection from C to Julia (might require some Clang magic) - ## types ## typealias Executable Union(Vector{ByteString},Function) @@ -11,7 +9,7 @@ type WaitTask filter::Callback #runs task only if false localdata::Any WaitTask(task::Task,test::Callback,localdata) = new(task,test,localdata) - WaitTask(task::Task) = new(task, false) + WaitTask(task::Task) = new(task, false, nothing) end abstract AsyncStream <: Stream @@ -124,7 +122,7 @@ type TcpSocket <: Socket this = TcpSocket(C_NULL,false) this.handle = ccall(:jl_make_tcp,Ptr{Void},(Ptr{Void},TcpSocket),globalEventLoop(),this) if(this.handle == C_NULL) - error("Failed to start reading: ",_uv_lasterror(globalEventLoop())) + error("Failed to start reading: ",_uv_lasterror()) end this end @@ -214,7 +212,8 @@ function tasknotify(waittasks::Vector{WaitTask}, args...) newwts = WaitTask[] ct = current_task() for wt in waittasks - if (isa(wt.filter, Function) ? wt.filter(wt.localdata, args...) : wt) === false + f = wt.filter + if (isa(f, Function) ? f(wt.localdata, args...) : f) === false work = WorkItem(wt.task) work.argument = args enq_work(work) @@ -236,7 +235,7 @@ wait_readline_filter(w::AsyncStream, args...) = memchr(w.buffer,'\n') <= 0 #general form of generated calls is: wait_(o::NotificationObject, [args::AsRequired...]) for (fcn, notify, filter_fcn, types) in ((:wait_exit,:closenotify,:wait_exit_filter,:Process), #close happens almost immediately after exit, but gives I/O time to finish - (:wait_connect,:connectnotify,:wait_connect_filter,:AsyncStream), + (:wait_connected,:connectnotify,:wait_connect_filter,:AsyncStream), (:wait_close,:closenotify,:wait_close_filter,:(Union(AsyncStream,Process))), (:wait_readable,:readnotify,:wait_readable_filter,:AsyncStream), (:wait_readline,:readnotify,:wait_readline_filter,:AsyncStream), @@ -279,9 +278,6 @@ for (fcn, notify, filter_fcn, types) in end args end - if isa($types,Tuple) - $fcn(x,y...) = $fcn((x,y...)) #allow either form - end end end wait_exit(x::ProcessChain) = wait_exit(x.processes) @@ -298,6 +294,42 @@ function wait_success(x::Union(Process,Vector{Process})) kill(x) success(x) end +wait_readnb(a::AsyncStream,b::Int) = wait_readnb((a,b)) +function wait_accept(server::TcpSocket) + client = TcpSocket() + err = accept(server,client) + if err == 0 + return client + else + err = _uv_lasterror() + if err != 4 #EAGAIN + error("accept error: ", err, "\n") + end + end + ct = current_task() + tw = WaitTask(ct) + while true + push(server.connectnotify,tw) + ct.runnable = false + args = yield() + if isa(args,InterruptException) + error(args) + end + status = args[2]::Int32 + if status == -1 + error("listen error: ", _uv_lasterror(), "\n") + end + err = accept(server,client) + if err == 0 + return client + else + err = _uv_lasterror() + if err != 4 #EAGAIN + error("accept error: ", err, "\n") + end + end + end +end #from `connect` function _uv_hook_connectcb(sock::AsyncStream, status::Int32) @@ -314,7 +346,7 @@ function _uv_hook_connectioncb(sock::AsyncStream, status::Int32) tasknotify(sock.connectnotify, sock, status) end -function _jl_listen(sock::AsyncStream,backlog::Int32,cb::Function) +function _jl_listen(sock::AsyncStream,backlog::Int32,cb::Callback) sock.ccb = cb ccall(:jl_listen,Int32,(Ptr{Void},Int32),sock.handle,backlog) end @@ -324,7 +356,7 @@ _jl_tcp_accept(server::Ptr,client::Ptr) = ccall(:uv_accept,Int32,(Ptr{Void},Ptr{ accept(server::TcpSocket,client::TcpSocket) = _jl_tcp_accept(server.handle,client.handle) connect(sock::TcpSocket,addr::Ip4Addr) = ccall(:jl_tcp4_connect,Int32,(Ptr{Void},Uint32,Uint16),sock.handle,addr.host,hton(addr.port)) -function open_any_tcp_port(preferred_port::Uint16,cb::Function) +function open_any_tcp_port(preferred_port::Uint16,cb::Callback) socket = TcpSocket(); addr = Ip4Addr(preferred_port,uint32(0)) #bind prefereed port on all adresses while true @@ -338,7 +370,7 @@ function open_any_tcp_port(preferred_port::Uint16,cb::Function) end return (addr.port,socket) end -open_any_tcp_port(preferred_port::Integer,cb::Function)=open_any_tcp_port(uint16(preferred_port),cb) +open_any_tcp_port(preferred_port::Integer,cb::Callback)=open_any_tcp_port(uint16(preferred_port),cb) ## BUFFER ## ## Allocate a simple buffer @@ -381,7 +413,7 @@ function _uv_hook_readcb(stream::AsyncStream, nread::Int, base::Ptr, len::Int32) if(isa(stream.closecb,Function)) stream.closecb() end - if(_uv_lasterror(globalEventLoop()) != 1) #UV_EOF == 1 + if(_uv_lasterror() != 1) #UV_EOF == 1 error("Failed to start reading: ",_uv_lasterror(globalEventLoop())) end #EOF @@ -515,19 +547,21 @@ function link_pipe(read_end2::NamedPipe,readable_julia_only::Bool,write_end::Ptr read_end2.handle = malloc_pipe() end link_pipe(read_end2.handle,readable_julia_only,write_end,writeable_julia_only,read_end2) + read_end2.open = true end function link_pipe(read_end::Ptr{Void},readable_julia_only::Bool,write_end::NamedPipe,writeable_julia_only::Bool) if(write_end.handle == C_NULL) write_end.handle = malloc_pipe() end link_pipe(read_end,readable_julia_only,write_end.handle,writeable_julia_only,write_end) + write_end.open = true end close_pipe_sync(handle::UVHandle) = ccall(:uv_pipe_close_sync,Void,(UVHandle,),handle) function close(stream::AsyncStream) if stream.open - ccall(:jl_close_uv,Void,(Ptr{Void},),stream.handle) stream.open = false + ccall(:jl_close_uv,Void,(Ptr{Void},),stream.handle) end end @@ -552,6 +586,36 @@ function readall(stream::AsyncStream) return takebuf_string(stream.buffer) end +function read{T}(this::AsyncStream, a::Array{T}) + if isa(T, BitsKind) + nb = numel(a)*sizeof(T) + buf = this.buffer + assert(buf.seekable == false) + assert(buf.maxsize >= nb) + wait_readnb(this,nb) + read(this.buffer, a) + return a + else + #error("Read from Buffer only supports bits types or arrays of bits types; got $T.") + error("Read from Buffer only supports bits types or arrays of bits types") + end +end + +function read(this::AsyncStream,::Type{Uint8}) + buf = this.buffer + assert(buf.seekable == false) + wait_readnb(this,1) + read(buf,Uint8) +end + +function readline(this::AsyncStream) + buf = this.buffer + assert(buf.seekable == false) + start_reading(this) + wait_readline(this) + readline(buf) +end + show(io, p::Process) = print(io, "Process(", p.cmd, ", ", process_status(p), ")") function finish_read(pipe::NamedPipe) @@ -974,11 +1038,12 @@ _jl_getaddrinfo(loop::Ptr,host::ByteString,service::Ptr,cb::Function) = ccall(:j _jl_sockaddr_from_addrinfo(addrinfo::Ptr) = ccall(:jl_sockaddr_from_addrinfo,Ptr{Void},(Ptr,),addrinfo) _jl_sockaddr_set_port(ptr::Ptr{Void},port::Uint16) = ccall(:jl_sockaddr_set_port,Void,(Ptr{Void},Uint16),ptr,port) _uv_lasterror(loop::Ptr{Void}) = ccall(:jl_last_errno,Int32,(Ptr{Void},),loop) +_uv_lasterror() = _uv_lasterror(globalEventLoop()) function connect_callback(sock::TcpSocket,status::Int32) #println("connect_callback") if(status==-1) - error("Socket connection failed: ",_uv_lasterror(globalEventLoop())) + error("Socket connection failed: ",_uv_lasterror()) end sock.open = true; end @@ -1010,7 +1075,7 @@ function connect_to_host(host::ByteString,port::Uint16) if(err!=0) error("Failed to initilize request to resolve hostname: ",host) end - wait_connect(sock) + wait_connected(sock) return sock end diff --git a/extras/julia_web_base.jl b/extras/julia_web_base.jl index af623e875ce75..78ccb57978c02 100644 --- a/extras/julia_web_base.jl +++ b/extras/julia_web_base.jl @@ -1,6 +1,4 @@ -import Base.* - -########################################## +########################################### # protocol ########################################### @@ -20,38 +18,20 @@ import Base.* # import the message types load("webrepl_msgtypes_h.jl") -#macro debug_only(x); x; end -macro debug_only(x); end - ########################################### # set up the socket connection ########################################### # open a socket on any port -function connect_cb(server::AsyncStream,status::Int32) - @debug_only println(STDERR,"Julia: Client instance connected") - global __client - if(status == -1) - error("An error occured during the creation of the server") - end - client = TcpSocket() - __client = client - err = accept(server,client) - if err!=0 - print("accept error: ", Base._uv_lasterror(Base.globalEventLoop()), "\n") - else - p=__PartialMessageBuffer() - client.readcb = (args...)->__socket_callback(client,p,args...) - Base.start_reading(client) - end -end - -(port,sock) = Base.open_any_tcp_port(4444,connect_cb) +(port,sock) = Base.open_any_tcp_port(4444,false) # print the socket number so the server knows what it is println(STDOUT,int16(port)) -@debug_only println(STDERR,"Julia Instance Started") +# wait for the server to connect to the socket +__io = Base.wait_accept(sock) +Base.start_reading(__io) +println("__io connection accepted") ########################################### # protocol implementation @@ -61,40 +41,42 @@ println(STDOUT,int16(port)) type __Message msg_type::Uint8 args::Array{Any, 1} - __Message(msg_type::Uint8,args::Array{Any,1})=new(msg_type,args) - __Message() = new(255,cell(0)) end -type __PartialMessageBuffer - current::__Message - num_args::Uint8 - curArg::ASCIIString - curArgLength::Int32 - curArgHeaderByteNum::Uint8 - curArgPos::Int32 - __PartialMessageBuffer()=new(__Message(),255,"",0,0,1) +# read a message +function __read_message() + msg_type = read(__io, Uint8) + args = {} + num_args = read(__io, Uint8) + for i=1:num_args + arg_length = read(__io, Uint32) + arg = ASCIIString(read(__io, Uint8, arg_length)) + push(args, arg) + end + return __Message(msg_type, args) end # send a message -function __write_message(client::TcpSocket,msg) - @debug_only __print_message(msg) - write(client, uint8(msg.msg_type)) - write(client, uint8(length(msg.args))) +function __write_message(msg) + write(__io, uint8(msg.msg_type)) + write(__io, uint8(length(msg.args))) for arg=msg.args - write(client, uint32(length(arg))) - write(client, arg) + write(__io, uint32(length(arg))) + write(__io, arg) end + #flush(__io) end -__write_message(msg) = __write_message(__client,msg) # print a message (useful for debugging) function __print_message(msg) - println(STDERR,"Writing message: ",msg.msg_type) - println(STDERR,"Number of arguments: ",length(msg.args)) - show(STDERR,msg.args) + print(msg.msg_type) + print(": [ ") for arg=msg.args - println(STDERR,"Argument Length: ",length(arg)) + print("\"") + print(arg) + print("\" ") end + println("]") end ########################################### @@ -111,153 +93,84 @@ load("julia_web.jl") # store the result of the previous input ans = nothing - # callback for that event handler -function __socket_callback(client::TcpSocket,p::__PartialMessageBuffer,stream::TcpSocket) - @debug_only println(STDERR,"received") - arr = stream.buffer.data - @debug_only println(STDERR,"Callback: ",arr) - pos = 0 - nread = stream.buffer.ptr-1 - while(pos=p.num_args) - __msg=p.current - p.current=__Message() - p.num_args=255 - - # MSG_INPUT_EVAL - if __msg.msg_type == __MSG_INPUT_EVAL && length(__msg.args) == 3 - @debug_only println(STDERR,"Evaluating input") - # parse the arguments - __user_name = __msg.args[1] - __user_id = __msg.args[2] - __input = __msg.args[3] - - # split the input into lines - __lines = split(__input, '\n') - - # try to parse each line incrementally - __parsed_exprs = {} - __input_so_far = "" - __all_nothing = true - - for i=1:length(__lines) - # add the next line of input - __input_so_far = strcat(__input_so_far, __lines[i], "\n") - - # try to parse it - __expr = parse_input_line(__input_so_far) - - # if there was nothing to parse, just keep going - if __expr == nothing - continue - end - __all_nothing = false - __expr_multitoken = isa(__expr, Expr) - - # stop now if there was a parsing error - if __expr_multitoken && __expr.head == :error - # send everyone the input - __write_message(client,__Message(__MSG_OUTPUT_EVAL_INPUT, {__user_id, __user_name, __input})) - __write_message(client,__Message(__MSG_OUTPUT_EVAL_ERROR, {__user_id, __expr.args[1]})) - stream.buffer.ptr = 1 - return true - end - - # if the expression was incomplete, just keep going - if __expr_multitoken && __expr.head == :continue - continue - end - - # add the parsed expression to the list - __input_so_far = "" - __parsed_exprs = [__parsed_exprs, {(__user_id, __expr)}] - end - - # if the input was empty, stop early - if __all_nothing - # send everyone the input - __write_message(client,__Message(__MSG_OUTPUT_EVAL_INPUT, {__user_id, __user_name, __input})) - __write_message(client,__Message(__MSG_OUTPUT_EVAL_RESULT, {__user_id, ""})) - stream.buffer.ptr = 1 - return true - end - - # tell the browser if we didn't get a complete expression - if length(__parsed_exprs) == 0 - __write_message(client,__Message(__MSG_OUTPUT_EVAL_INCOMPLETE, {__user_id})) - stream.buffer.ptr = 1 - return true - end - # send everyone the input - __write_message(client,__Message(__MSG_OUTPUT_EVAL_INPUT, {__user_id, __user_name, __input})) + # if the input was empty, stop early + if __all_nothing + # send everyone the input + __write_message(__Message(__MSG_OUTPUT_EVAL_INPUT, {__user_id, __user_name, __input})) + return __write_message(__Message(__MSG_OUTPUT_EVAL_RESULT, {__user_id, ""})) + end - __eval_exprs(client, __parsed_exprs) - end + # tell the browser if we didn't get a complete expression + if length(__parsed_exprs) == 0 + return __write_message(__Message(__MSG_OUTPUT_EVAL_INCOMPLETE, {__user_id})) end + + # send everyone the input + __write_message(__Message(__MSG_OUTPUT_EVAL_INPUT, {__user_id, __user_name, __input})) + + put(__eval_channel, __parsed_exprs) end - stream.buffer.ptr=1 - return true end +# event handler for socket input +enq_work(@task while true __socket_callback(__io) end) web_show(user_id, ans) = __Message(__MSG_OUTPUT_EVAL_RESULT, {user_id, sprint(repl_show, ans)}) -function __eval_exprs(client,__parsed_exprs) global ans +function __eval_exprs(__parsed_exprs) + global ans user_id = "" # try to evaluate the expressions @@ -267,15 +180,15 @@ function __eval_exprs(client,__parsed_exprs) global ans try ans = eval(__parsed_exprs[i][2]) catch __error - return __write_message(client,__Message(__MSG_OUTPUT_EVAL_ERROR, {user_id,sprint(repl_show, __error)})) + return __write_message(__Message(__MSG_OUTPUT_EVAL_ERROR, {user_id, sprint(show, __error)})) end end - + # send the result of the last expression if isa(ans,Nothing) - return __write_message(client,__Message(__MSG_OUTPUT_EVAL_RESULT, {user_id,""})) + return __write_message(__Message(__MSG_OUTPUT_EVAL_RESULT, {user_id, ""})) else - return __write_message(client, web_show(user_id, ans)) + return __write_message(web_show(user_id, ans)) end end @@ -290,10 +203,8 @@ println(Base._jl_commit_string, "\n") # wait forever while asynchronous processing happens ########################################### +__eval_channel = RemoteRef() + while true -try - Base.run_event_loop() -catch(err) - print(err) -end + __eval_exprs(take(__eval_channel)) end diff --git a/src/wrapper.c b/src/wrapper.c index 6c99c93e3e5ec..1079580a345e1 100755 --- a/src/wrapper.c +++ b/src/wrapper.c @@ -352,8 +352,8 @@ DLLEXPORT int jl_pututf8(uv_stream_t *s, uint32_t wchar ) } static char chars[] = { - 0, 1, 2, 3, 4, 5, 6, 7, - 8, 9, 10, 11, 12, 13, 13, 15, + 0, 1, 2, 3, 4, 5, 6, 7, + 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, @@ -374,7 +374,7 @@ static char chars[] = { 152,153,154,155,156,157,158,159, 160,161,162,163,164,165,166,167, 168,169,170,171,172,173,174,175, - 167,177,178,179,180,181,182,183, + 176,177,178,179,180,181,182,183, 184,185,186,187,188,189,190,191, 192,193,194,195,196,197,198,199, 200,201,202,203,204,205,206,207, @@ -386,29 +386,31 @@ static char chars[] = { 248,249,250,251,252,253,254,255 }; -void jl_free_buffer() {} +static void jl_free_buffer(uv_write_t* req, int status) { + free(req); +} -DLLEXPORT int jl_putc(unsigned char c, uv_stream_t *stream) -{ +DLLEXPORT int jl_putc(unsigned char c, uv_stream_t *stream) { if(stream->typetype