do not use myid() to differentiate master & worker (#32879)
Occasionally, while adding a large number of workers, particularly when worker-worker connections are not lazy, it is possible to encounter the following error:

```
ERROR (unhandled task failure): MethodError: no method matching manage(::Base.Distributed.DefaultClusterManager, ::Int64, ::WorkerConfig, ::Symbol)
Closest candidates are:
  manage(!Matched::Base.Distributed.SSHManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:224
  manage(!Matched::Base.Distributed.LocalManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:337
  manage(!Matched::Union{ClusterManagers.PBSManager, ClusterManagers.QRSHManager, ClusterManagers.SGEManager}, ::Int64, ::WorkerConfig, ::Symbol) at /home/jrun/.julia/v0.6/ClusterManagers/src/qsub.jl:115
  ...
Stacktrace:
 [1] deregister_worker(::Base.Distributed.ProcessGroup, ::Int64) at ./distributed/cluster.jl:903
 [2] message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:220
 [3] process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:118
 [4] (::Base.Distributed.##101#102{TCPSocket,TCPSocket,Bool})() at ./event.jl:73
```
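There is no deterministic reproducer, but a stress loop along the following lines tends to provoke the failure. This is a hedged sketch (worker and iteration counts are arbitrary), not part of the commit:

```julia
# Hedged sketch, not a deterministic reproducer: repeatedly add many workers
# with eager (non-lazy) worker-worker connections, the scenario in which the
# failure was observed, then remove them and try again.
using Distributed

for _ in 1:10
    addprocs(32; lazy=false)  # eager all-to-all setup widens the race window
    rmprocs(workers())
end
```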

It can be simulated with this exact sequence of events:
- worker2 in process of connecting to master
    - master has received worker2's listen port, connected to it, and sent the JoinPGRP message to it
    - master is now aware of worker2, and has added it to its list of workers
    - worker2 has still not processed the JoinPGRP message, so it is still unaware of its worker id
- worker3 now connects to master
    - master sends the JoinPGRP message along with list of existing workers that includes worker2
- worker3 connects to worker2
- worker2 receives a new connection from worker3 and attempts to process it
- worker3 encounters an error and exits, breaking the connection
- worker2 gets an error processing message from worker3
    - goes into error handling
    - the current error handling code sees the self pid as 1 (the default value, since the JoinPGRP message carrying the real id has not yet been processed) and incorrectly concludes it is the master
    - attempts to handle the worker disconnection as the master would, and raises the MethodError shown above

The MethodError prevents proper cleanup at the worker where it happens.

The underlying issue is that it is not correct to identify whether a Julia process is the master or a worker by looking at its process id. Instead, there should be a dedicated indicator for that.

This change adds a new local process role variable that is set to `:master` by default, and is set to `:worker` when `init_worker` is invoked (as happens in `start_worker`). This allows a process to know that it is running as a worker irrespective of whether it has received a process id or not.
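With the change in place, the role can be queried on any process. A minimal usage sketch, mirroring the test added below (note that `myrole` is internal to `Distributed` and not exported):

```julia
using Distributed
addprocs(2)

Distributed.myrole()                             # :master on the master process
remotecall_fetch(() -> Distributed.myrole(), 2)  # :worker on a worker process
```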
tanmaykm authored and JeffBezanson committed Aug 15, 2019
1 parent 44d54fe commit a7ac267
Showing 2 changed files with 20 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/cluster.jl
```diff
@@ -361,6 +361,8 @@ process as a worker using TCP/IP sockets for transport.
 `cookie` is a [`cluster_cookie`](@ref).
 """
 function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
+    myrole!(:worker)
+
     # On workers, the default cluster manager connects via TCP sockets. Custom
     # transports will need to call this function with their own manager.
     global cluster_manager
@@ -783,12 +785,19 @@ end
 
 # globals
 const LPROC = LocalProcess()
+const LPROCROLE = Ref{Symbol}(:master)
 const HDR_VERSION_LEN=16
 const HDR_COOKIE_LEN=16
 const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}()
 const map_sock_wrkr = IdDict()
 const map_del_wrkr = Set{Int}()
 
+# whether process is a master or worker in a distributed setup
+myrole() = LPROCROLE[]
+function myrole!(proctype::Symbol)
+    LPROCROLE[] = proctype
+end
+
 # cluster management related API
 """
     myid()
@@ -1108,7 +1117,7 @@ function deregister_worker(pg, pid)
         end
     end
 
-    if myid() == 1 && isdefined(w, :config)
+    if myid() == 1 && (myrole() === :master) && isdefined(w, :config)
         # Notify the cluster manager of this workers death
         manage(w.manager, w.id, w.config, :deregister)
         if PGRP.topology != :all_to_all || isclusterlazy()
```
10 changes: 10 additions & 0 deletions test/distributed_exec.jl
```diff
@@ -45,6 +45,16 @@ end
 id_me = myid()
 id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
 
+# Test role
+@everywhere using Distributed
+@test Distributed.myrole() === :master
+for wid = workers()
+    wrole = remotecall_fetch(wid) do
+        Distributed.myrole()
+    end
+    @test wrole === :worker
+end
+
 # Test remote()
 let
     pool = default_worker_pool()
```
