Skip to content

Commit

Permalink
change open(cmd) to return a Process object (without a separate IO va…
Browse files Browse the repository at this point in the history
…lue)

This reverts commit 8ffdfc2,
and fixes it a bit too :)

fix #9659
  • Loading branch information
vtjnash committed Apr 24, 2017
1 parent 502d96f commit 81dbe44
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 70 deletions.
4 changes: 2 additions & 2 deletions base/distributed/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,8 @@ function launch_additional(np::Integer, cmd::Cmd)
addresses = Vector{Any}(np)

for i in 1:np
io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r")
io_objs[i] = io
io = open(detach(cmd))
io_objs[i] = io.out
end

for (i,io) in enumerate(io_objs)
Expand Down
13 changes: 6 additions & 7 deletions base/distributed/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
# detach launches the command in a new process group, allowing it to outlive
# the initial julia process (Ctrl-C and teardown methods are handled through messages)
# for the launched processes.
io, pobj = open(pipeline(detach(cmd), stderr=STDERR), "r")
io = open(detach(cmd))

wconfig = WorkerConfig()
wconfig.io = io
wconfig.io = io.out
wconfig.host = host
wconfig.tunnel = params[:tunnel]
wconfig.sshflags = sshflags
Expand Down Expand Up @@ -321,12 +321,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
bind_to = manager.restrict ? `127.0.0.1` : `$(LPROC.bind_addr)`

for i in 1:manager.np
io, pobj = open(pipeline(detach(
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`, dir=dir)),
stderr=STDERR), "r")
cmd = `$(julia_cmd(exename)) $exeflags --bind-to $bind_to --worker $(cluster_cookie())`
io = open(detach(setenv(cmd, dir=dir)))
wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
wconfig.process = io
wconfig.io = io.out
wconfig.enable_threaded_blas = params[:enable_threaded_blas]
push!(launched, wconfig)
end
Expand Down
34 changes: 17 additions & 17 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,15 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{
eval(Main, deserialize(STDIN))
end
"""
io, pobj = open(pipeline(detach(`$(julia_cmd()) -O0
--output-ji $output --output-incremental=yes
--startup-file=no --history-file=no
--color=$(have_color ? "yes" : "no")
--eval $code_object`), stderr=STDERR),
"w", STDOUT)
io = open(pipeline(detach(`$(julia_cmd()) -O0
--output-ji $output --output-incremental=yes
--startup-file=no --history-file=no
--color=$(have_color ? "yes" : "no")
--eval $code_object`), stderr=STDERR),
"w", STDOUT)
in = io.in
try
serialize(io, quote
serialize(in, quote
empty!(Base.LOAD_PATH)
append!(Base.LOAD_PATH, $LOAD_PATH)
empty!(Base.LOAD_CACHE_PATH)
Expand All @@ -606,22 +607,21 @@ function create_expr_cache(input::String, output::String, concrete_deps::Vector{
end)
source = source_path(nothing)
if source !== nothing
serialize(io, quote
serialize(in, quote
task_local_storage()[:SOURCE_PATH] = $(source)
end)
end
serialize(io, :(Base.include($(abspath(input)))))
serialize(in, :(Base.include($(abspath(input)))))
if source !== nothing
serialize(io, :(delete!(task_local_storage(), :SOURCE_PATH)))
serialize(in, :(delete!(task_local_storage(), :SOURCE_PATH)))
end
close(io)
wait(pobj)
return pobj
catch
kill(pobj)
close(io)
rethrow()
close(in)
catch ex
close(in)
process_running(io) && Timer(t -> kill(io), 5.0) # wait a short time before killing the process to give it a chance to clean up on its own first
rethrow(ex)
end
return io
end

compilecache(mod::Symbol) = compilecache(string(mod))
Expand Down
63 changes: 37 additions & 26 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -577,38 +577,45 @@ the process's standard input and `stdio` optionally specifies the process's stan
stream.
"""
function open(cmds::AbstractCmd, mode::AbstractString="r", other::Redirectable=DevNull)
if mode == "r"
if mode == "r+" || mode == "w+"
other === DevNull || throw(ArgumentError("no other stream for mode rw+"))
in = Pipe()
out = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
close(out.in)
elseif mode == "r"
in = other
out = io = Pipe()
out = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(out.in)
elseif mode == "w"
in = io = Pipe()
in = Pipe()
out = other
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
else
throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\""))
end
return (io, processes)
return processes
end

"""
open(f::Function, command, mode::AbstractString="r", stdio=DevNull)
Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting read or
write stream, then closes the stream and waits for the process to complete. Returns the
value returned by `f`.
Similar to `open(command, mode, stdio)`, but calls `f(stream)` on the resulting process
stream, then closes the input stream and waits for the process to complete.
Returns the value returned by `f`.
"""
function open(f::Function, cmds::AbstractCmd, args...)
io, P = open(cmds, args...)
P = open(cmds, args...)
ret = try
f(io)
catch
f(P)
catch e
kill(P)
rethrow()
rethrow(e)
finally
close(io)
close(P.in)
end
success(P) || pipeline_error(P)
return ret
Expand All @@ -623,15 +630,14 @@ Starts running a command asynchronously, and returns a tuple (stdout,stdin,proce
output stream and input stream of the process, and the process object itself.
"""
function readandwrite(cmds::AbstractCmd)
in = Pipe()
out, processes = open(cmds, "r", in)
(out, in, processes)
processes = open(cmds, "r+")
return (processes.out, processes.in, processes)
end

function read(cmd::AbstractCmd, stdin::Redirectable=DevNull)
out, procs = open(cmd, "r", stdin)
bytes = read(out)
!success(procs) && pipeline_error(procs)
procs = open(cmd, "r", stdin)
bytes = read(procs.out)
success(procs) || pipeline_error(procs)
return bytes
end

Expand All @@ -656,9 +662,17 @@ function run(cmds::AbstractCmd, args...)
success(ps) ? nothing : pipeline_error(ps)
end

const SIGPIPE = 13
# some common signal numbers that are usually available on all platforms
# and might be useful as arguments to `kill` or testing against `Process.termsignal`
const SIGHUP = 1
const SIGINT = 2
const SIGQUIT = 3 # !windows
const SIGKILL = 9
const SIGPIPE = 13 # !windows
const SIGTERM = 15

function test_success(proc::Process)
assert(process_exited(proc))
@assert process_exited(proc)
if proc.exitcode < 0
#TODO: this codepath is not currently tested
throw(UVError("could not start process $(string(proc.cmd))", proc.exitcode))
Expand All @@ -668,8 +682,7 @@ end

function success(x::Process)
wait(x)
kill(x)
test_success(x)
return test_success(x)
end
success(procs::Vector{Process}) = mapreduce(success, &, procs)
success(procs::ProcessChain) = success(procs.processes)
Expand Down Expand Up @@ -705,8 +718,6 @@ function pipeline_error(procs::ProcessChain)
error(msg)
end

_jl_kill(p::Process, signum::Integer) = ccall(:uv_process_kill, Int32, (Ptr{Void},Int32), p.handle, signum)

"""
kill(p::Process, signum=SIGTERM)
Expand All @@ -715,14 +726,14 @@ Send a signal to a process. The default is to terminate the process.
function kill(p::Process, signum::Integer)
if process_running(p)
@assert p.handle != C_NULL
_jl_kill(p, signum)
ccall(:uv_process_kill, Int32, (Ptr{Void}, Int32), p.handle, signum)
else
Int32(-1)
end
end
kill(ps::Vector{Process}) = map(kill, ps)
kill(ps::ProcessChain) = map(kill, ps.processes)
kill(p::Process) = kill(p, 15) #SIGTERM
kill(p::Process) = kill(p, SIGTERM)

function _contains_newline(bufptr::Ptr{Void}, len::Int32)
return (ccall(:memchr, Ptr{Void}, (Ptr{Void},Int32,Csize_t), bufptr, '\n', len) != C_NULL)
Expand Down
4 changes: 2 additions & 2 deletions examples/clustermanager/simple/UnixDomainCM.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi
sockname = tempname()
try
cmd = `$(params[:exename]) --startup-file=no $(@__FILE__) udwrkr $sockname $cookie`
io, pobj = open(cmd, "r")
pobj = open(cmd)

wconfig = WorkerConfig()
wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj)
wconfig.userdata = Dict(:sockname=>sockname, :io=>pobj.out, :process=>pobj)
push!(launched, wconfig)
notify(c)
catch e
Expand Down
12 changes: 4 additions & 8 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -901,11 +901,7 @@ if DoFullTest
# is already running in parallel under the default topology.
script = joinpath(dirname(@__FILE__), "topology.jl")
cmd = `$(Base.julia_cmd()) $script`

(strm, proc) = open(pipeline(cmd, stderr=STDERR))
wait(proc)
if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0
println(readstring(strm))
if !success(pipeline(cmd, stdout=STDOUT)) && ccall(:jl_running_on_valgrind,Cint,()) == 0
error("Topology tests failed : $cmd")
end

Expand Down Expand Up @@ -1310,11 +1306,11 @@ function Base.launch(manager::ErrorSimulator, params::Dict, launched::Array, c::
else
error("Unknown mode")
end
io, pobj = open(pipeline(detach(setenv(cmd, dir=dir)); stderr=STDERR), "r")
io = open(detach(setenv(cmd, dir=dir)))

wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = io
wconfig.process = io
wconfig.io = io.out
push!(launched, wconfig)
notify(c)
end
Expand Down
16 changes: 8 additions & 8 deletions test/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -242,26 +242,26 @@ let fname = tempname()
function thrash(handle::Ptr{Void})
# Kill the memory, but write a nice low value in the libuv type field to
# trigger the right code path
ccall(:memset,Ptr{Void},(Ptr{Void},Cint,Csize_t),handle,0xee,3*sizeof(Ptr{Void}))
unsafe_store!(convert(Ptr{Cint},handle+2*sizeof(Ptr{Void})),15)
ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), handle, 0xee, 3 * sizeof(Ptr{Void}))
unsafe_store!(convert(Ptr{Cint}, handle + 2 * sizeof(Ptr{Void})), 15)
nothing
end
OLD_STDERR = STDERR
redirect_stderr(open("$(escape_string(fname))","w"))
redirect_stderr(open("$(escape_string(fname))", "w"))
# Usually this would be done by GC. Do it manually, to make the failure
# case more reliable.
oldhandle = OLD_STDERR.handle
OLD_STDERR.status = Base.StatusClosing
OLD_STDERR.handle = C_NULL
ccall(:uv_close,Void,(Ptr{Void},Ptr{Void}),oldhandle,cfunction(thrash,Void,(Ptr{Void},)))
ccall(:uv_close, Void, (Ptr{Void}, Ptr{Void}), oldhandle, cfunction(thrash, Void, (Ptr{Void},)))
sleep(1)
import Base.zzzInvalidIdentifier
"""
try
(in,p) = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w")
write(in,cmd)
close(in)
wait(p)
io = open(pipeline(`$exename --startup-file=no`, stderr=STDERR), "w")
write(io, cmd)
close(io)
wait(io)
catch
error("IOStream redirect failed. Child stderr was \n$(readstring(fname))\n")
finally
Expand Down

0 comments on commit 81dbe44

Please sign in to comment.