data_store_mgr: n-window depth field #5609

Closed
oliver-sanders opened this issue Jun 29, 2023 · 6 comments · Fixed by #5660
oliver-sanders commented Jun 29, 2023

I had misinterpreted what the depth field meant; it turns out it is inheritance depth, not window depth. I have added a description to the schema to clarify.

This issue now relates to the problem of reporting the n-window depth via GraphQL.

This will be required in order to filter tasks by different n-window levels, e.g.:


Original Issue (invalid)

Discovered whilst working on #5600, the depth field (which holds the n-window value of a task) is giving some funky results.

Note: results in this issue were obtained on #5600, which includes a fix for a bug where n=0 tasks were reporting as depth=1.

Reproducible Example:

In this example:

[scheduling]
    runahead limit = P2
    initial cycle point = 1
    cycling mode = integer
    [[graph]]
        P1 = """
            a => b => c => d
            b[-P1] => b
        """

[scheduler]
    allow implicit tasks = True

Then set the window extent to, say, 5.

Several tasks have the wrong depth number.

E.g. 2/c has depth=3, but it should actually be depth=2:

  • 2/a is n=0
  • 2/b is n=1
  • 2/c is n=2 # we're getting n=3
  • 2/d is n=3 # we're getting n=4

Problem

The issue is that the same task can have different n-window numbers when the graph is traced from different n=0 nodes.

E.g. there are three routes from the n=0 window to the task 2/c:

  • Via 1/a:
    • 1/a n=0
    • 1/b n=1
    • 2/b n=2
    • 2/c n=3 # Reported
  • Via 2/a:
    • 2/a n=0
    • 2/b n=1
    • 2/c n=2 # Correct
  • Via 3/a (runahead)
    • 3/a n=0
    • 3/b n=1
    • 2/b n=2
    • 2/c n=3 # Reported
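The correct value is the minimum over all such routes, which is exactly what a multi-source BFS from the n=0 tasks computes. A minimal sketch (my illustration, not the data store code; the graph is hand-built to match the example above):

```python
from collections import deque

def window_depths(adjacency, active):
    """Depth of each task = its minimum graph distance from any n=0 task."""
    depths = {task: 0 for task in active}
    queue = deque(active)
    while queue:
        task = queue.popleft()
        for neighbour in adjacency.get(task, ()):
            if neighbour not in depths:
                # BFS invariant: the first visit is via a shortest route
                depths[neighbour] = depths[task] + 1
                queue.append(neighbour)
    return depths

# the example graph: a => b => c => d per cycle, plus b[-P1] => b
adjacency = {}
def edge(a, b):
    adjacency.setdefault(a, set()).add(b)
    adjacency.setdefault(b, set()).add(a)
for cycle in (1, 2, 3):
    edge(f'{cycle}/a', f'{cycle}/b')
    edge(f'{cycle}/b', f'{cycle}/c')
    edge(f'{cycle}/c', f'{cycle}/d')
edge('1/b', '2/b')
edge('2/b', '3/b')

depths = window_depths(adjacency, {'1/a', '2/a', '3/a'})
# 2/c comes out at depth 2 (via 2/a), not 3
```

Because BFS expands all n=0 sources in lock-step, no per-route bookkeeping is needed; which source a walk "starts from" stops mattering.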

At present the algorithm is iterating in the following order (window-extent=5):

  • 1/a # iterate from n=0 to n=1
  • 1/b
  • 2/a
  • 2/b
  • 3/a
  • 3/b
  • 4/a
  • 4/b
  • 1/a # then go back to n=0 and flesh things out from there
  • 1/b
  • 1/c
  • 1/d
  • 2/b
  • 2/c
  • 2/d
  • 3/b
  • 3/c
  • 3/d

I initially thought that we might be able to get around this by following inter-cycle edges first so that when we come back to flesh out each cycle later we override with lower values.

But I'm not sure that would work; I think we need to store the depth of tasks and amend it when we find a shorter route to the task. This might be best done in combination with #5485, which requires something similar.

Integration Test

from cylc.flow.data_store_mgr import TASK_PROXIES
from cylc.flow.id import Tokens


async def test_depth(flow, scheduler, start):
    """The n-window value of a task is in the "depth" field."""
    id_ = flow({
        'scheduler': {
            'allow implicit tasks': 'true',
        },
        'scheduling': {
            'initial cycle point': '1',
            'cycling mode': 'integer',
            'runahead limit': 'P2',
            'graph': {
                'P1': '''
                    a => b => c => d
                    b[-P1] => b
                '''
            },
        },
    })
    schd = scheduler(id_)
    async with start(schd):
        schd.data_store_mgr.set_graph_window_extent(5)

        await schd.update_data_structure()

        task_a = schd.pool.get_tasks()[0]

        schd.data_store_mgr.increment_graph_window(
            task_a.tokens,
            task_a.point,
            task_a.flow_nums,
            edge_distance=0,
        )

        # incidentally, we seem to need to call update_data_structure both
        # before and after increment_graph_window to flush the results out
        # of the scheduler???
        await schd.update_data_structure()

        tasks = [
            (Tokens(task.id).relative_id, task.depth)
            for task in
            schd.data_store_mgr.get_data_elements(TASK_PROXIES).added
        ]
        assert set(tasks) == {
            ('1/a', 0),  # n=0
            ('1/b', 1),
            ('1/c', 2),
            ('1/d', 3),

            ('2/a', 0),  # n=0
            ('2/b', 1),
            ('2/c', 2),
            ('2/d', 3),

            # runahead limit

            ('3/a', 0),  # n=0 (runahead)
            ('3/b', 1),
            ('3/c', 2),
            ('3/d', 3),

            ('4/a', 3),
            ('4/b', 2),
            ('4/c', 3),
            ('4/d', 4),

            ('5/a', 4),
            ('5/b', 3),
            ('5/c', 4),
            ('5/d', 5),

            ('6/b', 5),
        }
@oliver-sanders oliver-sanders added the bug Something is wrong :( label Jun 29, 2023
@oliver-sanders oliver-sanders added this to the cylc-8.3.0 milestone Jun 29, 2023
@oliver-sanders
Member Author

I don't think this is an issue for the n=1 window in the GUI, because the algorithm lists all the right tasks; it just gives them the wrong depth number. At present the depth number isn't used for anything, so this is safe.

@hjoliver
Member

I think we need to store the depth of tasks and amend it when we find a shorter route to the task. This might be best done in combination with #5485 which requires something similar.

Without looking at the code, this sounds like it should be pretty simple (store depth value in the datastore task proxy; if the same task is encountered again, override the depth value if the new value is lower). Is it not as straightforward as that?


dwsutherland commented Jun 30, 2023

This field is, in some ways, an artifact of when the window was generated in a different way. I could be wrong, but I thought I originally used it as depth in the family hierarchy, and may have repurposed it at some point.

Without looking at the code, this sounds like it should be pretty simple (store depth value in the datastore task proxy; if the same task is encountered again, override the depth value if the new value is lower). Is it not as straightforward as that?

Not sure it's that simple (although I haven't thought about this in a while), because if we're talking about edge depth from the closest active task, sometimes it will increase as the active front moves toward the future.

WRT graph re-walk avoidance: at the moment I'm entertaining the idea of just holding child and parent sets (of ID strings) for every task in the graph. So instead of regenerating parents/children, I can create the association (with the active task) and move on. However, I may have to stratify these parent/child sets by depth to avoid iterating over the children (which would probably be best).


oliver-sanders commented Jun 30, 2023

originally used it as depth in the family hierarchy,

Yikes, I could be completely misinterpreting it!

Is it not as straightforward as that?

If a shorter route is found to a task, then the depth of tasks downstream of it would also need to be amended.
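That cascading amendment could be sketched like this (hypothetical helper, not the actual implementation): when a shorter route to a task is found, re-check its neighbours and propagate any improvements.

```python
from collections import deque

def relax(depths, adjacency, task, new_depth):
    """Lower `task`'s depth and propagate the change downstream."""
    if new_depth >= depths.get(task, float('inf')):
        return  # not actually shorter; nothing to do
    depths[task] = new_depth
    queue = deque([task])
    while queue:
        node = queue.popleft()
        for neighbour in adjacency.get(node, ()):
            if depths[node] + 1 < depths.get(neighbour, float('inf')):
                depths[neighbour] = depths[node] + 1
                queue.append(neighbour)

depths = {'2/b': 3, '2/c': 4, '2/d': 5}
adjacency = {'2/b': {'2/c'}, '2/c': {'2/b', '2/d'}, '2/d': {'2/c'}}
relax(depths, adjacency, '2/b', 1)
# depths is now {'2/b': 1, '2/c': 2, '2/d': 3}
```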

But it may be simpler to solve this with an implementation change.

I'm entertaining the idea of just hold child and parent sets
can create the association (with the active task) and move on

Makes sense; to avoid re-walking nodes we will need to hold the set somehow.

There must be a maths'y solution to this. Having a quick think, I wonder if we could represent this internally as an adjacency matrix, filling in only the "1" values, and use some fancy algorithm to fill in the values we haven't computed yet.

  • Determining the n-window values would involve reading out the numbers from each row corresponding to an active task and taking the minimum for each.
  • Nodes have drifted from the window if their values exceed the n-window.
  • We search for new nodes around tasks with a value of n-window - 1; we only need to follow one edge, and the missing values can be filled in from these.

There's a nice side-effect of this which is that you can read out the n-window around any task within the global n-window.

Bridges to cross:

  • Need a 2D data structure where adding/removing rows/cols is efficient.
  • Need a fancy algorithm to fill in missing values from an adjacency matrix with only the 1's filled in.

A possible benefit of a maths'y approach is that the bulk of the work is sufficiently abstract that it could be compiled, e.g. by numba, or vectorised using a library like numpy.


oliver-sanders commented Jun 30, 2023

OK, I've had a play and I think this method works.

The approach is to decant the graph into a matrix. E.g. the first two cycles of this graph:

a => b => c
b[-P1] => b

Can be extracted into this matrix:

     1/a   1/b   1/c   2/a   2/b   2/c  
1/a   0     1                            
1/b   1     0     1           1          
1/c         1     0                      
2/a                     0     1          
2/b         1           1     0     1    
2/c                           1     0   
  • 0 means the row and col are the same node.
  • 1 means the row and col are immediately adjacent.

Once you've got all of the 0's and 1's into the matrix, it's possible to fill in all of the blank values by traversing the matrix:

     1/a   1/b   1/c   2/a   2/b   2/c  
1/a   0     1     2     3     2     3    
1/b   1     0     1     2     1     2    
1/c   2     1     0     3     2     3    
2/a   3     2     3     0     1     2    
2/b   2     1     2     1     0     1    
2/c   3     2     3     2     1     0    
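That fill step is an all-pairs shortest-path computation; a numpy Floyd–Warshall sketch (my illustration, not part of the proposal) reproduces the table above from the 0/1 seed:

```python
import numpy as np

def fill_distances(seed):
    """Complete a matrix seeded with 0 (self), 1 (adjacent) and
    np.inf (unknown) using Floyd-Warshall."""
    dist = seed.copy()
    for k in range(dist.shape[0]):
        # allow paths routed through intermediate node k
        dist = np.minimum(dist, dist[:, k:k + 1] + dist[k:k + 1, :])
    return dist

# seed for the six-node example (1/a 1/b 1/c 2/a 2/b 2/c)
seed = np.full((6, 6), np.inf)
np.fill_diagonal(seed, 0)
for i, j in [(0, 1), (1, 2), (1, 4), (3, 4), (4, 5)]:
    seed[i, j] = seed[j, i] = 1

dist = fill_distances(seed)
# dist[0] == [0, 1, 2, 3, 2, 3], matching the 1/a row above
```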

To read out the n-window from the matrix, take, for each column, the lowest value across the rows corresponding to n=0 tasks.

I.e. if the n=0 window is {1/a, 2/a}:

     1/a   1/b   1/c   2/a   2/b   2/c  
1/a   0     1     2     3     2     3    
2/a   3     2     3     0     1     2    
--------------------------------------
n=0   0     1     2     0     1     2
  • n=0
    • 1/a
    • 2/a
  • n=1
    • 1/b
    • 2/b
  • n=2
    • 1/c
    • 2/c
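The readout above can be sketched in numpy (illustrative; the names are mine): take the rows for the n=0 tasks and reduce with a column-wise minimum.

```python
import numpy as np

nodes = ['1/a', '1/b', '1/c', '2/a', '2/b', '2/c']
# the fully filled-in distance matrix from above
dist = np.array([
    [0, 1, 2, 3, 2, 3],
    [1, 0, 1, 2, 1, 2],
    [2, 1, 0, 3, 2, 3],
    [3, 2, 3, 0, 1, 2],
    [2, 1, 2, 1, 0, 1],
    [3, 2, 3, 2, 1, 0],
])

def read_window(active):
    """Column-wise minimum over the rows of the n=0 tasks."""
    rows = [nodes.index(task) for task in active]
    return {
        node: int(value)
        for node, value in zip(nodes, dist[rows, :].min(axis=0))
    }

window = read_window(['1/a', '2/a'])
# window == {'1/a': 0, '1/b': 1, '1/c': 2, '2/a': 0, '2/b': 1, '2/c': 2}
```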

The joy of this approach is that the adjacency values never need to be recomputed. We can read out the same matrix for different n-windows, e.g. if the n=0 window changed to {1/b, 2/a}:

     1/a   1/b   1/c   2/a   2/b   2/c  

1/b   1     0     1     2     1     2    
2/a   3     2     3     0     1     2    
--------------------------------------
n=0   1     0     1     0     1     2
  • n=0
    • 1/b
    • 2/a
  • n=1
    • 1/a
    • 2/b
  • n=2
    • 2/c

To housekeep the matrix, when we read out the n-window, we would remove any rows/columns corresponding to tasks where the n-value exceeds the desired n-window-extent.

To increment the matrix, we would need to iterate only one level of parents/children around the edges of the window.

The easy way to avoid re-iterating over the same parents/children is to keep the matrix data one n-window level higher than you want to read out. Because we're only storing a 2D array of integers (which is symmetric, so can be optimised), the cost of this should be relatively small.

If there is a discontinuity, i.e. two bits of graph aren't connected, store a value of -1 in the matrix and try to find a join between the parts with a later iteration.

Here's some example code which takes us through a set of state changes for the workflow:

a => b => c
b[-P1] => b

I've just scattered in the add_node calls as needed for a POC. This would need to be event driven from the pool. The set of active tasks would also be provided by the pool.

Code

Really messy POC, needs niceifying and optimising:

nodes = [
    '1/a',
    '1/b',
    '1/c',
    '2/a',
    '2/b',
    '2/c',
]
matrix = [
    [0, 1, None, None, None, None],
    [1, 0, 1, None, 1, None],
    [None, 1, 0, None, None, None],
    [None, None, None, 0, 1, None],
    [None, 1, None, 1, 0, 1],
    [None, None, None, None, 1, 0],
]


LEN = 5


def _format(item):
    if item is None:
        return ' ' * LEN
    ret = str(item)
    ret += ' ' * (LEN - len(ret))
    return ret


def print_matrix(matrix):
    print(_format(None) + ' '.join(_format(id_) for id_ in nodes))
    for id_, row in zip(nodes, matrix):
        print(' '.join(
            [_format(id_)] +
            [_format(value) for value in row]
        ))


def fill_matrix(matrix, indices):
    for index in indices:
        row = matrix[index]
        for x, value in enumerate(row):
            if value is None or value == -1:
                # walk out from x until we reach index (or run out of graph)
                count = 2
                _nodes = {
                    _ind
                    for _ind, _value in enumerate(matrix[x])
                    if _value == 1
                }
                _visited = set()
                while _nodes:
                    _nodes = {
                        _ind
                        for _node in _nodes
                        for _ind, _value in enumerate(matrix[_node])
                        if _value == 1
                        if _ind not in _visited
                    }
                    if _nodes & {index}:
                        matrix[index][x] = count
                        break
                    count += 1
                    _visited |= _nodes
                if not _nodes:
                    # no route found: mark the discontinuity
                    matrix[index][x] = -1


def add_node(matrix, id_, adjacents):
    length = len(matrix)
    for row in matrix:
        row.append(None)
    matrix.append([None] * (length + 1))
    nodes.append(id_)
    # add the self-relationship
    matrix[length][length] = 0
    # add 1's for immediately adjacent nodes
    for adj in adjacents:
        matrix[adj][length] = 1
        matrix[length][adj] = 1
    return length


def remove_node(matrix, id_):
    # if id_ not in nodes:
    #     return
    ind = nodes.index(id_)
    for row in matrix:
        row.pop(ind)
    matrix.pop(ind)
    nodes.pop(ind)


def readout_window(matrix, n_0, extent):
    fill_matrix(matrix, n_0)
    ret = {
        node: min({
            matrix[n_0_node][ind]
            for n_0_node in n_0
        } - {None, -1})
        for ind, node in enumerate(nodes)
    }
    for key, value in list(ret.items()):
        if value > extent:
            remove_node(matrix, key)
            ret.pop(key)
    return ret


active = {'1/a', '2/a'}
print(f'# Initialise Matrix active={active}')
print_matrix(matrix)
print('# Readout Window (n=1)')
print(readout_window(matrix, {nodes.index(t) for t in active}, 1))
print_matrix(matrix)

print('-' * 40)
active = {'1/b', '2/a'}
print('1/a:succeeded, 1/b:running')
add_node(matrix, '1/c', {nodes.index('1/b')})
print(readout_window(matrix, {nodes.index(t) for t in active}, 1))
print_matrix(matrix)

print('-' * 40)
active = {'1/c', '2/a'}
print('1/b:succeeded, 1/c:running')
print(readout_window(matrix, {nodes.index(t) for t in active}, 1))
print_matrix(matrix)

print('-' * 40)
active = {'1/c', '2/b'}
print('2/a:succeeded, 2/b:running')
add_node(matrix, '2/c', {nodes.index('2/b')})
print(readout_window(matrix, {nodes.index(t) for t in active}, 1))
print_matrix(matrix)

print('-' * 40)
active = {'1/c', '2/c', '3/a'}
print('2/b:succeeded, 2/c & 3/a:running')
add_node(matrix, '3/b', {nodes.index('2/b')})
add_node(matrix, '2/b', {nodes.index('3/b')})
add_node(matrix, '3/a', {nodes.index('3/b')})
print(readout_window(matrix, {nodes.index(t) for t in active}, 1))
print_matrix(matrix)

Output:

# Initialise Matrix active={'1/a', '2/a'}
     1/a   1/b   1/c   2/a   2/b   2/c  
1/a   0     1                            
1/b   1     0     1           1          
1/c         1     0                      
2/a                     0     1          
2/b         1           1     0     1    
2/c                           1     0    
# Readout Window (n=1)
{'1/a': 0, '1/b': 1, '2/a': 0, '2/b': 1}
     1/a   1/b   2/a   2/b  
1/a   0     1     3     2    
1/b   1     0           1    
2/a   3     2     0     1    
2/b         1     1     0    
----------------------------------------
1/a:succeeded, 1/b:running
{'1/a': 1, '1/b': 0, '2/a': 0, '2/b': 1, '1/c': 1}
     1/a   1/b   2/a   2/b   1/c  
1/a   0     1     3     2          
1/b   1     0     2     1     1    
2/a   3     2     0     1     3    
2/b         1     1     0          
1/c         1                 0    
----------------------------------------
1/b:succeeded, 1/c:running
{'1/b': 1, '2/a': 0, '2/b': 1, '1/c': 0}
     1/b   2/a   2/b   1/c  
1/b   0     2     1     1    
2/a   2     0     1     3    
2/b   1     1     0          
1/c   1     3     2     0    
----------------------------------------
2/a:succeeded, 2/b:running
{'1/b': 1, '2/a': 1, '2/b': 0, '1/c': 0, '2/c': 1}
     1/b   2/a   2/b   1/c   2/c  
1/b   0     2     1     1          
2/a   2     0     1     3          
2/b   1     1     0     2     1    
1/c   1     3     2     0     3    
2/c               1           0    
----------------------------------------
2/b:succeeded, 2/c & 3/a:running
{'1/b': 1, '1/c': 0, '2/c': 0, '3/b': 1, '3/a': 0}
     1/b   1/c   2/c   3/b   2/b   3/a  
1/b   0     1                            
1/c   1     0     3     3     4     4    
2/c   2     3     0     2     3     3    
3/b                     0     1     1    
2/b                     1     0          
3/a   3     4     3     1     2     0   

@oliver-sanders oliver-sanders removed the bug Something is wrong :( label Jul 3, 2023
@oliver-sanders oliver-sanders changed the title data_store_mgr: depth field (i.e. n-window) incorrect data_store_mgr: n-window depth field Jul 5, 2023
@oliver-sanders oliver-sanders modified the milestones: cylc-8.3.0, cylc-8.x Jul 5, 2023

oliver-sanders commented Jul 12, 2023

See also: cylc/cylc-uiserver#464

It would be good to keep in mind the possibility of moving the increment_graph_window calls from the Scheduler into the UIS when doing this work. In theory we should be able to increment the graph window from either side (provided we initialise the data store with the right services), but in practice this logic will naturally grow into the cylc-flow code where it is located and currently called from.

@oliver-sanders oliver-sanders linked a pull request Aug 10, 2023 that will close this issue
@oliver-sanders oliver-sanders modified the milestones: cylc-8.x, cylc-8.2.3 Oct 30, 2023